This version is still in development and is not considered stable yet. For the latest stable version, please use StreamX Guides 1.0.0!

StreamX Processing Service

StreamX Processing Service is a category of a microservice that facilitates the processing of data pipelines. You can use them for building blocks in the processing layer of the StreamX Mesh. StreamX Processing Service Extension uses Quasar framework, which enables stateful computation, messages synchronization between channels and service instances, and ensures no outdated message are processed. It plays a key role in executing business logic and transforming data, situated in the middle of StreamX Mesh, between inboxes and outboxes channels.

StreamX Mesh

StreamX Processing Service Extension

StreamX Processing Service requires the StreamX Processing Service Extension dependency. The StreamX Processing Service Extension is a Quarkus Extension that includes the Quasar Extension. StreamX is designed to work with different messaging systems by using SmallRye Reactive Messaging. The default implementation is based on the Apache Pulsar messaging system.

Maven dependency

To use Maven, add the following dependency to your pom.xml and define the version in the version property.

<dependency>
  <groupId>dev.streamx</groupId>
  <artifactId>streamx-processing-service</artifactId>
  <version>${version}</version>
</dependency>

Adding Streamx Processing Service Extension supplies the project with the following Quasar capabilities and default configurations for channels.

Quasar capabilities

  • Stateful storage

  • Messages synchronization

  • Outdated messages rejection

  • Metadata enrichment

Supported Metadata

StreamX enables functions to use Quasar Metadata. By default, strict mode is enabled, which means that messages without Quasar metadata are skipped. This ensures safe usage of the metadata. You can change this behavior by disabling the strict mode:

quasar.messaging.metadata.strict=false

When strict mode is disabled, Quasar metadata might not be present, and you must validate it yourself

You can choose from the following types:

  1. Key - Identifier of a data item

    • Key can contain optional namespace prefix. If key contains at least one : character, then fragment before first : character is namespace.

  2. Action - Publication action (publish/unpublish)

  3. EventTime - Time of data publication into StreamX Mesh

Default configuration

Service ID - identifier of the service - should be common for all replicas of the service.

streamx.service.id=${quarkus.application.name}

Default configuration for StreamX Processing Service consumer

StreamX Extension provides default streamx-processing-service-consumer-default-config. This default config might be applied to channel by adding line:

mp.messaging.incoming.[channel-name].consumer-configuration=streamx-processing-service-consumer-default-config

Then such configuration:

mp.messaging.incoming.[channel-name].subscriptionInitialPosition=Earliest
mp.messaging.incoming.[channel-name].subscriptionType=Shared
mp.messaging.incoming.[channel-name].subscriptionMode=Durable
mp.messaging.incoming.[channel-name].subscriptionName=${streamx.service.id}
mp.messaging.incoming.[channel-name].deadLetterPolicy.initialSubscriptionName=${streamx.service.id}
mp.messaging.incoming.[channel-name].deadLetterPolicy.maxRedeliverCount=3
mp.messaging.incoming.[channel-name].poolMessages=true
mp.messaging.incoming.[channel-name].ackTimeoutMillis=60000

is applied to given channel.

Default configuration for StreamX Processing Service producer

StreamX Extension provides also default streamx-processing-service-producer-default-config. This default config might be applied to channel by adding line:

mp.messaging.outgoing.[channel-name].producer-configuration=streamx-processing-service-producer-default-config

Then such configuration:

mp.messaging.outgoing.[channel-name].producerName=${streamx.service.id}
mp.messaging.outgoing.[channel-name].accessMode=Shared

is applied to given channel.

StreamX Extension scans your service and if there are only processing methods, it is considered a processing service, and default configurations are applied without any user interaction. For more advanced setup you can disable this behavior by setting streamx.channels.default-config.enabled to false. In that case you should provide all required configurations on your own.

Processing Function

In most cases, Processing Function is a Java method annotated with both @Incoming and @Outgoing annotations. However, there might be some special cases where you might need just @Incoming or @Outgoing which is also supported. StreamX uses SmallRye Reactive Messaging, so you can use any data processing method signature offered by SmallRye.

The recommended signature uses GenericPayload.

@Incoming(INCOMING_CHANNEL)
@Outgoing(OUTGOING_CHANNEL)
public GenericPayload<Page> relay(Page page) { ... }

You can pass any supported metadata to method arguments. They are optional, so you can choose only the ones you need.

@Incoming(INCOMING_CHANNEL)
@Outgoing(OUTGOING_CHANNEL)
public GenericPayload<Page> relay(Page page, Key key, Action action, EventTime eventTime) { ... }

In cases where you want to create multiple outputs from a single input, you must work with Messages.

@Incoming(INCOMING_CHANNEL)
@Outgoing(OUTGOING_CHANNEL)
public Multi<Message<Page>> relay(Message<Page> message) { ... }

Some functions in processing services might only consume messages and outgoing messages might be sent out by using Emitter. In such cases, you can use only @Incoming annotation and Emitter

@Incoming(INCOMING_CHANNEL)
public void consume(Payload payload) { ... }
@Inject
@Channel(OUTGOING_CHANNEL)
Emitter<OutputPayload> emitter;

public void send(OutputPayload out) {
  emitter.send(out);
}

