Stateful Store

A Store is a key-value storage system that maintains incoming data and makes its state available to functions. The Store is essential for message synchronization: it is used not only to store data, but also to reject outdated messages. Read more about stateful computation.

The dev.streamx.quasar.reactive.messaging.Store interface provides methods to retrieve data for a given key and stream all stored entries. It works with Generic Payload, which allows metadata associated with the payload to be exposed through the interface.

Not all metadata is stored in the Store. If a client needs its own metadata to be available in the Store, it must implement the dev.streamx.quasar.reactive.messaging.metadata.MetadataEntry interface, which is a marker for Quasar to handle such metadata.

Store declaration and usage

To use a Store in your class, declare a Store<T> field and annotate it with the @FromChannel annotation. Every message sent to the channel that the Store is created from is automatically placed in the Store and made available to all functions across all replicas. The type parameter T can be the payload type of the messages sent to the incoming channel, or any other type if a Message Converter capable of converting the message is present.

StreamX comes with a SyncMessageConverter that has a priority of 1. If you provide a converter with this priority, you must handle internal sync message conversion.

  @Inject
  @FromChannel("incoming-channel-name")
  Store<String> store;

  @Incoming("incoming-channel-name")
  void sink(String payload) {
    String fromStore = store.get("key");
    // your logic here
  }

  @Incoming("incoming-other-channel-name")
  void sinkOther(String payload) {
    String fromStore = store.get("key");
    // your logic here
  }

In the example above, the Store is used in functions regardless of the channel it was created from. However, be aware that the data inside the Store comes only from incoming-channel-name. You can still declare another Store for other channels if needed.

When an incoming message from the Store's channel is processed, the Store is guaranteed to have already been populated. The Store can be updated at any time, so successive reads of the same key may return different results. If you need a value in several places, read it once and keep it in a local variable.
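The advice about local variables can be illustrated with a small, self-contained sketch. Note that SimpleStore below is a hypothetical map-backed stand-in written only so the example runs on its own; it is not the Quasar Store interface, and the manual put call merely simulates an update that the framework would normally perform.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a Store; NOT the Quasar Store interface.
final class SimpleStore<T> {
  private final Map<String, T> entries = new ConcurrentHashMap<>();

  T get(String key) {
    return entries.get(key);
  }

  // In a real application the framework updates the Store; here it is done by hand.
  void put(String key, T value) {
    entries.put(key, value);
  }
}

final class SnapshotExample {

  // Reads the key once and reuses the local value, so both parts of the result
  // come from the same snapshot even if the Store changes in the meantime.
  static String describe(SimpleStore<String> store, String key, Runnable concurrentUpdate) {
    String snapshot = store.get(key); // single read
    concurrentUpdate.run();           // simulates an update arriving mid-processing
    return snapshot + "/" + snapshot.length();
  }

  public static void main(String[] args) {
    SimpleStore<String> store = new SimpleStore<>();
    store.put("key", "v1");
    String result = describe(store, "key", () -> store.put("key", "version-2"));
    System.out.println(result);           // built from the value read before the update
    System.out.println(store.get("key")); // the Store itself has moved on
  }
}
```

Had describe called store.get(key) twice, the two halves of the result could have been built from different values.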

Custom Store

For more sophisticated use cases where the Store interface is insufficient, or the default implementation does not meet client needs, you can define your own Store. To do this, implement a StoreFactory annotated with @StoreFactoryFor.

@ApplicationScoped
@StoreFactoryFor(CustomStore.class)
public class CustomStoreFactory implements StoreFactory<CustomStore> {

  @Override
  public CustomStore createStore(StoreConfiguration configuration) {
    return new CustomStoreImpl();
  }

  @Inject
  StoreRegistry storeRegistry;

  @Produces
  @Typed(CustomStore.class)
  // The channel name is ignored during type-safe resolution
  @FromChannel("")
  CustomStore produce(InjectionPoint injectionPoint) {
    String channelName = FromChannelUtils.getChannelName(injectionPoint);
    return storeRegistry.getStore(StoreConfiguration.of(channelName, CustomStore.class));
  }
}

Store identifier

Each store has a unique identifier used for store configuration. The identifier is lowercase and has the following format:

[channel-name]_[store-type-simple-name]-[value-type-simple-name]

  • [channel-name] - the name of the channel passed to the @FromChannel annotation

  • [store-type-simple-name] - the simple name of the class representing the Store

  • [value-type-simple-name] - the simple name of the type parameter passed to the Store

For example, a Store:

@Inject
@FromChannel("incoming-channel-name")
Store<String> store;

has the identifier incoming-channel-name_store_string.

Storage backend

By default, Stores are backed by an in-memory map. It is fast because it uses memory operations only, but it has some drawbacks. If many messages with unique keys are sent to the channel that the Store originates from, you can encounter an OutOfMemoryError, depending on your memory settings and the size of the message payloads. Another drawback is that the Store must be recreated from scratch after each application restart. While this might be sufficient in many cases, if you expect the storage to grow significantly, you might want to switch to RocksDB.

RocksDB persists the state on disk, so it might be slower. Depending on the selected synchronization mode, this can affect the processing performance of functions even if they do not consume messages from the same channel.

To switch between available options (in-memory, rocksdb), configure the backend for each store identifier:

quasar.messaging.store.[store-identifier].backend=rocksdb

To configure the backend globally for all stores, add the following configuration:

quasar.messaging.store.backend=rocksdb

Store connector

By default, Stores use Apache Pulsar under the hood to expose the state across all replicas and achieve message synchronization. However, this might not always be necessary, for example during testing, or if your application does not need to share persistent state between replicas. You can switch the Store's channels to in-memory by setting the use-connector property to false.

quasar.messaging.store.[store-identifier].use-connector=false

Store with Pulsar connector

Stores that use the Pulsar connector have their own topic in the stores namespace, which handles both the incoming and outgoing channels. StreamX generates the topic name from the streamx.tenant and streamx.service.id properties passed to the service.

persistent://[streamx.tenant]/stores/[streamx.service.id]_[store-identifier]
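StreamX generates this name itself, so application code never has to build it. Still, when looking for a Store's topic with Pulsar tooling, it can help to see the pattern applied to concrete values. The helper below is a hypothetical illustration only: the class name, the tenant "my-tenant", and the service ID "my-service" are made up, and the store identifier is the one from the example earlier on this page.

```java
final class StoreTopicName {

  // Mirrors the documented pattern:
  // persistent://[streamx.tenant]/stores/[streamx.service.id]_[store-identifier]
  static String of(String tenant, String serviceId, String storeIdentifier) {
    return "persistent://" + tenant + "/stores/" + serviceId + "_" + storeIdentifier;
  }

  public static void main(String[] args) {
    // "my-tenant" and "my-service" are made-up values for illustration.
    System.out.println(of("my-tenant", "my-service", "incoming-channel-name_store_string"));
  }
}
```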

Store default configuration

StreamX provides default configuration for both incoming and outgoing store channels:

Incoming channel properties:

mp.messaging.incoming.[channel-name].connector=smallrye-pulsar
mp.messaging.incoming.[channel-name].client-configuration=quasar-default-client-config
mp.messaging.incoming.[channel-name].consumer-configuration=quasar-default-store-consumer-config
mp.messaging.incoming.[channel-name].topic=persistent://${streamx.tenant}/stores/${streamx.service.id}_[store-identifier]
mp.messaging.incoming.[channel-name].ack-strategy=cumulative

Outgoing channel properties:

mp.messaging.outgoing.[channel-name].connector=smallrye-pulsar
mp.messaging.outgoing.[channel-name].client-configuration=quasar-default-client-config
mp.messaging.outgoing.[channel-name].producer-configuration=quasar-default-store-producer-config
mp.messaging.outgoing.[channel-name].topic=persistent://${streamx.tenant}/stores/${streamx.service.id}_[store-identifier]

quasar-default-store-consumer-config:

mp.messaging.incoming.[channel-name].subscriptionInitialPosition=Earliest
mp.messaging.incoming.[channel-name].subscriptionType=Exclusive
mp.messaging.incoming.[channel-name].subscriptionMode=Durable
mp.messaging.incoming.[channel-name].subscriptionName=${quasar.application.instance-id}
mp.messaging.incoming.[channel-name].poolMessages=true
mp.messaging.incoming.[channel-name].readCompacted=true
mp.messaging.incoming.[channel-name].batchIndexAckEnabled=true
mp.messaging.incoming.[channel-name].ackTimeoutMillis=60000

quasar-default-store-producer-config:

mp.messaging.outgoing.[channel-name].producerName=${quasar.application.instance-id}

quasar-default-client-config:

mp.messaging.incoming/outgoing.[channel-name].serviceUrl=${pulsar.client.serviceUrl}

Message synchronization

Message synchronization is the mechanism that:

  • Rejects outdated messages from processing

  • Ensures consistent state across all Stores within the scope of synchronization

In StreamX, messages are enriched with eventTime metadata that contains a timestamp. The timestamp is defined by the application or, if not provided, set to the current time by the ingestion mechanism. This timestamp is used to reject older messages. If a channel is synchronized, each incoming message is processed only after all prior messages have been handled, regardless of network latencies. This applies to all replicas of a given application. Message rejection has two phases:

  1. The incoming message is verified against the Store to check if a more up-to-date message exists. If it does, the incoming message is ignored.

  2. If the first step passes, the message is then stored. Before storing, the eventTime is validated again to ensure no other replica has updated the Store in the meantime.
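The two phases can be modeled locally as a pre-check followed by an atomic compare-and-store. The sketch below is a simplified, single-JVM illustration and not the StreamX implementation, which coordinates replicas through the Store connector. All names in it (EventTimeGate, Entry, isOutdated, store, accept) are hypothetical, and treating a message with an equal eventTime as acceptable is an assumption.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: an in-memory model of the two rejection phases.
final class EventTimeGate {

  static final class Entry {
    final String payload;
    final long eventTime;

    Entry(String payload, long eventTime) {
      this.payload = payload;
      this.eventTime = eventTime;
    }
  }

  private final ConcurrentMap<String, Entry> entries = new ConcurrentHashMap<>();

  // Phase 1: check whether a more up-to-date entry already exists for the key.
  boolean isOutdated(String key, long eventTime) {
    Entry current = entries.get(key);
    return current != null && current.eventTime > eventTime;
  }

  // Phase 2: validate eventTime again, atomically, while storing.
  // Returns true only if the incoming entry ended up in the map.
  boolean store(String key, String payload, long eventTime) {
    Entry candidate = new Entry(payload, eventTime);
    Entry stored = entries.merge(key, candidate,
        (current, incoming) -> current.eventTime > incoming.eventTime ? current : incoming);
    return stored == candidate;
  }

  // Runs both phases; a message rejected in phase 1 never reaches phase 2.
  boolean accept(String key, String payload, long eventTime) {
    if (isOutdated(key, eventTime)) {
      return false;
    }
    return store(key, payload, eventTime);
  }

  String get(String key) {
    Entry entry = entries.get(key);
    return entry == null ? null : entry.payload;
  }

  public static void main(String[] args) {
    EventTimeGate gate = new EventTimeGate();
    System.out.println(gate.accept("k", "first", 100L)); // stored
    System.out.println(gate.accept("k", "stale", 50L));  // rejected as outdated
    System.out.println(gate.get("k"));
  }
}
```

The second check matters because another writer may update the entry between the pre-check and the write; doing the comparison inside merge keeps the check and the write atomic for a given key.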

Whether a channel falls within the scope of synchronization depends on the quasar.messaging.store.synchronization.mode property, which accepts the following modes:

  1. ALL:

    • The default synchronization mode

    • Implies synchronization among all stores within the application

  2. CHANNELS:

    • Specifies channel-based synchronization

    • Only stores associated with the same channel are synchronized

    • Other stores operate independently without synchronization
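The difference between the two modes can be stated as a simple rule for whether two stores share a synchronization scope. The sketch below is a hypothetical model of that rule, not StreamX code: SyncScope and sameScope are made-up names, and each store is represented only by the name of its channel.

```java
final class SyncScope {

  // The two values of quasar.messaging.store.synchronization.mode.
  enum Mode { ALL, CHANNELS }

  // Returns true when two stores, identified here by their channel names,
  // would be synchronized with each other under the given mode.
  static boolean sameScope(Mode mode, String channelA, String channelB) {
    return mode == Mode.ALL || channelA.equals(channelB);
  }

  public static void main(String[] args) {
    // "pages" and "assets" are made-up channel names for illustration.
    System.out.println(sameScope(Mode.ALL, "pages", "assets"));
    System.out.println(sameScope(Mode.CHANNELS, "pages", "assets"));
    System.out.println(sameScope(Mode.CHANNELS, "pages", "pages"));
  }
}
```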

Choose the synchronization mode that provides the coordination your application requires between distributed stores.