This version is still in development and is not considered stable yet. For the latest stable version, please use StreamX Guides 1.0.0!

StreamX Delivery Service

StreamX Delivery Service is a category of microservice that delivers processed data. Delivery Services are the final components of the StreamX Mesh: they read data from the outboxes and either expose it externally or transmit it to other services that expose it.

[Figure: StreamX Mesh]

StreamX Delivery Service Extension

StreamX Delivery Service requires the StreamX Delivery Service Extension dependency. The StreamX Delivery Service Extension is a Quarkus Extension that includes the Quasar Extension. StreamX is designed to work with different messaging systems by using SmallRye Reactive Messaging. The default implementation is based on the Apache Pulsar messaging system.

Maven dependency

To use Maven, add the following dependency to your pom.xml and define the version in the version property.

<dependency>
  <groupId>dev.streamx</groupId>
  <artifactId>streamx-delivery-service</artifactId>
  <version>${version}</version>
</dependency>

Adding the StreamX Delivery Service Extension supplies the project with the following Quasar capabilities and default channel configurations.

Quasar capabilities

  • Stateful storage

  • Messages synchronization

  • Outdated messages rejection

  • Metadata enrichment

Supported Metadata

StreamX enables functions to use Quasar Metadata. By default, strict mode is enabled, which means that messages without Quasar metadata are skipped. This ensures safe usage of the metadata. You can change this behavior by disabling the strict mode:

quasar.messaging.metadata.strict=false

When strict mode is disabled, Quasar metadata might not be present, and you must validate it yourself.

You can choose from the following types:

  1. Key - Identifier of a data item

    • A key can contain an optional namespace prefix. If the key contains at least one : character, the fragment before the first : character is the namespace.

  2. Action - Publication action (publish/unpublish)

  3. EventTime - Time of data publication into StreamX Mesh
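The namespace rule for keys can be illustrated with a small plain-Java sketch. It works on a raw String rather than on the StreamX Key type, and the KeyNamespace class and namespaceOf method are hypothetical names introduced only for this illustration, not part of the StreamX API.

```java
import java.util.Optional;

final class KeyNamespace {

  private KeyNamespace() {
  }

  // Hypothetical helper: returns the fragment before the first ':' character,
  // or an empty Optional when the key contains no ':' at all.
  static Optional<String> namespaceOf(String key) {
    int separator = key.indexOf(':');
    return separator < 0
        ? Optional.empty()
        : Optional.of(key.substring(0, separator));
  }
}
```

Only the first : character matters under this reading, so a key with several : characters still yields the fragment before the first one.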

Default configuration

Instance ID - identifier of the service instance - should be unique for each replica of the service.

quasar.application.instance-id=<default value is generated UUID>

Default configuration for StreamX Delivery Service consumer

The StreamX Extension provides a default consumer configuration named streamx-delivery-service-consumer-default-config. You can apply it to a channel by adding the following line:

mp.messaging.incoming.[channel-name].consumer-configuration=streamx-delivery-service-consumer-default-config

The following configuration is then applied to the given channel:

mp.messaging.incoming.[channel-name].subscriptionInitialPosition=Earliest
mp.messaging.incoming.[channel-name].subscriptionMode=Durable
mp.messaging.incoming.[channel-name].subscriptionType=Exclusive
mp.messaging.incoming.[channel-name].subscriptionName=${quasar.application.instance-id}
mp.messaging.incoming.[channel-name].readCompacted=true
mp.messaging.incoming.[channel-name].poolMessages=true
mp.messaging.incoming.[channel-name].ackTimeoutMillis=60000

The StreamX Extension scans your service. If the service contains only @Incoming methods, it is treated as a Delivery Service and the default configuration is applied automatically. For a more advanced setup, you can disable this behavior by setting streamx.channels.default-config.enabled to false. In that case, you must provide all required configuration yourself.

Delivery Function

A Delivery Function is a Java method annotated with a single @Incoming annotation. StreamX uses SmallRye Reactive Messaging, so you can use any data-consuming method signature offered by SmallRye.

The recommended signature:

@Incoming(INCOMING_CHANNEL)
public Uni<Void> consume(Data data) {...}

You can pass any supported metadata as method arguments. They are optional, so you can choose only the ones you need.

@Incoming(INCOMING_CHANNEL)
public Uni<Void> consume(Data data, Key key, Action action, EventTime eventTime) {...}

Incoming channels

Delivery Functions consume messages from incoming channels and do not send them to any outgoing channels. StreamX uses a messaging system under the hood, Apache Pulsar by default. In cases such as testing, when you do not need a messaging system, you can use an in-memory channel.

Channel configuration

To switch between available options, you must set up a connector for each incoming channel:

mp.messaging.incoming.[channel-name].connector=smallrye-in-memory

In most cases, Delivery Services consume messages produced by Processing Services, so you need a messaging system to communicate with them.

For a Managed Delivery Service, StreamX provides a set of default configurations that you can use for incoming channels:

mp.messaging.incoming.[channel-name].connector=smallrye-pulsar
mp.messaging.incoming.[channel-name].consumer-configuration=streamx-delivery-service-consumer-default-config
mp.messaging.incoming.[channel-name].topic=some-example-incoming-topic-name
mp.messaging.incoming.[channel-name].ack-strategy=cumulative

streamx-delivery-service-consumer-default-config identifies the default configuration provided for the Pulsar consumer.

You can still override any property for a given channel:

mp.messaging.incoming.[channel-name].propertyName=propertyValue

Channel schema

StreamX works with Avro schemas. To register a schema for use in a channel, you must create a data model annotated with @AvroGenerated and include it in the signature of a method annotated with @Incoming. The model class must contain a public, no-argument constructor, which is required for the serialization process.

@AvroGenerated
public class Data {

  public Data() {
    // needed for Avro serialization
  }

  // ...
}

You can also use objects that are not annotated with @AvroGenerated, but you must provide a Message Converter in such cases and explicitly configure the channel schema.

import org.apache.pulsar.client.api.Schema;

// ...

@Produces
@Identifier("channel-name")
Schema<Data> dataSchema = Schema.AVRO(Data.class);

Message ordering

Delivery Services should not process messages on their own. They should work on messages already processed by Processing Services, so there is no need for stateful computation. There is, however, a need for the message ordering provided by Quasar. To achieve that, you can use a Store instance for any @Incoming channel that you want to be ordered.

@FromChannel("incoming-data")
Store<Data> dataStore;

@Incoming(INCOMING_CHANNEL)
public Uni<Void> consume(Data data) {...}

In such cases, the Store does not need to be shared across microservices, so it does not need a messaging system. To disable the messaging system for the store, you must disable its connector and switch the synchronization mode:

quasar.messaging.store.[store-identifier].use-connector=false
quasar.messaging.store.synchronization.mode=CHANNELS

The store-identifier consists of [channel]_[store class simple name]-[store type class simple name].
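As a rough illustration of the identifier pattern, the plain-Java sketch below assembles a store-identifier and the matching use-connector property key. It assumes that the store class is Store and the store type is the model class held by the store (Data in the example above); that reading of the pattern is an assumption, so verify the resulting identifier against your own service. The StoreIdentifiers class, its methods, and the nested Store and Data stand-ins are hypothetical and exist only for this sketch.

```java
final class StoreIdentifiers {

  private StoreIdentifiers() {
  }

  // Stand-ins for the real Store interface and the Data model (illustration only).
  interface Store<T> {
  }

  static final class Data {
  }

  // Builds "[channel]_[store class simple name]-[store type class simple name]".
  static String storeIdentifier(String channel, Class<?> storeClass, Class<?> storeType) {
    return channel + "_" + storeClass.getSimpleName() + "-" + storeType.getSimpleName();
  }

  // Builds the property key that disables the connector for one store.
  static String useConnectorProperty(String storeIdentifier) {
    return "quasar.messaging.store." + storeIdentifier + ".use-connector";
  }
}
```

Under these assumptions, a store injected from the incoming-data channel would be addressed as incoming-data_Store-Data.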

Message acknowledgment

When you work with GenericPayload or plain payloads, the framework acknowledges messages for you. By default, messages are acked after processing finishes and nacked when the function throws an exception. However, when you use the Message<> object, you must ack or nack the messages manually. If a message is not acked within the acknowledgment timeout, it is redelivered and processed again. See SmallRye Message Acknowledgment for more details.
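The manual acknowledgment pattern can be sketched in plain Java. To keep the example runnable without any dependencies, it uses a stand-in AckableMessage interface that mirrors only the getPayload(), ack(), and nack(Throwable) methods of the MicroProfile Reactive Messaging Message type. In a real service, the method would be annotated with @Incoming and would receive the actual Message<Data> object. All class and method names below are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

final class ManualAckSketch {

  private ManualAckSketch() {
  }

  // Stand-in for the messaging Message<T> type: only the methods used here.
  interface AckableMessage<T> {
    T getPayload();

    CompletionStage<Void> ack();

    CompletionStage<Void> nack(Throwable reason);
  }

  // Delivers the payload, then acks on success or nacks with the failure.
  static <T> CompletionStage<Void> consume(AckableMessage<T> message, Consumer<T> delivery) {
    try {
      delivery.accept(message.getPayload());
    } catch (RuntimeException e) {
      return message.nack(e);
    }
    return message.ack();
  }

  // Test double that records which acknowledgment was requested.
  static final class RecordingMessage implements AckableMessage<String> {
    private final String payload;
    String outcome = "pending";

    RecordingMessage(String payload) {
      this.payload = payload;
    }

    @Override
    public String getPayload() {
      return payload;
    }

    @Override
    public CompletionStage<Void> ack() {
      outcome = "acked";
      return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletionStage<Void> nack(Throwable reason) {
      outcome = "nacked";
      return CompletableFuture.completedFuture(null);
    }
  }
}
```

Returning the stage produced by ack() or nack() keeps the acknowledgment tied to the outcome of the delivery, so a message is never left unacknowledged until the timeout by accident.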

Managed compared to External Delivery Service

There are two types of Delivery Services: managed and external. They differ in the following characteristics.

| Characteristic | Managed Delivery Service | External Delivery Service |
|---|---|---|
| data state | stateful - contains outbox data in some form | stateless - does not contain outbox data directly; transfers the data to an external service that manages state |
| initialization | initializes by synchronization - reads the outboxes from the beginning | no initialization; reads the outbox from the last position |
| experience delivery | delivers final experiences to clients directly through one of the containers | delegates delivery of the final experiences to an external service, which is only fed by the Delivery Service |
| scaling | scalable for handling client requests | scaling not supported |

Managed Delivery Service

On startup, the Managed Delivery Service always reads the outbox data from beginning to end and initializes its own state. This is the initialization phase. After initialization, the service starts handling requests and continues processing incoming data.
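The initialization phase can be pictured as replaying the outbox in order and applying each publication action to a local state. The plain-Java sketch below is only a conceptual model under that assumption: the Entry class, the Action enum, and the in-memory map are hypothetical, and how a real Managed Delivery Service stores its state depends on the service itself.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OutboxReplaySketch {

  private OutboxReplaySketch() {
  }

  // Mirrors the publish/unpublish actions described under Supported Metadata.
  enum Action { PUBLISH, UNPUBLISH }

  // Hypothetical in-memory view of one outbox entry.
  static final class Entry {
    final String key;
    final Action action;
    final String payload;

    Entry(String key, Action action, String payload) {
      this.key = key;
      this.action = action;
      this.payload = payload;
    }
  }

  // Replays entries in order: publish stores the payload, unpublish removes the key.
  static Map<String, String> initialize(List<Entry> outbox) {
    Map<String, String> state = new LinkedHashMap<>();
    for (Entry entry : outbox) {
      if (entry.action == Action.PUBLISH) {
        state.put(entry.key, entry.payload);
      } else {
        state.remove(entry.key);
      }
    }
    return state;
  }
}
```

Because the replay always starts from the beginning, every replica that runs it over the same outbox ends up with the same state.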

The Managed Delivery Service reads the outbox differently from External Delivery Services: it does not share data reading between replicas, and each replica always starts reading from scratch.

This behavior of the Managed Delivery Service results in the data being replicated to each independent service replica. That removes a single point of failure when scaling the Delivery Service and enables features such as geo-replication, where replicas are placed in different geolocations.

Failure handling

Managed Delivery Services use the Exclusive subscription type, which does not support the dead letter queue (DLQ) mechanism. See available failure strategies for more details.

External Delivery Service

The External Delivery Service is responsible only for reading the outbox data and pushing it to the external service. The current implementation does not support scaling. However, the service only transmits data; handling user requests is left to the external service.

To achieve that, it must be configured differently from a Managed Delivery Service. The subscription must be exclusive because the outbox topics consumed by the Delivery Services are compacted, and compacted topics can only be read by subscriptions with a single active consumer. The subscription must be durable so that a restarted service can continue reading the topic from where it left off.

mp.messaging.incoming.[channel-name].subscriptionType=Exclusive
mp.messaging.incoming.[channel-name].subscriptionMode=Durable
mp.messaging.incoming.[channel-name].subscriptionName=${streamx.service.id}

streamx.service.id - resolves to quarkus.application.name by default. This ID must be the same for each replica.