Messaging channels

Processing functions typically consume messages from external sources and send them out through an outgoing channel. In such scenarios, StreamX uses a messaging system under the hood. By default, StreamX uses Apache Pulsar. In cases such as testing, where you do not need a messaging system, you can use an in-memory channel.

Channel configuration

To switch between the available options, you must set up a connector for each incoming and outgoing channel:

mp.messaging.incoming.[incoming-channel-name].connector=smallrye-in-memory
mp.messaging.outgoing.[outgoing-channel-name].connector=smallrye-in-memory

In most cases, you want your data processed by different processing functions (different microservices), so you need a messaging system to communicate with them. StreamX provides a set of default configurations that you should use for both incoming and outgoing channels:

mp.messaging.incoming.[incoming-channel-name].connector=smallrye-pulsar
mp.messaging.incoming.[incoming-channel-name].consumer-configuration=streamx-processing-service-consumer-default-config
mp.messaging.incoming.[incoming-channel-name].topic=inPagesTopicName
mp.messaging.outgoing.[outgoing-channel-name].connector=smallrye-pulsar
mp.messaging.outgoing.[outgoing-channel-name].producer-configuration=streamx-processing-service-producer-default-config
mp.messaging.outgoing.[outgoing-channel-name].topic=outPagesTopicName

streamx-processing-service-consumer-default-config and streamx-processing-service-producer-default-config identify provided configurations for Pulsar consumer and producer.

You can still override any property for a given channel:

mp.messaging.outgoing.[channel-name].propertyName=propertyValue

Channel schema

StreamX works with Avro schema. To register a schema for use in a channel, you must create a data model annotated with @AvroGenerated and include it in signature of a method annotated with @Incoming. The model class must contain a public, no-argument constructor, which is required for the serialization process.

@AvroGenerated
public class Data {

  public Data() {
    // needed for Avro serialization
  }

  // ...
}

You can also use objects that are not annotated with @AvroGenerated, but you must provide a Message Converter in such cases and explicitly configure the channel schema.

import org.apache.pulsar.client.api.Schema;

// ...

@Produces
@Identifier("channel-name")
Schema<Data> dataSchema = Schema.AVRO(Data.class);

When any service registers a new schema for inboxes channels, it is automatically available for the REST Ingestion Service.

Blocking processing

Processing function should be fast because they are executed on the event loop thread. However you might have to run some long running code like heavy processing or blocking IO operations. In this case, annotate your method with the @Blocking annotation, which moves the processing to the worker thread.

To read more about reactive architecture on Quarkus guide.

Store

Store can keep the state of the data from the incoming channel. A processing function can use zero, one, or many stores. Stores enable functions to process incoming data based on the state of incoming channels. Quasar ensures that data in the Store is fresh and all service instances share the same view of the data. Stores backed by Pulsar topics have schemas generated automatically from generic types. Like processing methods, Stores also support message converters.

Store declaration and usage

Annotate Store<Data> with @FromChannel("incoming-data") to access the state of the incoming-data channel.

@ApplicationScoped
public class FunctionBean {

  @FromChannel("incoming-data")
  Store<Data> dataStore;

  @Incoming("incoming-data")
  @Outgoing("outgoing-data")
  public GenericPayload<Data> process(Data data, Key key) {
    Data fromStore = dataStore.get("some key");
    // processing data with fromStore data
    return GenericPayload.of(processedData, newKey);
  }
}

Store keeps the most recent data. You should never call it twice while processing a single message, because the data could change in the meantime. Instead, create a variable to hold the data from the store.

Store synchronization

Stores keep data in sync between service instances and between Store instances. This means a function can start processing a message once all stores are in sync (for example, have all data present until the message arrived). You can choose from two available synchronization modes:

  1. ALL - Every store in the application is up to date. Default mode.

  2. CHANNELS - Every store created from the same channel is up to date.

Use quasar.messaging.store.synchronization.mode=CHANNELS to change the sync mode.

Use connector

Stores do not always have to be backed by a messaging system. When you do not need data synchronization and persistence, you can exclude it by configuring:

quasar.messaging.store.[store-identifier].use-connector=false

[store-identifier] consists of [channel]_[store class simple name]-[store type class simple name].

Multiple instances

StreamX is designed to be scalable. You can use any number of processing function (service instances) required. The Quasar framework ensures that data in Stores are in sync between instances. The load is distributed among all instances using Shared subscriptions for incoming channels.

Message acknowledgment

When you work with GenericPayload or payloads, framework acknowledges the messages for you without any interaction. By default, messages are acked after processing is finished and nacked when exception is thrown from the function. However, when you use the Message<> object, you have to ack or nack the messages manually. If message is not acked in given timeout, it is redelivered and processed again. See SmallRye Message Acknowledgment for more details.

Failure handling

There are several cases where things can go wrong. Few examples: function throws exception, message can not be processed and is nacked manually or message could not be delivered within timeout. Message is redelivered configurable number of times and if it reaches the limit, it is sent to dead letter topic. You can use this topic to take some action on failed messages. The Dead-letter-topic is created in the same namespace where incoming channel’s topic resides. The name of Dead-letter-topic is [channel’s topic name]-DLQ.

You can change this behavior by setting a different failure strategy.