This version is still in development and is not considered stable yet. For the latest stable version, please use StreamX Guides 1.0.1!
StreamX Delivery Service
StreamX Delivery Service is a category of microservice that facilitates the delivery of processed data.
Delivery Services are characterized by being the final components of the StreamX mesh.
They read data from the outboxes and are responsible for either exposing this data externally or transmitting it to other services that expose the data.
StreamX Delivery Service Extension
StreamX Delivery Service requires the StreamX Delivery Service Extension dependency. The StreamX Delivery Service Extension is a Quarkus Extension that includes the Quasar Extension. StreamX is designed to work with different messaging systems by using SmallRye Reactive Messaging. The default implementation is based on the Apache Pulsar messaging system.
Maven dependency
To use Maven, add the following dependency to your pom.xml and define the version in the version property:
<dependency>
<groupId>dev.streamx</groupId>
<artifactId>streamx-delivery-service</artifactId>
<version>${version}</version>
</dependency>
Adding the StreamX Delivery Service Extension supplies the project with the following Quasar capabilities and default configurations for channels.
Quasar capabilities
- Stateful storage
- Messages synchronization
- Outdated messages rejection
- Metadata enrichment
Supported Metadata
StreamX enables functions to use Quasar Metadata. By default, strict mode is enabled, which means that messages without Quasar metadata are skipped. This ensures safe usage of the metadata. You can change this behavior by disabling the strict mode:
quasar.messaging.metadata.strict=false
When strict mode is disabled, Quasar metadata might not be present, and you must validate it yourself.
You can choose from the following types:
- Key - Identifier of a data item
  - A key can contain an optional namespace prefix. If the key contains at least one : character, the fragment before the first : character is the namespace.
- Action - Publication action (publish/unpublish)
- EventTime - Time of data publication into StreamX Mesh
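The namespace rule for keys can be sketched in plain Java. The KeyNamespace class and its namespaceOf method are hypothetical names used only for illustration; they are not part of the StreamX or Quasar API, and the sketch works on a plain String rather than on the Key metadata type.

```java
import java.util.Optional;

/** Hypothetical helper, not part of the StreamX API: splits a key at the first ':' character. */
final class KeyNamespace {

    private KeyNamespace() {
    }

    /** Returns the fragment before the first ':' or an empty Optional when the key has no ':'. */
    static Optional<String> namespaceOf(String key) {
        int separator = key.indexOf(':');
        return separator < 0
                ? Optional.empty()
                : Optional.of(key.substring(0, separator));
    }

    public static void main(String[] args) {
        System.out.println(namespaceOf("blog:posts/hello.html")); // Optional[blog]
        System.out.println(namespaceOf("posts/hello.html"));      // Optional.empty
        System.out.println(namespaceOf("a:b:c"));                 // Optional[a]
    }
}
```

Only the first : character matters, so a key such as a:b:c has the namespace a.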
Default configuration
Instance ID - identifier of the service instance - should be unique for each replica of the service.
quasar.application.instance-id=<default value is generated UUID>
Default configuration for StreamX Delivery Service consumer
StreamX Extension provides a default consumer configuration named streamx-delivery-service-consumer-default-config. You can apply it to a channel by adding the following line:
mp.messaging.incoming.[channel-name].consumer-configuration=streamx-delivery-service-consumer-default-config
The following configuration is then applied to the given channel:
mp.messaging.incoming.[channel-name].subscriptionInitialPosition=Earliest
mp.messaging.incoming.[channel-name].subscriptionMode=Durable
mp.messaging.incoming.[channel-name].subscriptionType=Exclusive
mp.messaging.incoming.[channel-name].subscriptionName=${quasar.application.instance-id}
mp.messaging.incoming.[channel-name].readCompacted=true
mp.messaging.incoming.[channel-name].poolMessages=true
mp.messaging.incoming.[channel-name].ackTimeoutMillis=60000
StreamX Extension scans your service, and if it contains only @Incoming methods, the service is treated as a delivery service and the default configuration is applied without any user interaction. For a more advanced setup, you can disable this behavior by setting streamx.channels.default-config.enabled to false. In that case, you must provide all required configuration yourself.
Delivery Function
A Delivery Function is a Java method annotated with a single @Incoming annotation.
StreamX uses SmallRye Reactive Messaging, so you can use any data-consuming method signature offered by SmallRye.
The recommended signature:
@Incoming(INCOMING_CHANNEL)
public Uni<Void> consume(Data data) {...}
You can add any supported metadata as method arguments. They are optional, so you can choose only the ones you need.
@Incoming(INCOMING_CHANNEL)
public Uni<Void> consume(Data data, Key key, Action action, EventTime eventTime) {...}
Incoming channels
Delivery Functions consume messages from incoming channels and do not send them to any outgoing channels. StreamX uses a messaging system under the hood, Apache Pulsar by default. In cases like testing, when you do not need a messaging system, you can use an in-memory channel.
Channel configuration
To switch between available options, you must set up a connector for each incoming channel:
mp.messaging.incoming.[channel-name].connector=smallrye-in-memory
In most cases, Delivery Services consume messages produced by Processing Services, so you need a messaging system to communicate with them.
For a Managed Delivery Service, StreamX provides a set of default configurations that you can use for incoming channels:
mp.messaging.incoming.[channel-name].connector=smallrye-pulsar
mp.messaging.incoming.[channel-name].consumer-configuration=streamx-delivery-service-consumer-default-config
mp.messaging.incoming.[channel-name].topic=some-example-incoming-topic-name
mp.messaging.incoming.[channel-name].ack-strategy=cumulative
streamx-delivery-service-consumer-default-config identifies the provided configuration for the Pulsar consumer.
You can still override any property for a given channel:
mp.messaging.incoming.[channel-name].propertyName=propertyValue
Channel schema
StreamX works with Avro schemas.
To register a schema for use in a channel, you must create a data model annotated with @AvroGenerated and include it in the signature of a method annotated with @Incoming.
The model class must contain a public, no-argument constructor, which is required for the serialization process.
@AvroGenerated
public class Data {
  public Data() {
    // needed for Avro serialization
  }
  // ...
}
You can also use objects that are not annotated with @AvroGenerated, but in such cases you must provide a Message Converter and explicitly configure the channel schema.
import org.apache.pulsar.client.api.Schema;
// ...
@Produces
@Identifier("channel-name")
Schema<Data> dataSchema = Schema.AVRO(Data.class);
Message ordering
Delivery Services should not process messages on their own.
They should instead work on messages already processed by Processing Services, so there is no need for stateful computation.
There is, however, a need for the message ordering provided by Quasar.
To achieve that, you can use a Store instance for any @Incoming channel that you want to be ordered.
@FromChannel("incoming-data")
Store<Data> dataStore;
@Incoming(INCOMING_CHANNEL)
public Uni<Void> consume(Data data) {...}
In such cases, there is no need to share the Store across different microservices, so you do not need a messaging system for the store.
To disable the messaging system, you must disable the connector and switch the synchronization mode:
quasar.messaging.store.[store-identifier].use-connector=false
quasar.messaging.store.synchronization.mode=CHANNELS
store-identifier consists of [channel]_[store class simple name]-[store type class simple name].
Message acknowledgment
When you work with GenericPayload or plain payloads, the framework acknowledges the messages for you without any interaction.
By default, messages are acked after processing finishes and nacked when an exception is thrown from the function.
However, when you use the Message<> object, you must ack or nack the messages manually.
If a message is not acked within the given timeout, it is redelivered and processed again.
See SmallRye Message Acknowledgment for more details.
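The default acknowledgment rule described above can be illustrated with a simplified, framework-free simulation. AutoAckSketch, Outcome, and deliver are hypothetical names; the real acknowledgment is performed by SmallRye Reactive Messaging, and this sketch only mirrors the rule "ack when the function completes, nack when it throws".

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for the framework behavior; this is not the SmallRye API. */
final class AutoAckSketch {

    enum Outcome { ACKED, NACKED }

    private AutoAckSketch() {
    }

    /** Runs the function for one payload: acked if it returns, nacked if it throws. */
    static <T> Outcome deliver(T payload, Consumer<T> function) {
        try {
            function.accept(payload);
            return Outcome.ACKED;
        } catch (RuntimeException e) {
            return Outcome.NACKED;
        }
    }

    public static void main(String[] args) {
        Outcome ok = deliver("page", payload -> { /* processed without error */ });
        Outcome failed = deliver("page", payload -> {
            throw new IllegalStateException("delivery failed");
        });
        System.out.println(ok);     // ACKED
        System.out.println(failed); // NACKED
    }
}
```

With a Message<> parameter there is no such wrapper, which is why the function has to call ack or nack itself.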
Managed compared to External Delivery Service
There are two types of Delivery Services: managed and external. They differ in the following characteristics.
Characteristic | Managed Delivery Service | External Delivery Service |
---|---|---|
data state | stateful - contains outbox data in some form | stateless - does not contain outbox data directly; transfers the data to an external service that manages state |
initialization | initializes by synchronization - reading the outboxes from the beginning | no initialization; reads the outbox from the last position |
experience delivery | delivers final experiences to clients directly through one of the containers | delegates delivery of the final experiences to an external service, which is only fed by the Delivery Service |
scaling | scalable for handling client requests | scaling not supported |
Managed Delivery Service
On startup, the Managed Delivery Service always reads the outbox data from beginning to end and initializes its own state - this is the initialization phase. After initialization, the service starts handling requests and continues processing incoming data.
The Managed Delivery Service reads the outbox differently from External Delivery Services - it does not share data reading between replicas and always starts reading from scratch.
This behaviour results in the data being replicated to each independent service replica. That removes a single point of failure while scaling the Delivery Service, and allows features like geo-replication by locating replicas in different geolocations.
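As a rough illustration of the initialization phase, the sketch below replays an in-memory list of messages from the beginning and builds local state from publish and unpublish actions. All names (ManagedStateSketch, OutboxMessage, replay) are hypothetical, and the list only stands in for an outbox topic; the actual service reads from the messaging system.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, framework-free model of a replica initializing its state from an outbox. */
final class ManagedStateSketch {

    enum Action { PUBLISH, UNPUBLISH }

    /** Stand-in for one outbox message: key, action, and payload. */
    static final class OutboxMessage {
        final String key;
        final Action action;
        final String payload;

        OutboxMessage(String key, Action action, String payload) {
            this.key = key;
            this.action = action;
            this.payload = payload;
        }
    }

    private ManagedStateSketch() {
    }

    /** Reads the whole outbox from the first message and returns the resulting state. */
    static Map<String, String> replay(List<OutboxMessage> outbox) {
        Map<String, String> state = new LinkedHashMap<>();
        for (OutboxMessage message : outbox) {
            if (message.action == Action.PUBLISH) {
                state.put(message.key, message.payload); // latest publish wins
            } else {
                state.remove(message.key);               // unpublish removes the item
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<OutboxMessage> outbox = List.of(
                new OutboxMessage("a.html", Action.PUBLISH, "v1"),
                new OutboxMessage("b.html", Action.PUBLISH, "v1"),
                new OutboxMessage("a.html", Action.PUBLISH, "v2"),
                new OutboxMessage("b.html", Action.UNPUBLISH, null));

        // Each replica replays the same outbox independently and ends up with the same state.
        Map<String, String> replicaOne = replay(outbox);
        Map<String, String> replicaTwo = replay(outbox);
        System.out.println(replicaOne);                    // {a.html=v2}
        System.out.println(replicaOne.equals(replicaTwo)); // true
    }
}
```

Because every replica performs the same replay on its own, no replica depends on another one to serve the data.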
Failure handling
Managed Delivery Services use the Exclusive subscription type, which does not support the DLQ mechanism. See available failure strategies for more details.
External Delivery Service
An External Delivery Service is responsible only for reading the outbox data and pushing it to the external service. The current implementation does not support scaling. However, the service is responsible only for transmitting data to the external service, which handles user requests.
To achieve that, it must be configured differently from a Managed Delivery Service. The subscription must be exclusive because the outbox topics consumed by the Delivery Services are compacted, and compacted topics can only be read by subscriptions with a single active consumer. The subscription must be durable so that a restarted service can continue reading the topic from where it left off.
mp.messaging.incoming.[channel-name].subscriptionType=Exclusive
mp.messaging.incoming.[channel-name].subscriptionMode=Durable
mp.messaging.incoming.[channel-name].subscriptionName=${streamx.service.id}
streamx.service.id - By default, it is resolved to quarkus.application.name. This ID must be the same for each replica.