StreamX Delivery Service

A StreamX Delivery Service is a category of microservice that delivers processed data. Delivery Services are the final components of the StreamX mesh. They read data from the outboxes and either expose this data externally or transmit it to other services that expose it.

[Figure: StreamX mesh]

StreamX Service Extension

StreamX Service requires the StreamX Service Extension dependency. The StreamX Service Extension is a Quarkus Extension that includes the Quasar Extension. StreamX is designed to work with different messaging systems by using SmallRye Reactive Messaging. The default implementation is based on the Apache Pulsar messaging system.

Maven Dependency

To use Maven, add the following dependency to your pom.xml and define the version in the version property.

<dependency>
  <groupId>dev.streamx</groupId>
  <artifactId>streamx-service</artifactId>
  <version>${version}</version>
</dependency>

Adding the StreamX Service Extension gives the project the following Quasar capabilities and default configurations for channels.

Quasar capabilities

  • Stateful storage

  • Messages synchronization

  • Outdated messages rejection

  • Metadata enrichment

Supported Metadata

StreamX enables functions to use Quasar Metadata. By default, strict mode is enabled, which means that messages without Quasar metadata are skipped. This ensures safe usage of the metadata. You can change this behavior by disabling the strict mode:

quasar.messaging.metadata.strict=false

When strict mode is disabled, Quasar metadata might not be present, and you need to validate it yourself.

You can choose from the following metadata types:

  1. Key - Identifier of a data item

  2. Action - Publication action (publish/unpublish)

  3. EventTime - Time of data publication into StreamX Mesh
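Key and EventTime together are enough to picture the "Outdated messages rejection" capability listed earlier. How Quasar implements it is not described on this page, so the sketch below is only an assumed model: it remembers the latest event time seen per key and rejects anything older. The class `OutdatedMessageFilter` and its method are hypothetical and use plain Java types instead of the Quasar metadata classes.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Assumed model of outdated-message rejection; not Quasar's actual implementation. */
final class OutdatedMessageFilter {

  // Latest event time accepted so far, per data item key.
  private final Map<String, Instant> latestEventTimeByKey = new HashMap<>();

  /** Returns true if the message is accepted, false if a newer one was already seen for the key. */
  boolean accept(String key, Instant eventTime) {
    Instant latest = latestEventTimeByKey.get(key);
    if (latest != null && eventTime.isBefore(latest)) {
      return false; // older than the data already held for this key
    }
    latestEventTimeByKey.put(key, eventTime);
    return true;
  }
}
```

In this model a message with the same event time as the stored one is accepted; whether Quasar treats ties that way is not stated here.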

Default configuration

Instance ID - identifier of the service instance - should be unique for each replica of the service.

quasar.application.instance-id=<default value is generated UUID>

streamx-delivery-service-consumer-default-config

The StreamX Extension provides the default streamx-delivery-service-consumer-default-config. To apply it to a channel, add the following line:

mp.messaging.incoming.[channel-name].consumer-configuration=streamx-delivery-service-consumer-default-config

The following configuration is then applied to the given channel:

mp.messaging.incoming.[channel-name].subscriptionInitialPosition=Earliest
mp.messaging.incoming.[channel-name].subscriptionMode=NonDurable
mp.messaging.incoming.[channel-name].subscriptionType=Exclusive
mp.messaging.incoming.[channel-name].subscriptionName=${quasar.application.instance-id}
mp.messaging.incoming.[channel-name].readCompacted=true
mp.messaging.incoming.[channel-name].poolMessages=true
mp.messaging.incoming.[channel-name].ackTimeoutMillis=60000

The StreamX Extension scans your service. If it contains only @Incoming methods, it is treated as a Delivery Service and the default configuration is applied without any user interaction. For a more advanced setup, you can disable this behavior by setting streamx.channels.default-config.enabled to false. In that case, you must provide all required configuration yourself.

Delivery Function

A Delivery Function is a Java method annotated with a single @Incoming annotation. StreamX uses SmallRye Reactive Messaging, so you can use any data-consuming method signature offered by SmallRye.

The recommended signature:

@Incoming(INCOMING_CHANNEL)
public Uni<Void> consume(Data data) {...}

You can pass any supported metadata as method arguments. They are optional, so you can choose only the ones you need.

@Incoming(INCOMING_CHANNEL)
public Uni<Void> consume(Data data, Key key, Action action, EventTime eventTime) {...}

Incoming Channels

Delivery Functions consume messages from incoming channels and do not send them to any outgoing channels. StreamX uses a messaging system under the hood, Apache Pulsar by default. In cases such as testing, when you do not need a messaging system, you can use an in-memory channel.

Channel Configuration

To switch between available options, you need to set up a connector for each incoming channel:

mp.messaging.incoming.[channel-name].connector=smallrye-in-memory

In most cases, Delivery Services consume messages produced by Processing Services, so you need a messaging system to communicate with them.

For a Managed Delivery Service, StreamX provides a set of default configurations that you can use for incoming channels:

mp.messaging.incoming.[channel-name].connector=smallrye-pulsar
mp.messaging.incoming.[channel-name].consumer-configuration=streamx-delivery-service-consumer-default-config
mp.messaging.incoming.[channel-name].topic=some-example-incoming-topic-name

streamx-delivery-service-consumer-default-config identifies the default configuration provided for the Pulsar consumer.

You can still override any property for a given channel:

mp.messaging.incoming.[channel-name].propertyName=propertyValue

Channel Schema

StreamX works with Avro schemas. To register a schema for a channel, create a data model annotated with @AvroGenerated and use it in the signature of a method annotated with @Incoming. The model class must contain a public, no-argument constructor, which is required for serialization.

@AvroGenerated
public class Data {

  public Data() {
    // needed for Avro serialization
  }

  // ...
}

You can also use objects which are not annotated with @AvroGenerated, but you need to provide a Message Converter in such cases and explicitly configure the channel schema.

import org.apache.pulsar.client.api.Schema;

// ...

@Produces
@Identifier("channel-name")
Schema<Data> dataSchema = Schema.AVRO(Data.class);

Message ordering

A Delivery Service should not process messages on its own. Instead, it should work on messages already processed by Processing Services, so there is no need for stateful computation. There is, however, a need for the message ordering provided by Quasar. To achieve it, use a Store instance for any @Incoming channel that must be ordered.

@FromChannel("incoming-data")
Store<Data> dataStore;

@Incoming(INCOMING_CHANNEL)
public Uni<Void> consume(Data data) {...}

In such cases there is no need to share the Store across different microservices, so you do not need a messaging system for the store. To disable the messaging system, disable the connector and switch the synchronization mode:

quasar.messaging.store.[store-identifier].use-connector=false
quasar.messaging.store.synchronization.mode=CHANNELS

store-identifier consists of [channel]_[store class simple name]-[store type class simple name].
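As a quick illustration of this naming pattern, the following sketch assembles a store identifier from its three parts. `StoreIdentifiers` is a hypothetical helper written for this page, and the nested `Store` and `Data` classes are stand-ins for the real types; only the `[channel]_[store class simple name]-[store type class simple name]` pattern comes from the text above.

```java
/** Hypothetical helper; not part of the StreamX or Quasar API. */
final class StoreIdentifiers {

  /** Stand-in for the store class used in the example above. */
  static final class Store { }

  /** Stand-in for the data model stored in it. */
  static final class Data { }

  private StoreIdentifiers() {
  }

  /** Builds "[channel]_[store class simple name]-[store type class simple name]". */
  static String of(String channel, Class<?> storeClass, Class<?> storeType) {
    return channel + "_" + storeClass.getSimpleName() + "-" + storeType.getSimpleName();
  }

  public static void main(String[] args) {
    System.out.println(of("incoming-data", Store.class, Data.class));
  }
}
```

Assuming the simple names really are Store and Data, the store declared on the incoming-data channel earlier would be addressed as quasar.messaging.store.incoming-data_Store-Data.use-connector=false.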

Message acknowledgment

When you work with GenericPayload or plain payloads, the framework acknowledges the messages for you without any interaction. By default, messages are acked after processing finishes and nacked when the function throws an exception. However, when you use the Message<> object, you have to ack or nack the messages manually. If a message is not acked within the configured timeout, it is redelivered and processed again. See SmallRye Message Acknowledgment for more details.
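The manual case can be sketched in plain Java as follows. To keep the example self-contained, `AckableMessage` is a hypothetical stand-in that only mirrors the ack/nack shape of a reactive message; it is not the SmallRye Message interface, and `ManualAck.consume` is an illustrative helper rather than a StreamX API.

```java
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

/** Illustrative model of manual acknowledgment; all names are hypothetical. */
final class ManualAck {

  /** Stand-in mirroring the ack/nack shape of a reactive message. */
  interface AckableMessage<T> {
    T payload();

    CompletionStage<Void> ack();

    CompletionStage<Void> nack(Throwable reason);
  }

  private ManualAck() {
  }

  /** Runs the handler, then acks on success or nacks if it throws a runtime exception. */
  static <T> CompletionStage<Void> consume(AckableMessage<T> message, Consumer<T> handler) {
    try {
      handler.accept(message.payload());
    } catch (RuntimeException e) {
      return message.nack(e); // signal failure instead of leaving the message unacked
    }
    return message.ack();
  }
}
```

The point of the sketch is that every code path ends in either an ack or a nack, so no message is left waiting for the ack timeout.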

Managed vs External Delivery Services

There are two types of Delivery Services: managed and external. They differ in the following characteristics.

data state

  • Managed Delivery Service: stateful - contains outbox data in some form

  • External Delivery Service: stateless - does not contain outbox data directly; transfers the data to an external service that manages state

initialization

  • Managed Delivery Service: initializes by synchronization - reading the outboxes from the beginning

  • External Delivery Service: no initialization; reads the outbox from the last position

experience delivery

  • Managed Delivery Service: delivers final experiences to clients directly through one of the containers

  • External Delivery Service: delegates delivery of the final experiences to an external service, which is only fed by the Delivery Service

scaling

  • Managed Delivery Service: scalable for handling client requests

  • External Delivery Service: scaling not supported

Managed Delivery Service

On startup, the Managed Delivery Service always reads the outbox data from beginning to end and initializes its own state. This is the initialization phase. After initialization, the service starts handling requests and continues processing incoming data.

The Managed Delivery Service reads the outbox differently from External Delivery Services: it does not share data reading between replicas and always starts reading from scratch.

This behaviour results in the data being replicated to each independent service replica. That removes the single point of failure when scaling the Delivery Service and allows features like geo-replication by locating replicas in different geolocations.
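The initialization phase can be pictured as replaying the outbox from the first record and applying each publish or unpublish action to local state. The sketch below is a plain-Java model of that idea only: `ReplicaState`, `OutboxRecord`, and the in-memory list standing in for the outbox are hypothetical, and no StreamX or Pulsar API is involved.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a replica building its own state; all names are hypothetical. */
final class ReplicaState {

  enum Action { PUBLISH, UNPUBLISH }

  /** One outbox entry: key, action, and payload (null for UNPUBLISH). */
  static final class OutboxRecord {
    final String key;
    final Action action;
    final String payload;

    OutboxRecord(String key, Action action, String payload) {
      this.key = key;
      this.action = action;
      this.payload = payload;
    }
  }

  private final Map<String, String> published = new LinkedHashMap<>();

  /** Replays the whole outbox from the beginning, as each replica does on startup. */
  static ReplicaState initializeFrom(List<OutboxRecord> outbox) {
    ReplicaState state = new ReplicaState();
    outbox.forEach(state::apply);
    return state;
  }

  /** Applies one record; the same step handles data that arrives after initialization. */
  void apply(OutboxRecord record) {
    if (record.action == Action.PUBLISH) {
      published.put(record.key, record.payload);
    } else {
      published.remove(record.key);
    }
  }

  /** Read-only copy of the data this replica currently serves. */
  Map<String, String> snapshot() {
    return Collections.unmodifiableMap(new LinkedHashMap<>(published));
  }
}
```

Because every replica calls the equivalent of `initializeFrom` on the same outbox, each one ends up with its own full copy of the data, which is what allows replicas to be scaled and placed independently.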

Failure Handling

Managed Delivery Services use the Exclusive subscription type, which does not support the DLQ (dead letter queue) mechanism. See available failure strategies for more details.

External Delivery Service

An External Delivery Service is responsible only for reading the outbox data and pushing it to the external service. The current implementation does not support scaling. However, the service only transmits data to an external service, and it is that external service that handles user requests.

To achieve that, it needs to be configured differently from a Managed Delivery Service. The subscription must be exclusive because the outbox topics consumed by the Delivery Services are compacted, and compacted topics can only be read by subscriptions with a single active consumer. The subscription must be durable so that a restarted service can continue reading the topic from where it left off.

mp.messaging.incoming.[channel-name].subscriptionType=Exclusive
mp.messaging.incoming.[channel-name].subscriptionMode=Durable
mp.messaging.incoming.[channel-name].subscriptionName=${streamx.service.id}

streamx.service.id - By default, it resolves to quarkus.application.name. This ID must be the same for each replica.
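The difference between the durable subscription used here and the non-durable one used by Managed Delivery Services can be modelled in a few lines. The following is a toy model under stated assumptions, not Pulsar code: `SubscriptionModel` is hypothetical, topic compaction is ignored, and a new durable subscription starts at offset 0 purely for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of subscription positions; it does not use or mirror the Pulsar client API. */
final class SubscriptionModel {

  private final List<String> topic = new ArrayList<>();
  // Broker-side positions keyed by subscription name; they outlive consumer restarts.
  private final Map<String, Integer> durableCursors = new HashMap<>();

  void publish(String message) {
    topic.add(message);
  }

  /** Durable subscription: continues from the position stored under the given name. */
  List<String> readDurable(String subscriptionName) {
    int from = durableCursors.getOrDefault(subscriptionName, 0); // 0 for a new name is a simplification
    List<String> unread = new ArrayList<>(topic.subList(from, topic.size()));
    durableCursors.put(subscriptionName, topic.size());
    return unread;
  }

  /** NonDurable subscription starting at Earliest: nothing is stored, so it rereads everything. */
  List<String> readNonDurableFromEarliest() {
    return new ArrayList<>(topic);
  }
}
```

In this model the position belongs to the subscription name, not to the consumer process, which is why the name is derived from streamx.service.id and has to stay the same across restarts and replicas.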