Create your own Processing Service

This guide shows you how to create your own Processing Service from scratch.

Prerequisites

To complete this guide, you will need:

  • Roughly 60 minutes

  • OpenJDK 17+ installed with JAVA_HOME configured appropriately

  • An IDE of your choice

  • Apache Maven 3.9.6+

  • Docker Engine installed and running

  • A Docker registry of your choice, with read/write access

For a deeper understanding of the concepts behind a Processing Service, refer to the Processing Service reference, either before you start or as you work through the tutorial.

Step 1: Create a Processing Service project from template

Processing Services are developed as Java projects that use the Quarkus framework.

To create a template for your service, run the following Maven command:

mvn io.quarkus.platform:quarkus-maven-plugin:3.15.1:create \
  -DprojectGroupId=dev.streamx.docs \
  -DprojectArtifactId=sample-processing-service

The latest guide for creating a Quarkus application can be found at Creating Your First Application.

Step 2: Configure the generated project template

To make your project a Processing Service, include the StreamX Processing Service Extension by adding the following entry to your pom.xml file. Define the streamx.version property in your pom.xml and set it to the newest StreamX release version.

  <dependency>
    <groupId>dev.streamx</groupId>
    <artifactId>streamx-processing-service-deployment</artifactId>
    <version>${streamx.version}</version>
  </dependency>

Add StreamX releases repository

The StreamX Processing Service Extension is provided as a Maven artifact hosted in the StreamX Public Releases repository. Add the following streamx-maven-public-releases repository definition to your pom.xml:

  <repositories>
    <repository>
      <id>streamx-maven-public-releases</id>
      <url>https://europe-west1-maven.pkg.dev/streamx-releases/streamx-maven-public-releases</url>
    </repository>
  </repositories>

Step 3: Implement sample Processing Service that produces capitalized Strings

The Java code for a Processing Service is where the business logic should be implemented according to the user’s requirements.

In this chapter, you will implement a basic Processing Service that receives string messages and produces outgoing messages with the strings in capitalized form.

Data model

To begin, create the data model class. It is a simple wrapper for a Java String object. Model classes must use the Avro format. For additional details, refer to the Channel Schema reference.

package dev.streamx.docs.sampleprocessingservice;

import org.apache.avro.specific.AvroGenerated;

@AvroGenerated
public class StringWrapper {

  private String value;

  public StringWrapper() {
    // needed for Avro serialization
  }

  public StringWrapper(String value) {
    this.value = value;
  }

  public String getValue() {
    return value;
  }
}

Service implementation

Here is the source code of the service:

package dev.streamx.docs.sampleprocessingservice;

import dev.streamx.quasar.reactive.messaging.metadata.Action;
import io.smallrye.reactive.messaging.GenericPayload;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class StringCapitalizationService {

  (1)
  @Incoming("incoming-strings")
  @Outgoing("capitalized-strings")
  public GenericPayload<StringWrapper> processRequest(StringWrapper incomingString, Action action) {
    (2)
    if (Action.PUBLISH.equals(action)) { (3)
      StringWrapper capitalized = new StringWrapper(capitalize(incomingString.getValue()));
      return GenericPayload.of(capitalized);
    } else if (Action.UNPUBLISH.equals(action)) { (4)
      return GenericPayload.of(null);
    } else { (5)
      return null;
    }
  }

  private static String capitalize(String inputString) {
    if (inputString == null || inputString.isEmpty()) {
      return inputString;
    }
    char firstLetter = Character.toUpperCase(inputString.charAt(0));
    return firstLetter + inputString.substring(1);
  }
}
1 The core of the service is the processRequest method. By annotating the method with @Incoming and @Outgoing annotations, you specify the channels it reads from and writes to.
2 Each incoming message is associated with one of two actions: PUBLISH or UNPUBLISH. The service processes the incoming message and returns a GenericPayload object, with metadata copied from the incoming message.
3 For the PUBLISH action, the service returns a payload with the string capitalized to the outgoing channel.
4 For the UNPUBLISH action, the service returns a GenericPayload with a null payload, because the unpublish request is simply forwarded to another service for further processing.
5 There are only two predefined actions, so if a message with any other action is received, the method returns null to consume the message without further processing.
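
The three return paths are easy to confuse, so here is a minimal plain-Java sketch of the same branching, written without the StreamX or SmallRye types so that it can be run on its own. SketchAction, SketchResult, and CapitalizeSketch are hypothetical names introduced only for illustration. The sketch assumes that a result with a null payload stands for a forwarded unpublish request and that a null result stands for a message consumed without output, mirroring the callouts above.

```java
// Plain-Java model of the branching in processRequest; no StreamX types are used.
// SketchAction, SketchResult and CapitalizeSketch are hypothetical names.
enum SketchAction { PUBLISH, UNPUBLISH, OTHER }

final class SketchResult {
  final String payload; // a null payload models a forwarded unpublish request

  SketchResult(String payload) {
    this.payload = payload;
  }
}

final class CapitalizeSketch {

  // Returns a result to emit, or null to consume the message without emitting anything.
  static SketchResult process(String incoming, SketchAction action) {
    if (action == SketchAction.PUBLISH) {
      return new SketchResult(capitalize(incoming));
    } else if (action == SketchAction.UNPUBLISH) {
      return new SketchResult(null);
    }
    return null;
  }

  // Upper-cases only the first character, like the capitalize helper in the service.
  static String capitalize(String input) {
    if (input == null || input.isEmpty()) {
      return input;
    }
    return Character.toUpperCase(input.charAt(0)) + input.substring(1);
  }
}
```

The point to take away is the difference between the last two branches: an unpublish still produces an outgoing message (with no payload), while an unknown action produces no outgoing message at all.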

The next service in the StreamX Mesh pipeline is responsible for handling the outgoing published data.

There are two possibilities:

  • the next service is another Processing Service that further processes the data

  • the next service is a Delivery Service, which might for example persist or remove the data from a database

Step 4: Unit testing the string capitalization service

This chapter outlines the recommended approach for unit testing a Processing Service. You will write unit tests for the string capitalization service.

Maven dependencies

Add the following test dependencies to your pom.xml file:

    <dependency>
      <groupId>io.smallrye.reactive</groupId>
      <artifactId>smallrye-reactive-messaging-in-memory</artifactId> (1)
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.awaitility</groupId>
      <artifactId>awaitility</artifactId> (2)
      <version>4.2.2</version>
      <scope>test</scope>
    </dependency>
1 The smallrye-reactive-messaging-in-memory dependency provides an in-memory connector for testing channel communication.
2 Because the communication is asynchronous, you must periodically poll the destination channel, using a library of your choice. This example uses org.awaitility for this purpose.
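
To make the polling idea concrete, the following plain-Java sketch shows roughly what a library such as Awaitility does for you: it re-checks a condition at a fixed interval until the condition holds or a timeout expires. PollingSketch and waitUntil are hypothetical names, and the 10 ms interval is an arbitrary choice. In the tests in this guide you would use Awaitility's await() rather than a hand-written loop.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hand-rolled polling helper, for illustration only (hypothetical names).
final class PollingSketch {

  // Re-checks the condition every 10 ms until it is true or the timeout expires.
  // Returns true if the condition became true in time, false otherwise.
  static boolean waitUntil(BooleanSupplier condition, Duration timeout) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt flag
        return false;
      }
    }
  }
}
```

A test would call it along the lines of waitUntil(() -> sink.size() == 1, Duration.ofSeconds(1)), where sink is whatever collection receives the outgoing messages.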

Properties file

Create an application.properties file in the src/test/resources directory of your project, with the following content:

mp.messaging.incoming.incoming-strings.connector=smallrye-in-memory (1)
mp.messaging.outgoing.capitalized-strings.connector=smallrye-in-memory (1)

quarkus.pulsar.devservices.enabled=false (2)
1 Configure incoming and outgoing channels to operate in the in-memory mode
2 Prevent the Pulsar container from starting during tests, since the tests use in-memory channels

Unit test class

package dev.streamx.docs.sampleprocessingservice;

import static dev.streamx.quasar.reactive.messaging.metadata.Action.PUBLISH;
import static dev.streamx.quasar.reactive.messaging.metadata.Action.UNPUBLISH;
import static dev.streamx.quasar.reactive.messaging.utils.MetadataUtils.extractAction;
import static dev.streamx.quasar.reactive.messaging.utils.MetadataUtils.extractKey;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import dev.streamx.quasar.reactive.messaging.metadata.Action;
import dev.streamx.quasar.reactive.messaging.metadata.EventTime;
import dev.streamx.quasar.reactive.messaging.metadata.Key;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import java.time.Duration;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@QuarkusTest
class StringCapitalizationServiceTest {

  private InMemorySource<Message<StringWrapper>> stringsSource;
  private InMemorySink<StringWrapper> capitalizedStringsSink;

  @Inject
  @Any
  InMemoryConnector connector;

  (1)
  @BeforeEach
  void init() {
    stringsSource = connector.source("incoming-strings");
    capitalizedStringsSink = connector.sink("capitalized-strings");
    capitalizedStringsSink.clear();
  }

  (2)
  @Test
  void shouldPublishCapitalizedStringToOutgoingChannel() {
    // given
    String key = "key-1";
    String value = "hello world";

    // when
    publishString(key, value);

    // then
    assertOutgoingPublishMessage(key, "Hello world");
  }

  (3)
  @Test
  void shouldRelayUnpublishRequestToOutgoingChannel() {
    // given
    String key = "key-2";

    // when
    unpublishString(key);

    // then
    assertOutgoingUnpublishMessage(key);
  }

  private void publishString(String key, String value) {
    StringWrapper wrappedString = new StringWrapper(value);
    stringsSource.send(Message.of(wrappedString, createMetadata(key, PUBLISH)));
  }

  private void unpublishString(String key) {
    stringsSource.send(Message.of(null, createMetadata(key, UNPUBLISH)));
  }

  private static Metadata createMetadata(String key, Action action) {
    return Metadata.of(
        Key.of(key),
        EventTime.of(System.currentTimeMillis()),
        action
    );
  }

  private void assertOutgoingPublishMessage(String key, String expectedValue) {
    Message<StringWrapper> resultMessage = waitForMessage();
    assertEquals(key, extractKey(resultMessage));
    assertEquals(PUBLISH, extractAction(resultMessage));
    assertEquals(expectedValue, resultMessage.getPayload().getValue());
  }

  private void assertOutgoingUnpublishMessage(String key) {
    Message<StringWrapper> resultMessage = waitForMessage();
    assertEquals(key, extractKey(resultMessage));
    assertEquals(UNPUBLISH, extractAction(resultMessage));
    assertNull(resultMessage.getPayload());
  }

  private Message<StringWrapper> waitForMessage() {
    await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
        assertEquals(1, capitalizedStringsSink.received().size())
    );

    return capitalizedStringsSink.received().get(0);
  }
}
1 The init() method connects the service’s incoming channel to an in-memory source and links the outgoing channel to an in-memory sink.
2 This test demonstrates the standard processing path, where the service generates an outgoing message containing the incoming string in capitalized form.
3 When the service receives an unpublish request, the request is relayed to the outgoing channel, allowing another service in the pipeline to handle it, for example by removing it from its data store.

Step 5: More advanced features of Processing Services - weather conditions service

In the previous chapters, you have explored a simple example of a service that generates capitalized strings. Now, you will delve into more advanced features that can enhance and enrich your Processing Service.

To achieve this, you will implement a second example of a Processing Service that delivers basic weather information for a specified city. This service internally collects and integrates data from temperature and wind sources to produce a short textual weather conditions summary.

Create data model

To begin, create the data model classes in Avro format. For additional details, refer to the Channel Schema reference.

The data model classes for our sample service are simple wrappers for temperature and wind speed, but can be extended with any number of additional fields as the requirements grow.

Here are the model classes:

package dev.streamx.docs.sampleprocessingservice;

import org.apache.avro.specific.AvroGenerated;

@AvroGenerated
public class Temperature {

  private int temperature;

  public Temperature() {
    // needed for Avro serialization
  }

  public Temperature(int temperature) {
    this.temperature = temperature;
  }

  public int getTemperature() {
    return temperature;
  }
}

package dev.streamx.docs.sampleprocessingservice;

import org.apache.avro.specific.AvroGenerated;

@AvroGenerated
public class Wind {

  private int speed;

  public Wind() {
    // needed for Avro serialization
  }

  public Wind(int speed) {
    this.speed = speed;
  }

  public int getSpeed() {
    return speed;
  }
}

package dev.streamx.docs.sampleprocessingservice;

import org.apache.avro.specific.AvroGenerated;

@AvroGenerated
public class WeatherConditions {

  private String weatherConditions;

  public WeatherConditions() {
    // needed for Avro serialization
  }

  public WeatherConditions(String weatherConditions) {
    this.weatherConditions = weatherConditions;
  }

  public String getWeatherConditions() {
    return weatherConditions;
  }
}

Properties file

Create the src/main/resources/application.properties file, or edit it if the generated project already contains one, and add the following content:

mp.messaging.outgoing.weather-conditions.merge=true

This configuration is necessary because the service you’re implementing will compute or recompute weather conditions whenever any of the data sources (winds, temperatures) sends new data. Therefore, both @Incoming methods might trigger publishing messages to the same outgoing channel.

Service implementation

Here is the complete code of the weather conditions service:

package dev.streamx.docs.sampleprocessingservice;

import dev.streamx.quasar.reactive.messaging.Store;
import dev.streamx.quasar.reactive.messaging.annotations.FromChannel;
import dev.streamx.quasar.reactive.messaging.metadata.Action;
import dev.streamx.quasar.reactive.messaging.metadata.Key;
import dev.streamx.quasar.reactive.messaging.utils.MetadataUtils;
import io.smallrye.reactive.messaging.GenericPayload;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class WeatherConditionsProcessingService {

  (1)
  @FromChannel("temperatures")
  Store<Temperature> temperaturesPerCityStore;

  (1)
  @FromChannel("winds")
  Store<Wind> windsPerCityStore;

  @Incoming("temperatures")
  @Outgoing("weather-conditions")
  public GenericPayload<WeatherConditions> consumeTemperature(Temperature temperature, Key key,
      Action action) {
    (2)
    if (action.equals(Action.PUBLISH)) {
      Wind windForCity = getPublishedPayload(windsPerCityStore, key);
      if (windForCity != null) {
        return computeWeatherConditions(temperature, windForCity);
      }
    }
    return null;
  }

  @Incoming("winds")
  @Outgoing("weather-conditions")
  public GenericPayload<WeatherConditions> consumeWind(Wind wind, Key key, Action action) {
    (3)
    if (action.equals(Action.PUBLISH)) {
      Temperature temperatureForCity = getPublishedPayload(temperaturesPerCityStore, key);
      if (temperatureForCity != null) {
        return computeWeatherConditions(temperatureForCity, wind);
      }
    }
    return null;
  }

  private static GenericPayload<WeatherConditions> computeWeatherConditions(Temperature temperature,
      Wind wind) {
    String weatherConditionsString = "Temperature: %d, wind speed: %d".formatted(
        temperature.getTemperature(), wind.getSpeed()
    );
    WeatherConditions weatherConditions = new WeatherConditions(weatherConditionsString);
    return GenericPayload.of(weatherConditions);
  }

  (4)
  private <T> T getPublishedPayload(Store<T> store, Key key) {
    GenericPayload<T> storedPayload = store.getWithMetadata(key.getValue());
    if (storedPayload == null) {
      return null;
    }
    if (MetadataUtils.extractAction(storedPayload).equals(Action.UNPUBLISH)) {
      return null;
    }
    return storedPayload.getPayload();
  }
}
1 The service uses Stores to maintain the last received temperature and wind speed data for each city. You can find more detailed information about how Stores work in the Stores reference.
2 The consumeTemperature method reads temperature messages from the incoming channel. When a temperature is published, it checks for the most recent wind data. If wind data is found, the weather conditions for the specified city are computed and the result is sent to the outgoing channel. The message is sent with metadata copied from the input message, including the city key.
3 The consumeWind method functions similarly to consumeTemperature, computing the weather conditions for the requested city when all necessary data is available. Neither method produces results for unpublish messages. Both methods use an extended @Incoming method signature (see the Processing Service reference for available method signatures) to simplify access to key and action metadata fields.
4 The getPublishedPayload method retrieves saved data for a city from the provided Store. If the data hasn’t yet been published or has been unpublished, null is returned, to indicate that weather conditions cannot be computed in these cases.
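
The join logic described in the callouts can be modeled with two maps, one per source, keyed by city. The sketch below is a simplified, synchronous plain-Java model and not the StreamX Store API. WeatherJoinSketch, onTemperature, onWind, and onWindUnpublished are hypothetical names, and removing a map entry is only a stand-in for a stored unpublish action.

```java
import java.util.HashMap;
import java.util.Map;

// Synchronous model of the temperature/wind join; hypothetical names throughout.
final class WeatherJoinSketch {

  // Latest value per city; these maps stand in for the two Stores.
  private final Map<String, Integer> temperatures = new HashMap<>();
  private final Map<String, Integer> winds = new HashMap<>();

  // Records the temperature and returns a summary, or null if no wind is known yet.
  String onTemperature(String city, int temperature) {
    temperatures.put(city, temperature);
    return summarize(city);
  }

  // Records the wind speed and returns a summary, or null if no temperature is known yet.
  String onWind(String city, int speed) {
    winds.put(city, speed);
    return summarize(city);
  }

  // Models an unpublish: the wind for the city is no longer available.
  void onWindUnpublished(String city) {
    winds.remove(city);
  }

  private String summarize(String city) {
    Integer temperature = temperatures.get(city);
    Integer wind = winds.get(city);
    if (temperature == null || wind == null) {
      return null; // one side is missing, so nothing is emitted
    }
    return "Temperature: " + temperature + ", wind speed: " + wind;
  }
}
```

In this sequential model every new value for a city that already has data on the other side produces a fresh summary, so several summaries may be emitted for the same city over time. The real service is asynchronous, so the exact sequence of intermediate results may differ.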

Step 6: Unit testing the weather conditions service

This chapter applies the recommended approach for unit testing a Processing Service to the weather conditions service. It builds on the key concepts presented in Step 4: Unit testing the string capitalization service.

Properties file

Add the following entries to the src/test/resources/application.properties file:

mp.messaging.incoming.temperatures.connector=smallrye-in-memory (1)
mp.messaging.incoming.winds.connector=smallrye-in-memory (1)
mp.messaging.outgoing.weather-conditions.connector=smallrye-in-memory (1)

quasar.messaging.store.temperatures_store-temperature.use-connector=false (2)
quasar.messaging.store.winds_store-wind.use-connector=false (2)
1 Configure incoming and outgoing channels to operate in the in-memory mode
2 Disable the use of a connector for the Stores, as described in the Connector reference

Unit test class

Here is the unit test class for the weather conditions service:

package dev.streamx.docs.sampleprocessingservice;

import static dev.streamx.quasar.reactive.messaging.metadata.Action.PUBLISH;
import static dev.streamx.quasar.reactive.messaging.utils.MetadataUtils.extractAction;
import static dev.streamx.quasar.reactive.messaging.utils.MetadataUtils.extractKey;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

import dev.streamx.quasar.reactive.messaging.metadata.Action;
import dev.streamx.quasar.reactive.messaging.metadata.EventTime;
import dev.streamx.quasar.reactive.messaging.metadata.Key;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.Set;
import java.util.stream.Collectors;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@QuarkusTest
class WeatherConditionsProcessingServiceTest {

  private InMemorySource<Message<Temperature>> temperaturesSource;
  private InMemorySource<Message<Wind>> windsSource;

  private InMemorySink<WeatherConditions> weatherConditionsSink;

  @Inject
  @Any
  InMemoryConnector connector;

  (1)
  @BeforeEach
  void init() {
    temperaturesSource = connector.source("temperatures");
    windsSource = connector.source("winds");

    weatherConditionsSink = connector.sink("weather-conditions");
    weatherConditionsSink.clear();
  }

  (2)
  @Test
  void shouldComputeWeatherConditions_AndPublishResultToOutgoingChannel() {
    // given
    String city = "New York";

    // when
    publishTemperature(city, 30);
    publishWind(city, 5);

    // then
    assertOutgoingPublishMessage(city, "Temperature: 30, wind speed: 5");
  }

  (3)
  @Test
  void shouldUseMostRecentDataToComputeWeatherConditions_AndPublishResultToOutgoingChannel() {
    // given
    String city = "Tokyo";

    // when
    publishTemperature(city, 30);
    publishTemperature(city, 20);
    publishTemperature(city, 10);

    publishWind(city, 5);
    publishWind(city, 15);
    publishWind(city, 25);

    // then
    assertOutgoingPublishMessage(city, "Temperature: 10, wind speed: 25");
  }

  (4)
  @Test
  void shouldNotProduceOutgoingMessage_WhenMissingSourceData() {
    // given
    String city = "Paris";

    // when: temperature is published, but wind is not published
    publishTemperature(city, 5);

    // then
    assertNoOutgoingMessages();
  }

  private void publishTemperature(String city, int temperature) {
    Message<Temperature> message = createPublishMessage(city, new Temperature(temperature));
    temperaturesSource.send(message);
  }

  private void publishWind(String city, int speed) {
    Message<Wind> message = createPublishMessage(city, new Wind(speed));
    windsSource.send(message);
  }

  private static <T> Message<T> createPublishMessage(String city, T payload) {
    Metadata metadata = Metadata.of(
        Key.of(city),
        EventTime.of(System.currentTimeMillis()),
        Action.PUBLISH
    );
    return Message.of(payload, metadata);
  }

  private void assertOutgoingPublishMessage(String expectedCity, String expectedWeatherConditions) {
    await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> {
      Set<Message<WeatherConditions>> matchingMessages = weatherConditionsSink.received()
          .stream()
          .filter(message ->
              extractKey(message).equals(expectedCity)
              && extractAction(message).equals(PUBLISH)
              && message.getPayload().getWeatherConditions().equals(expectedWeatherConditions))
          .collect(Collectors.toSet());
      assertFalse(matchingMessages.isEmpty());
    });
  }

  private void assertNoOutgoingMessages() {
    await().during(Duration.ofSeconds(1)).untilAsserted(() ->
        assertEquals(0, weatherConditionsSink.received().size())
    );
  }
}
1 The init() method connects each of the service’s incoming channels to an in-memory source and links the outgoing channel to an in-memory sink
2 The test demonstrates the standard processing path, where the service generates an outgoing message containing the computed weather conditions for the requested city by using previously collected component data
3 The test highlights one of the fundamental features of Stores: they prevent the processing of outdated data by retaining only the most recent version for each key
4 If only temperature data is available for the requested city, the response message cannot be created

Step 7: Message converters

In the previous sections, we discussed the use of Stores. Each Store is populated with messages from the topics associated with its channel. If direct access to payloads from a Store is not required and you only need the Store to filter out outdated messages, you can optimize it to store less data.

Stores make use of Message Converters to determine what part of the incoming message should be saved instead of saving the whole original message. Metadata is always stored for every message.

A common use case is to store only the most recent action for each message. This approach can simplify the code needed to verify whether data from all component sources is available (and not unpublished) for generating a summary result.
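
As a rough illustration of that use case, the plain-Java sketch below keeps only the last action name per key for each source and checks whether every source currently holds a published entry for a given key. It does not use the StreamX Store or MessageConverter types. ActionOnlyStoreSketch, remember, lastAction, and allPublished are hypothetical names, and the actions are modeled as plain strings.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Models a store that keeps only the most recent action per key (hypothetical names).
final class ActionOnlyStoreSketch {

  private final Map<String, String> lastActionByKey = new HashMap<>();

  // Remembers only the action of the latest message for the key; the payload is discarded.
  void remember(String key, String action) {
    lastActionByKey.put(key, action);
  }

  // Returns the last recorded action for the key, or null if none was seen.
  String lastAction(String key) {
    return lastActionByKey.get(key);
  }

  // True only if every store has seen the key and its latest action is PUBLISH.
  static boolean allPublished(String key, List<ActionOnlyStoreSketch> stores) {
    for (ActionOnlyStoreSketch store : stores) {
      if (!"PUBLISH".equals(store.lastAction(key))) {
        return false;
      }
    }
    return true;
  }
}
```

Because each entry is a short string instead of a full payload, the availability check reduces to one comparison per source.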

Below are the additional code snippets needed to implement this functionality. Add them to the existing weather conditions service codebase, created earlier in this guide.

Add custom action-based Store

Before you implement a message converter, let’s first declare a custom Store that will make use of the message converter.

Open the WeatherConditionsProcessingService source file and declare the Store (along with a getter required for unit testing):

  @Inject
  @FromChannel("winds")
  Store<String> windActionsPerCityStore;

  public Store<String> getWindActionsPerCityStore() {
    return windActionsPerCityStore;
  }

Message converter

Create the Java class below that can produce modified copies of messages based on the value of the Action input metadata field. This message converter will be used to add the modified messages to the Store of the corresponding type.

package dev.streamx.docs.sampleprocessingservice;

import dev.streamx.quasar.reactive.messaging.metadata.Action;
import dev.streamx.quasar.reactive.messaging.utils.MetadataUtils;
import io.smallrye.reactive.messaging.MessageConverter;
import jakarta.enterprise.context.ApplicationScoped;
import java.lang.reflect.Type;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class MessageToActionConverter implements MessageConverter {

  @Override
  public boolean canConvert(Message<?> message, Type type) {
    return type == String.class && message.getMetadata(Action.class).isPresent();
  }

  @Override
  public Message<?> convert(Message<?> message, Type type) {
    Action action = MetadataUtils.extractAction(message);
    return message.withPayload(action.getValue());
  }
}

Test properties file

In the existing src/test/resources/application.properties file, add the following configuration line to turn off connector usage for the newly added Store, as described in the Connector reference:

quasar.messaging.store.winds_store-string.use-connector=false

Unit test class

Now, let’s write a unit test that verifies behavior of the custom Store. In the WeatherConditionsProcessingServiceTest, add the following code blocks:

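import static dev.streamx.quasar.reactive.messaging.metadata.Action.UNPUBLISH;

import dev.streamx.quasar.reactive.messaging.Store;

  @Inject
  WeatherConditionsProcessingService weatherConditionsProcessingService;

  @Test
  void shouldStoreLastActionForEachWind() {
    // given
    publishWind("Madrid", 15);
    publishWind("Barcelona", 20);
    unpublishWind("Madrid");

    // when
    Store<String> store = weatherConditionsProcessingService.getWindActionsPerCityStore();

    // then
    assertEquals(UNPUBLISH.getValue(), store.get("Madrid"));
    assertEquals(PUBLISH.getValue(), store.get("Barcelona"));
  }

  // Sends an unpublish request (null payload) for the given city to the winds channel
  private void unpublishWind(String city) {
    Metadata metadata = Metadata.of(
        Key.of(city),
        EventTime.of(System.currentTimeMillis()),
        Action.UNPUBLISH
    );
    windsSource.send(Message.of(null, metadata));
  }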

The test ensures that the windActionsPerCityStore contains only the last action of wind messages for each city.

Step 8: Build a Docker Image for your Processing Service

So far, you have only run the implemented services in unit tests. The next step is to integrate them into a StreamX Mesh, where they will operate in production mode.

Processing Services are packaged and deployed in the form of Docker images. To enable building a Docker image for your service during the project build, open the pom.xml file of the sample-processing-service project. Locate the quarkus-maven-plugin definition, and add the following execution goal to the existing list under executions/execution/goals:

<goal>image-build</goal>

Rebuild the project:

mvn clean install

Once the build is complete, verify that an image with sample-processing-service in its name exists in your local Docker instance. Then, push the image to your Docker registry, making it available for inclusion in a StreamX Mesh YAML file.

Summary

Congratulations on successfully creating your first Processing Service! You are now equipped to develop your own production-ready services.

Next steps

Now that you have a Docker image with your Processing Service, it’s ready to be integrated into the StreamX Mesh, where services operate in production mode.

To run a StreamX Mesh, in addition to a Processing Service, at least one Delivery Service is required. This is because a Processing Service sends processed data to a Pulsar topic, and it’s the Delivery Service that handles delivering the data to the final destinations.

Once you’ve completed both the Processing and Delivery Service tutorials, you are ready to create your first StreamX Mesh!