Create your own Processing Service
This guide shows you how to create your own Processing Service from scratch.
Prerequisites
To complete this guide, you will need:
-
Roughly 60 minutes
-
OpenJDK 17+ installed with JAVA_HOME configured appropriately
-
An IDE of your choice
-
Apache Maven 3.9.6+
-
A Docker registry of your choice, with read/write access
For a deeper understanding of the concepts behind a Processing Service, refer to this reference either before starting or while progressing through the tutorial.
Step 1: Create a Processing Service project from template
Processing services are developed in the form of Java projects utilizing the Quarkus framework.
To create a template for your service, run the following maven command:
mvn io.quarkus.platform:quarkus-maven-plugin:3.15.1:create \
-DprojectGroupId=dev.streamx.docs \
-DprojectArtifactId=sample-processing-service
The latest guide for creating a Quarkus application can be found at Creating Your First Application.
Step 2: Configure the generated project template
To make your project a Processing Service, include the StreamX Processing Service Extension by adding the following entry to your pom.xml
file. Use the newest release version of streamx.
<dependency>
<groupId>dev.streamx</groupId>
<artifactId>streamx-processing-service</artifactId>
<version>${streamx.version}</version>
</dependency>
Add StreamX releases repository
The Streamx Processing Service Extension is provided in the form of a maven artifact hosted in StreamX Public Releases repository. Add the below streamx-maven-public-releases
repository definition to your pom.xml
:
<repositories>
<repository>
<id>streamx-maven-public-releases</id>
<url>https://europe-west1-maven.pkg.dev/streamx-releases/streamx-maven-public-releases</url>
</repository>
</repositories>
Step 3: Implement sample Processing Service that produces capitalized Strings
The Java code for a Processing Service is where the business logic should be implemented according to the user’s requirements.
In this chapter, you will implement a basic Processing Service that receives string messages and produces outgoing messages with the strings in capitalized form.
Data model
To begin, create the data model class. It is a simple wrapper for java String object. The format that must be used for model classes is AVRO. For additional details, refer to the Channel Schema reference.
package dev.streamx.docs.sampleprocessingservice;
import org.apache.avro.specific.AvroGenerated;
@AvroGenerated
public class StringWrapper {
private String value;
public StringWrapper() {
// needed for Avro serialization
}
public StringWrapper(String value) {
this.value = value;
}
public String getValue() {
return value;
}
}
Service implementation
Here is the source code of the service:
package dev.streamx.docs.sampleprocessingservice;
import dev.streamx.quasar.reactive.messaging.metadata.Action;
import io.smallrye.reactive.messaging.GenericPayload;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class StringCapitalizationService {
(1)
@Incoming("incoming-strings")
@Outgoing("capitalized-strings")
public GenericPayload<StringWrapper> processRequest(StringWrapper incomingString, Action action) {
(2)
if (Action.PUBLISH.equals(action)) { (3)
StringWrapper capitalized = new StringWrapper(capitalize(incomingString.getValue()));
return GenericPayload.of(capitalized);
} else if (Action.UNPUBLISH.equals(action)) { (4)
return GenericPayload.of(null);
} else { (5)
return null;
}
}
private static String capitalize(String inputString) {
if (inputString == null || inputString.isEmpty()) {
return inputString;
}
char firstLetter = Character.toUpperCase(inputString.charAt(0));
return firstLetter + inputString.substring(1);
}
}
1 | The core of the service is the processRequest method. By annotating the method with @Incoming and @Outgoing annotations, you specify the channels it reads from and writes to. |
2 | Each incoming message is associated with one of two actions: PUBLISH or UNPUBLISH . The service processes the incoming message and returns a GenericPayload object, with metadata copied from the incoming message. |
3 | For the PUBLISH action, the service returns a payload with the string capitalized to the outgoing channel. |
4 | For the UNPUBLISH action, no payload is returned, because the unpublish request is simply forwarded for further processing to another service. |
5 | There are only two predefined actions, so if a message with any other action is received, the method returns null to consume the message without further processing. |
The next service in the StreamX Mesh pipeline is responsible for handling the outgoing published data.
There are two possibilities:
-
the next service is another Processing Service that further processes the data
-
the next service is a Delivery Service, which might for example persist or remove the data from a database
Step 4: Unit testing the string capitalization service
This chapter outlines the recommended approach for unit testing a Processing Service. You will write unit tests for the string capitalization service.
Maven dependencies
Add the following test dependencies to your pom.xml
file:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-in-memory</artifactId> (1)
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId> (2)
<version>4.2.2</version>
<scope>test</scope>
</dependency>
1 | The smallrye-reactive-messaging-in-memory dependency provides support for testing channels communication with an in-memory connector. |
2 | Because the communication is asynchronous, you must implement cyclic pooling of the destination channel with a library of your choice. In this example, we use org.awaitility for this purpose. |
Properties file
Create application.properties
file in src/test/resources
directory of your project, with the following content:
mp.messaging.incoming.incoming-strings.connector=smallrye-in-memory (1)
mp.messaging.outgoing.capitalized-strings.connector=smallrye-in-memory (1)
quarkus.pulsar.devservices.enabled=false (2)
1 | Configure incoming and outgoing channels to operate in the in-memory mode |
2 | Prevent the Pulsar container from starting during tests, since the tests use in-memory channels |
Unit test class
package dev.streamx.docs.sampleprocessingservice;
import static dev.streamx.quasar.reactive.messaging.metadata.Action.PUBLISH;
import static dev.streamx.quasar.reactive.messaging.metadata.Action.UNPUBLISH;
import static dev.streamx.quasar.reactive.messaging.utils.MetadataUtils.extractAction;
import static dev.streamx.quasar.reactive.messaging.utils.MetadataUtils.extractKey;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import dev.streamx.quasar.reactive.messaging.metadata.Action;
import dev.streamx.quasar.reactive.messaging.metadata.EventTime;
import dev.streamx.quasar.reactive.messaging.metadata.Key;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import java.time.Duration;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@QuarkusTest
class StringCapitalizationServiceTest {
private InMemorySource<Message<StringWrapper>> stringsSource;
private InMemorySink<StringWrapper> capitalizedStringsSink;
@Inject
@Any
InMemoryConnector connector;
(1)
@BeforeEach
void init() {
stringsSource = connector.source("incoming-strings");
capitalizedStringsSink = connector.sink("capitalized-strings");
capitalizedStringsSink.clear();
}
(2)
@Test
void shouldPublishCapitalizedStringToOutgoingChannel() {
// given
String key = "key-1";
String value = "hello world";
// when
publishString(key, value);
// then
assertOutgoingPublishMessage(key, "Hello world");
}
(3)
@Test
void shouldRelayUnpublishRequestToOutgoingChannel() {
// given
String key = "key-2";
// when
unpublishString(key);
// then
assertOutgoingUnpublishMessage(key);
}
private void publishString(String key, String value) {
StringWrapper wrappedString = new StringWrapper(value);
stringsSource.send(Message.of(wrappedString, createMetadata(key, PUBLISH)));
}
private void unpublishString(String key) {
stringsSource.send(Message.of(null, createMetadata(key, UNPUBLISH)));
}
private static Metadata createMetadata(String key, Action action) {
return Metadata.of(
Key.of(key),
EventTime.of(System.currentTimeMillis()),
action
);
}
private void assertOutgoingPublishMessage(String key, String expectedValue) {
Message<StringWrapper> resultMessage = waitForMessage();
assertEquals(key, extractKey(resultMessage));
assertEquals(PUBLISH, extractAction(resultMessage));
assertEquals(expectedValue, resultMessage.getPayload().getValue());
}
private void assertOutgoingUnpublishMessage(String key) {
Message<StringWrapper> resultMessage = waitForMessage();
assertEquals(key, extractKey(resultMessage));
assertEquals(UNPUBLISH, extractAction(resultMessage));
assertNull(resultMessage.getPayload());
}
private Message<StringWrapper> waitForMessage() {
await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
assertEquals(1, capitalizedStringsSink.received().size())
);
return capitalizedStringsSink.received().get(0);
}
}
1 | The init() method connects the service’s incoming channel to an in-memory source and links the outgoing channel to an in-memory sink |
2 | This test demonstrates the standard processing path, where the service generates an outgoing message containing the incoming string in capitalized version |
3 | When the service receives an unpublish request, the request is relayed to the outgoing channel, allowing another service in the pipeline to handle it, for example by removing it from its data store. |
Step 5: More advanced features of Processing Services - weather conditions service
In the previous chapters, you have explored a simple example of a service that generates capitalized strings. Now, you will delve into more advanced features that can enhance and enrich your Processing Service.
To achieve this, you will implement a second example of a Processing Service that delivers basic weather information for a specified city. This service internally collects and integrates data from temperature and wind sources to produce a short textual weather conditions summary.
Create data model
To begin, create the data model classes in AVRO format. For additional details, refer to the Channel Schema reference.
The data model classes for our sample service are simple wrappers for temperature and wind speed, but can be extended with any number of additional fields while the requirements grow.
Here are the model classes:
package dev.streamx.docs.sampleprocessingservice;
import org.apache.avro.specific.AvroGenerated;
@AvroGenerated
public class Temperature {
private int temperature;
public Temperature() {
// needed for Avro serialization
}
public Temperature(int temperature) {
this.temperature = temperature;
}
public int getTemperature() {
return temperature;
}
}
package dev.streamx.docs.sampleprocessingservice;
import org.apache.avro.specific.AvroGenerated;
@AvroGenerated
public class Wind {
private int speed;
public Wind() {
// needed for Avro serialization
}
public Wind(int speed) {
this.speed = speed;
}
public int getSpeed() {
return speed;
}
}
package dev.streamx.docs.sampleprocessingservice;
import org.apache.avro.specific.AvroGenerated;
@AvroGenerated
public class WeatherConditions {
private String weatherConditions;
public WeatherConditions() {
// needed for Avro serialization
}
public WeatherConditions(String weatherConditions) {
this.weatherConditions = weatherConditions;
}
public String getWeatherConditions() {
return weatherConditions;
}
}
Properties file
Create src/main/resources/application.properties
file with the following content:
mp.messaging.outgoing.weather-conditions.merge=true
This configuration is necessary because the service you’re implementing will compute or recompute weather conditions whenever any of the data sources (winds, temperatures) sends new data. Therefore, both @Incoming
methods might trigger publishing messages to the same outgoing channel.
Service implementation
Here is the complete code of the weather conditions service:
package dev.streamx.docs.sampleprocessingservice;
import dev.streamx.quasar.reactive.messaging.Store;
import dev.streamx.quasar.reactive.messaging.annotations.FromChannel;
import dev.streamx.quasar.reactive.messaging.metadata.Action;
import dev.streamx.quasar.reactive.messaging.metadata.Key;
import dev.streamx.quasar.reactive.messaging.utils.MetadataUtils;
import io.smallrye.reactive.messaging.GenericPayload;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class WeatherConditionsProcessingService {
(1)
@FromChannel("temperatures")
Store<Temperature> temperaturesPerCityStore;
(1)
@FromChannel("winds")
Store<Wind> windsPerCityStore;
@Incoming("temperatures")
@Outgoing("weather-conditions")
public GenericPayload<WeatherConditions> consumeTemperature(Temperature temperature, Key key,
Action action) {
(2)
if (action.equals(Action.PUBLISH)) {
Wind windForCity = getPublishedPayload(windsPerCityStore, key);
if (windForCity != null) {
return computeWeatherConditions(temperature, windForCity);
}
}
return null;
}
@Incoming("winds")
@Outgoing("weather-conditions")
public GenericPayload<WeatherConditions> consumeWind(Wind wind, Key key, Action action) {
(3)
if (action.equals(Action.PUBLISH)) {
Temperature temperatureForCity = getPublishedPayload(temperaturesPerCityStore, key);
if (temperatureForCity != null) {
return computeWeatherConditions(temperatureForCity, wind);
}
}
return null;
}
private static GenericPayload<WeatherConditions> computeWeatherConditions(Temperature temperature,
Wind wind) {
String weatherConditionsString = "Temperature: %d, wind speed: %d".formatted(
temperature.getTemperature(), wind.getSpeed()
);
WeatherConditions weatherConditions = new WeatherConditions(weatherConditionsString);
return GenericPayload.of(weatherConditions);
}
(4)
private <T> T getPublishedPayload(Store<T> store, Key key) {
GenericPayload<T> storedPayload = store.getWithMetadata(key.getValue());
if (storedPayload == null) {
return null;
}
if (MetadataUtils.extractAction(storedPayload).equals(Action.UNPUBLISH)) {
return null;
}
return storedPayload.getPayload();
}
}
1 | The Service uses Stores to maintain last received temperature and wind speed data for each city. You can find more detailed information about how Stores work in Stores reference. |
2 | The consumeTemperature method reads temperature messages from the incoming channel. When a temperature is published, it checks for the most recent wind data. If wind data is found, the weather conditions for the specified city are computed and the result is sent to the outgoing channel. The message is sent with metadata copied from the input message, including the city key. |
3 | The consumeWind method functions similarly to consumeTemperature , computing the weather conditions for the requested city when all necessary data is available. Neither method produces results for unpublish messages. Both methods use an extended @Incoming method signature (see the Processing Service reference for available method signatures) to simplify access to key and action metadata fields. |
4 | The getPublishedPayload method retrieves saved data for a city from the provided Store. If the data hasn’t yet been published or has been unpublished, null is returned, to indicate that weather conditions cannot be computed in these cases. |
Step 6: Unit testing the weather conditions service
This chapter outlines the recommended approach for unit testing a Processing Service. It shares the key concepts that we presented in Step 4: Unit testing the string capitalization service.
Properties file
Add the following entries to src/test/resources/application.properties
file:
mp.messaging.incoming.temperatures.connector=smallrye-in-memory (1)
mp.messaging.incoming.winds.connector=smallrye-in-memory (1)
mp.messaging.outgoing.weather-conditions.connector=smallrye-in-memory (1)
quasar.messaging.store.temperatures_store-temperature.use-connector=false (2)
quasar.messaging.store.winds_store-wind.use-connector=false (2)
1 | Configure incoming and outgoing channels to operate in the in-memory mode |
2 | Disable the use of a connector for the Stores, as described in the Connector reference |
Unit test class
Here is the unit test class for the weather conditions service:
package dev.streamx.docs.sampleprocessingservice;
import static dev.streamx.quasar.reactive.messaging.metadata.Action.PUBLISH;
import static dev.streamx.quasar.reactive.messaging.utils.MetadataUtils.extractAction;
import static dev.streamx.quasar.reactive.messaging.utils.MetadataUtils.extractKey;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import dev.streamx.quasar.reactive.messaging.metadata.Action;
import dev.streamx.quasar.reactive.messaging.metadata.EventTime;
import dev.streamx.quasar.reactive.messaging.metadata.Key;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.Set;
import java.util.stream.Collectors;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@QuarkusTest
class WeatherConditionsProcessingServiceTest {
private InMemorySource<Message<Temperature>> temperaturesSource;
private InMemorySource<Message<Wind>> windsSource;
private InMemorySink<WeatherConditions> weatherConditionsSink;
@Inject
@Any
InMemoryConnector connector;
(1)
@BeforeEach
void init() {
temperaturesSource = connector.source("temperatures");
windsSource = connector.source("winds");
weatherConditionsSink = connector.sink("weather-conditions");
weatherConditionsSink.clear();
}
(2)
@Test
void shouldComputeWeatherConditions_AndPublishResultToOutgoingChannel() {
// given
String city = "New York";
// when
publishTemperature(city, 30);
publishWind(city, 5);
// then
assertOutgoingPublishMessage(city, "Temperature: 30, wind speed: 5");
}
(3)
@Test
void shouldUseMostRecentDataToComputeWeatherConditions_AndPublishResultToOutgoingChannel() {
// given
String city = "Tokio";
// when
publishTemperature(city, 30);
publishTemperature(city, 20);
publishTemperature(city, 10);
publishWind(city, 5);
publishWind(city, 15);
publishWind(city, 25);
// then
assertOutgoingPublishMessage(city, "Temperature: 10, wind speed: 25");
}
(4)
@Test
void shouldNotProduceOutgoingMessage_WhenMissingSourceData() {
// given
String city = "Paris";
// when: temperature is published, but wind is not published
publishTemperature(city, 5);
// then
assertNoOutgoingMessages();
}
private void publishTemperature(String city, int temperature) {
Message<Temperature> message = createPublishMessage(city, new Temperature(temperature));
temperaturesSource.send(message);
}
private void publishWind(String city, int speed) {
Message<Wind> message = createPublishMessage(city, new Wind(speed));
windsSource.send(message);
}
private static <T> Message<T> createPublishMessage(String city, T payload) {
Metadata metadata = Metadata.of(
Key.of(city),
EventTime.of(System.currentTimeMillis()),
Action.PUBLISH
);
return Message.of(payload, metadata);
}
private void assertOutgoingPublishMessage(String expectedCity, String expectedWeatherConditions) {
await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> {
Set<Message<WeatherConditions>> matchingMessages = weatherConditionsSink.received()
.stream()
.filter(message ->
extractKey(message).equals(expectedCity)
&& extractAction(message).equals(PUBLISH)
&& message.getPayload().getWeatherConditions().equals(expectedWeatherConditions))
.collect(Collectors.toSet());
assertFalse(matchingMessages.isEmpty());
});
}
private void assertNoOutgoingMessages() {
await().during(Duration.ofSeconds(1)).untilAsserted(() ->
assertEquals(0, weatherConditionsSink.received().size())
);
}
}
1 | The init() method connects each of the service’s incoming channels to an in-memory source and links the outgoing channel to an in-memory sink |
2 | The test demonstrates the standard processing path, where the service generates an outgoing message containing the computed weather conditions for the requested city by using previously collected component data |
3 | The test highlights one of the fundamental features of Stores: they prevent the processing of outdated data by retaining only the most recent version for each key |
4 | If only temperature data is available for the requested city, the response message cannot be created |
Step 7: Message converters
In the previous sections, we discussed the use of Stores. Each Store is populated with messages from the topics associated with its channel. If direct access to payloads from a Store is not required and you only need the Store to filter out outdated messages, you can optimize it to store less data.
Stores make use of Message Converters to determine what part of the incoming message should be saved instead of saving the whole original message. Metadata is always stored for every message.
A common use case is to store only the most recent action for each message. This approach can simplify the code needed to verify whether data from all component sources is available (and not unpublished) for generating a summary result.
Below are the additional code snippets needed to implement this functionality. Add them to the existing weather conditions service codebase, created earlier in this guide.
Add custom action-based Store
Before you implement a message converter, let’s first declare a custom Store that will make use of the message converter.
Open the WeatherConditionsProcessingService
source file and declare the Store (along with a getter required for unit testing):
@Inject
@FromChannel(WINDS_INCOMING_CHANNEL)
Store<String> windActionsPerCityStore;
public Store<String> getWindActionsPerCityStore() {
return windActionsPerCityStore;
}
Message converter
Create the Java class below that can produce modified copies of messages based on the value of the Action
input metadata field. This message converter will be used to add the modified messages to the Store of the corresponding type.
package dev.streamx.docs.sampleprocessingservice;
import dev.streamx.quasar.reactive.messaging.metadata.Action;
import dev.streamx.quasar.reactive.messaging.utils.MetadataUtils;
import io.smallrye.reactive.messaging.MessageConverter;
import jakarta.enterprise.context.ApplicationScoped;
import java.lang.reflect.Type;
import org.eclipse.microprofile.reactive.messaging.Message;
@ApplicationScoped
public class MessageToActionConverter implements MessageConverter {
@Override
public boolean canConvert(Message<?> message, Type type) {
return type == String.class && message.getMetadata(Action.class).isPresent();
}
@Override
public Message<?> convert(Message<?> message, Type type) {
Action action = MetadataUtils.extractAction(message);
return message.withPayload(action.getValue());
}
}
Test properties file
In the existing src/test/resources/application.properties
file, add the following configuration line to turn off connector usage for the newly added store, as described in the Connector reference
quasar.messaging.store.winds_store-string.use-connector=false
Unit test class
Now, let’s write a unit test that verifies behavior of the custom Store. In the WeatherConditionsProcessingServiceTest
, add the following code blocks:
import dev.streamx.quasar.reactive.messaging.Store;
@Inject
WeatherConditionsProcessingService weatherConditionsProcessingService;
@Test
void shouldStoreLastActionForEachWind() {
// given
publishWind("Madrid", 15);
publishWind("Barcelona", 20);
unpublishWind("Madrid");
// when
Store<String> store = weatherConditionsProcessingService.getWindActionsPerCityStore();
// then
assertEquals(UNPUBLISH.getValue(), store.get("Madrid"));
assertEquals(PUBLISH.getValue(), store.get("Barcelona"));
}
The test ensures that the windActionsPerCityStore
contains only the last action of wind messages for each city.
Step 8: Build a Docker Image for your Processing Service
So far, you have only run the implemented services in unit tests. The next step is to integrate them into a StreamX Mesh, where they will operate in production mode.
Processing Services are packaged and deployed in the form of Docker images. To enable building a Docker image for your service during the project build, open the pom.xml
file of the sample-processing-service
project. Locate the quarkus-maven-plugin
definition, and add the following execution goal to the existing list under executions/execution/goals
:
<goal>image-build</goal>
Rebuild the project:
mvn clean install
Once the build is complete, verify that an image with sample-processing-service
in its name exists in your local Docker instance. Then, push the image to your Docker registry, making it available for inclusion in a StreamX Mesh YAML file.
Next steps
Now that you have a Docker image with your Processing Service, it’s ready to be integrated into the StreamX Mesh, where services operate in production mode.
To run a StreamX Mesh, in addition to a Processing Service, at least one Delivery Service is required. This is because a Processing Service sends processed data to a Pulsar topic, and it’s the Delivery Service that handles delivering the data to the final destinations.
Once you’ve completed both the Processing and Delivery Service tutorials, you are ready to create your first StreamX Mesh!