Stateful Store
A Store is a key-value storage system that maintains incoming data and makes its state available to functions. The Store is essential for message synchronization: it is used not only to store data, but also to reject outdated messages. Read more about the stateful computation.
The dev.streamx.quasar.reactive.messaging.Store interface provides methods to retrieve data for a given key and to stream all stored entries. It works with Generic Payload, which allows metadata associated with the payload to be exposed through the interface.
Not all metadata is stored in the Store. If a client needs its own metadata to be available in the Store, it must implement the dev.streamx.quasar.reactive.messaging.metadata.MetadataEntry interface, which is a marker for Quasar to handle such metadata.
Store declaration and usage
To use a Store in your class, declare a Store<T> field and annotate it with the @FromChannel annotation. Every message sent to the channel that the Store is created from is automatically placed in the Store and made available to all functions across all replicas. The type parameter of Store<T> can be the payload type of the message sent to the incoming channel, or any other type if a Message Converter capable of converting the message is present.
StreamX comes with a SyncMessageConverter that has a priority of 1. If you provide a converter with the same priority, you must handle internal sync message conversion yourself.
@Inject
@FromChannel("incoming-channel-name")
Store<String> store;

@Incoming("incoming-channel-name")
void sink(String payload) {
    String fromStore = store.get("key");
    // your logic here
}

@Incoming("incoming-other-channel-name")
void sinkOther(String payload) {
    String fromStore = store.get("key");
    // your logic here
}
The example above shows that the Store can be used in any function, regardless of the channel it was created from. However, be aware that the data inside the Store comes only from incoming-channel-name. You can still declare another Store for other channels if needed.
When an incoming message from the Store's channel is processed, the Store is guaranteed to have already been populated. The Store can be updated at any time, so you should not expect the same result when fetching data successively. If you want to use a value in several places, store it in a local variable.
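The advice about successive reads can be illustrated with a minimal, self-contained sketch. A ConcurrentHashMap stands in for the Store here, and the names StoreSnapshotExample and describe are hypothetical; in a real application the Store is injected with @FromChannel rather than constructed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in: a ConcurrentHashMap plays the role of the Store,
// which other threads or replicas may update at any time.
class StoreSnapshotExample {

    static final Map<String, String> store = new ConcurrentHashMap<>();

    // Reads the value once and reuses the local copy, so every use inside
    // this method sees the same value even if the store changes meanwhile.
    static String describe(String key) {
        String snapshot = store.get(key); // single read
        if (snapshot == null) {
            return key + " is absent";
        }
        // Both uses below refer to the same snapshot, not to two separate reads.
        return key + "=" + snapshot + " (length " + snapshot.length() + ")";
    }
}
```

Calling store.get(key) twice instead could return two different values if an update arrives between the calls.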
Custom Store
For more sophisticated use cases where the Store interface is insufficient, or the default implementation does not meet your needs, you can define your own Store. To do this, implement a StoreFactory annotated with @StoreFactoryFor.
@ApplicationScoped
@StoreFactoryFor(CustomStore.class)
public class CustomStoreFactory implements StoreFactory<CustomStore> {

    @Override
    public CustomStore createStore(StoreConfiguration configuration) {
        return new CustomStoreImpl();
    }

    @Inject
    StoreRegistry storeRegistry;

    @Produces
    @Typed(CustomStore.class)
    // Stream name is ignored during type-safe resolution
    @FromChannel("")
    CustomStore produce(InjectionPoint injectionPoint) {
        String channelName = FromChannelUtils.getChannelName(injectionPoint);
        return storeRegistry.getStore(StoreConfiguration.of(channelName, CustomStore.class));
    }
}
Store identifier
Each store has a unique identifier used for store configuration. The identifier is lowercase and has the following form:
[channel-name]_[store-type-simple-name]-[value-type-simple-name]
- [channel-name] - the name of the channel passed to the @FromChannel annotation
- [store-type-simple-name] - the simple name of the class representing the Store
- [value-type-simple-name] - the simple name of the type parameter passed to the Store
For example, the following Store:
@Inject
@FromChannel("incoming-channel-name")
Store<String> store;
has the identifier incoming-channel-name_store_string.
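As a rough illustration of how such an identifier can be derived, the sketch below lowercases the channel name and the two simple class names and joins them. StoreIdentifierExample, identifier, and the nested Store interface are hypothetical stand-ins, not Quasar APIs. The sketch joins all parts with an underscore, as in the worked example; that separator is an assumption of this sketch.

```java
import java.util.Locale;

// Hypothetical helper, not part of Quasar: builds a store identifier from
// the channel name, the Store class, and the value type.
class StoreIdentifierExample {

    // Minimal stand-in for the Quasar Store interface so the sketch compiles alone.
    interface Store<T> {}

    // Assumption: parts are joined with '_' as in the worked example
    // "incoming-channel-name_store_string".
    static String identifier(String channelName, Class<?> storeType, Class<?> valueType) {
        String raw = channelName + "_" + storeType.getSimpleName() + "_" + valueType.getSimpleName();
        return raw.toLowerCase(Locale.ROOT);
    }
}
```

With the channel name incoming-channel-name, the Store stand-in, and String as the value type, the helper returns the identifier from the example.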
Storage backend
By default, Stores are backed by an in-memory map, which is fast because it uses memory operations only, but has some drawbacks. If many messages with unique keys are sent to the channel that the Store originates from, you can encounter an OutOfMemoryError, depending on your memory settings and the size of the message payloads. Another drawback is that the Store must be recreated from scratch after each application restart. While this might be sufficient in many cases, if you expect the storage to grow significantly, you might want to switch to RocksDB.
RocksDB persists the state on disk, so it might be slower. This can affect the processing performance of functions, even if they are not consuming messages from the same channel, depending on the selected synchronization mode.
To switch between the available options (in-memory, rocksdb), configure the backend for each store identifier:
quasar.messaging.store.[store-identifier].backend=rocksdb
To configure the backend globally for all stores, add the following configuration:
quasar.messaging.store.backend=rocksdb
Store connector
By default, Stores use Apache Pulsar under the hood to expose the state across all replicas and achieve message synchronization. However, this might not always be necessary, for example during testing or if your application does not need to share persistent state between replicas. You can switch the Store's channels to in-memory by setting the use-connector property to false.
quasar.messaging.store.[store-identifier].use-connector=false
Store with Pulsar connector
Stores with the Pulsar connector have their own topic in the stores namespace, which handles both the incoming and outgoing channels. StreamX generates the topic name from the streamx.tenant and streamx.service.id properties passed to the service:
persistent://[streamx.tenant]/stores/[streamx.service.id]_[store-identifier]
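To make the pattern concrete, the following sketch fills it in with plain string concatenation. StoreTopicExample and topicFor are hypothetical names, and the tenant and service id used in the usage note are made-up values; StreamX generates the real topic name itself.

```java
// Hypothetical helper, not part of StreamX: fills in the documented topic pattern
// persistent://[streamx.tenant]/stores/[streamx.service.id]_[store-identifier].
class StoreTopicExample {

    static String topicFor(String tenant, String serviceId, String storeIdentifier) {
        return "persistent://" + tenant + "/stores/" + serviceId + "_" + storeIdentifier;
    }
}
```

For a tenant acme, a service id relay, and the store identifier incoming-channel-name_store_string, the helper returns persistent://acme/stores/relay_incoming-channel-name_store_string.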
Store default configuration
StreamX provides default configuration for both incoming and outgoing store channels:
Incoming channel properties:
mp.messaging.incoming.[channel-name].connector=smallrye-pulsar
mp.messaging.incoming.[channel-name].client-configuration=quasar-default-client-config
mp.messaging.incoming.[channel-name].consumer-configuration=quasar-default-store-consumer-config
mp.messaging.incoming.[channel-name].topic=persistent://${streamx.tenant}/stores/${streamx.service.id}_[store-identifier]
mp.messaging.incoming.[channel-name].ack-strategy=cumulative
Outgoing channel properties:
mp.messaging.outgoing.[channel-name].connector=smallrye-pulsar
mp.messaging.outgoing.[channel-name].client-configuration=quasar-default-client-config
mp.messaging.outgoing.[channel-name].producer-configuration=quasar-default-store-producer-config
mp.messaging.outgoing.[channel-name].topic=persistent://${streamx.tenant}/stores/${streamx.service.id}_[store-identifier]
quasar-default-store-consumer-config:
mp.messaging.incoming.[channel-name].subscriptionInitialPosition=Earliest
mp.messaging.incoming.[channel-name].subscriptionType=Exclusive
mp.messaging.incoming.[channel-name].subscriptionMode=Durable
mp.messaging.incoming.[channel-name].subscriptionName=${quasar.application.instance-id}
mp.messaging.incoming.[channel-name].poolMessages=true
mp.messaging.incoming.[channel-name].readCompacted=true
mp.messaging.incoming.[channel-name].batchIndexAckEnabled=true
mp.messaging.incoming.[channel-name].ackTimeoutMillis=60000
quasar-default-store-producer-config:
mp.messaging.outgoing.[channel-name].producerName=${quasar.application.instance-id}
quasar-default-client-config:
mp.messaging.incoming/outgoing.[channel-name].serviceUrl=${pulsar.client.serviceUrl}
Message synchronization
Message synchronization is the mechanism that:
- Rejects outdated messages from processing
- Ensures consistent state across all Stores within the scope of synchronization
In StreamX, messages are enriched with eventTime metadata, which contains a timestamp defined by the application or, if none is provided, set by the ingestion mechanism to the current time. This timestamp is used to reject older messages. If a channel is synchronized, each incoming message is processed only after all prior messages have been handled, regardless of network latencies. This applies to all replicas of a given application. There are two phases of message rejection:
- The incoming message is verified against the Store to check if a more up-to-date message exists. If it does, the incoming message is ignored.
- If the first step passes, the message is then stored. Before storing, the eventTime is validated again to ensure no other replica has updated the Store in the meantime.
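The two phases can be sketched with an in-memory map standing in for the Store. This is an illustration of the idea only, not Quasar's implementation: EventTimeRejectionExample, Entry, and accept are hypothetical names, and the sketch assumes that a message with an eventTime equal to the stored one is accepted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: an in-memory map stands in for the Store and
// all names here are hypothetical, not Quasar APIs.
class EventTimeRejectionExample {

    static final class Entry {
        final String payload;
        final long eventTime;

        Entry(String payload, long eventTime) {
            this.payload = payload;
            this.eventTime = eventTime;
        }
    }

    private final Map<String, Entry> store = new ConcurrentHashMap<>();

    // Returns true if the message was stored, false if it was rejected as outdated.
    boolean accept(String key, String payload, long eventTime) {
        // Phase 1: pre-check against the current Store content.
        Entry current = store.get(key);
        if (current != null && current.eventTime > eventTime) {
            return false;
        }
        // Phase 2: re-validate atomically while storing, in case another
        // writer updated the entry between the pre-check and the write.
        Entry candidate = new Entry(payload, eventTime);
        Entry result = store.compute(key, (k, existing) ->
                existing != null && existing.eventTime > eventTime ? existing : candidate);
        return result == candidate;
    }

    Entry get(String key) {
        return store.get(key);
    }
}
```

In this sketch, a message with eventTime 100 that arrives after one with eventTime 200 for the same key is rejected, and the stored entry keeps the newer payload.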
Whether a channel is within the scope of synchronization depends on the quasar.messaging.store.synchronization.mode property.
- ALL:
  - The default synchronization mode
  - Implies synchronization among all stores within the application
- CHANNELS:
  - Specifies channel-based synchronization
  - Only stores associated with the same channel are synchronized
  - Other stores operate independently without synchronization
Understanding and choosing the appropriate synchronization mode based on your application’s requirements is crucial for achieving the desired coordination between distributed stores.
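The difference between the two modes can be modeled as a simple predicate over the channels of two stores. The sketch below is a hypothetical model for illustration: SynchronizationScopeExample, Mode, and synchronizedTogether are not Quasar APIs, and the channel names in the usage note are made up.

```java
// Hypothetical model of the two modes; the enum and method are not Quasar APIs.
class SynchronizationScopeExample {

    enum Mode { ALL, CHANNELS }

    // Returns true if two stores, identified by their channel names,
    // fall within the same synchronization scope under the given mode.
    static boolean synchronizedTogether(Mode mode, String channelA, String channelB) {
        switch (mode) {
            case ALL:
                return true; // every store in the application is synchronized
            case CHANNELS:
                return channelA.equals(channelB); // only stores on the same channel
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

Under ALL, stores created from the channels pages and assets are synchronized with each other; under CHANNELS, they operate independently.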