Create your own Delivery Service

This guide shows you how to create your own Delivery Service from scratch.

Prerequisites

To complete this guide, you will need:

  • Roughly 45 minutes

  • OpenJDK 17+ installed with JAVA_HOME configured appropriately

  • An IDE of your choice

  • Apache Maven 3.9.6+

  • Docker Engine installed and running

  • A Docker registry of your choice, with read/write access

For a deeper understanding of the concepts behind a Delivery Service, refer to this reference either before starting or while progressing through the tutorial.

Step 1: Create a Delivery Service project from template

Delivery services are developed in the form of Java projects utilizing the Quarkus framework.

To create a template for your service, run the following Maven command:

mvn io.quarkus.platform:quarkus-maven-plugin:3.15.1:create \
  -DprojectGroupId=dev.streamx.docs \
  -DprojectArtifactId=sample-delivery-service \
  -Dextensions='rest'

The latest guide for creating a Quarkus application can be found at Creating Your First Application.

Step 2: Configure the generated project template

To make your project a Delivery Service, include the StreamX Delivery Service Extension by adding the following entry to your pom.xml file. Use the newest StreamX release version.

  <dependency>
    <groupId>dev.streamx</groupId>
    <artifactId>streamx-delivery-service-deployment</artifactId>
    <version>${streamx.version}</version>
  </dependency>

Add StreamX releases repository

The StreamX Delivery Service Extension is provided as a Maven artifact hosted in the StreamX Public Releases repository. Add the following streamx-maven-public-releases repository definition to your pom.xml:

  <repositories>
    <repository>
      <id>streamx-maven-public-releases</id>
      <url>https://europe-west1-maven.pkg.dev/streamx-releases/streamx-maven-public-releases</url>
    </repository>
  </repositories>

Step 3: Implement sample Delivery Service

The Java code for a Delivery Service is where the business logic should be implemented according to the user’s requirements.

In this chapter, you will implement a basic Delivery Service that receives string messages from its incoming channel and makes them accessible for end users through a REST API endpoint.

Data model

To begin, create the data model class. It is a simple wrapper for a Java String object. Model classes must use the Avro format. For additional details, refer to the Channel Schema reference.

package dev.streamx.docs.sampledeliveryservice;

import org.apache.avro.specific.AvroGenerated;

@AvroGenerated
public class StringWrapper {

  private String value;

  public StringWrapper() {
    // needed for Avro serialization
  }

  public StringWrapper(String value) {
    this.value = value;
  }

  public String getValue() {
    return value;
  }
}

Strings repository

The service stores received strings in an in-memory repository. The repository is a simple wrapper around a key-value map:

package dev.streamx.docs.sampledeliveryservice;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;

@ApplicationScoped
public class StringsRepository {

  private final Map<String, String> strings = new HashMap<>();

  public boolean containsKey(String key) {
    return strings.containsKey(key);
  }

  public String get(String key) {
    return strings.get(key);
  }

  public void put(String key, String value) {
    strings.put(key, value);
  }

  public void remove(String key) {
    strings.remove(key);
  }

  public void clear() {
    strings.clear();
  }

}
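
If the repository ends up being accessed from more than one thread (for example, a message handler writing while HTTP requests read), a plain HashMap is not safe. As a minimal sketch under that assumption, the same wrapper can be backed by a ConcurrentHashMap. The class name ConcurrentStringsRepository is hypothetical, and the @ApplicationScoped annotation is left out so that the snippet compiles on its own:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical thread-safe variant of StringsRepository (illustration only).
// In the real project the class would also carry @ApplicationScoped.
final class ConcurrentStringsRepository {

  // ConcurrentHashMap supports concurrent reads and writes without external locking
  private final Map<String, String> strings = new ConcurrentHashMap<>();

  public boolean containsKey(String key) {
    return strings.containsKey(key);
  }

  public String get(String key) {
    return strings.get(key);
  }

  public void put(String key, String value) {
    strings.put(key, value);
  }

  public void remove(String key) {
    strings.remove(key);
  }

  public void clear() {
    strings.clear();
  }
}
```

Note that ConcurrentHashMap rejects null keys and values, so this variant assumes that only non-null strings are stored.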

REST API

The service exposes a simple REST API that allows users to retrieve stored strings by their key. Below is the source code for the REST API:

package dev.streamx.docs.sampledeliveryservice;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import org.jboss.resteasy.reactive.RestResponse;
import org.jboss.resteasy.reactive.RestResponse.ResponseBuilder;

@Path("/api")
public class RestApi {

  @Inject
  StringsRepository stringsRepository;

  @GET
  @Path("strings/{key}")
  public RestResponse<String> getStringByKey(@PathParam("key") String key) {
    if (stringsRepository.containsKey(key)) {
      return ResponseBuilder.ok(stringsRepository.get(key)).build();
    } else {
      return ResponseBuilder.<String>notFound().build();
    }
  }
}

The REST API and its endpoints are automatically made available at application startup, whether in main or test mode. This is handled by the REST extension included in the mvn command executed in Step 1: Create a Delivery Service project from template. By default, the API is exposed on the same port that the service uses when running in a Docker container.
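
To try the endpoint by hand, a small plain-Java client can be sketched as follows. This is an illustration, not part of the service: the class name StringsApiClient is hypothetical, and the base URL you pass in (for example http://localhost:8080, which is Quarkus's default HTTP port) is an assumption that depends on how you run the service.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical client for GET /api/strings/{key} (illustration only)
final class StringsApiClient {

  private final String baseUrl;
  private final HttpClient http = HttpClient.newHttpClient();

  StringsApiClient(String baseUrl) {
    this.baseUrl = baseUrl;
  }

  // Builds the request URI; the key is encoded so that spaces and other
  // special characters are safe inside a single path segment
  static URI uriFor(String baseUrl, String key) {
    String encodedKey = URLEncoder.encode(key, StandardCharsets.UTF_8).replace("+", "%20");
    return URI.create(baseUrl + "/api/strings/" + encodedKey);
  }

  // Returns the stored string, or an empty Optional when the service answers 404
  Optional<String> fetch(String key) throws IOException, InterruptedException {
    HttpRequest request = HttpRequest.newBuilder(uriFor(baseUrl, key)).GET().build();
    HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
    if (response.statusCode() == 200) {
      return Optional.of(response.body());
    }
    if (response.statusCode() == 404) {
      return Optional.empty();
    }
    throw new IOException("Unexpected status " + response.statusCode());
  }
}
```

With the service running, new StringsApiClient("http://localhost:8080").fetch("key-1.txt") would return the stored value if one has been published under that key, and an empty Optional otherwise.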

Service implementation

Here is the source code of the service:

package dev.streamx.docs.sampledeliveryservice;

import dev.streamx.quasar.reactive.messaging.metadata.Action;
import dev.streamx.quasar.reactive.messaging.metadata.Key;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class DeliverStringsService {

  @Inject
  StringsRepository stringsRepository;

  (1)
  @Incoming("strings")
  public void handleIncomingString(StringWrapper incomingString, Key key, Action action) {
    if (action.equals(Action.PUBLISH)) { (2)
      stringsRepository.put(key.getValue(), incomingString.getValue());
    } else if (action.equals(Action.UNPUBLISH)) { (3)
      stringsRepository.remove(key.getValue());
    }

    (4)
  }
}
1 The handleIncomingString method is the core of the service. By annotating it with @Incoming, you specify the channel it listens to for incoming data. The return type is void since the service solely consumes the data without generating any output messages.
2 Each incoming message is tied to one of two actions: PUBLISH or UNPUBLISH. For the PUBLISH action, the service stores the received string in the repository by using the key specified in the metadata.
3 A string received with the UNPUBLISH action is removed from the repository.
4 There are only two predefined actions. If a message with any other action is received, the method ignores it and does not process the message.
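
The publish/unpublish semantics described in the callouts can be exercised without any messaging infrastructure. The sketch below is a plain-Java approximation: the nested Action record is a local stand-in for dev.streamx.quasar.reactive.messaging.metadata.Action and not the real type, and StringsHandlerSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java approximation of the handler logic (illustration only)
final class StringsHandlerSketch {

  // Local stand-in for the Quasar Action metadata type (assumption, not the real class)
  record Action(String name) {
    static final Action PUBLISH = new Action("publish");
    static final Action UNPUBLISH = new Action("unpublish");
  }

  private final Map<String, String> strings = new HashMap<>();

  // Mirrors handleIncomingString: store on PUBLISH, remove on UNPUBLISH, ignore anything else
  void handle(String key, String payload, Action action) {
    if (Action.PUBLISH.equals(action)) {
      strings.put(key, payload);
    } else if (Action.UNPUBLISH.equals(action)) {
      strings.remove(key);
    }
  }

  // Returns the stored string, or null when nothing is stored under the key
  String get(String key) {
    return strings.get(key);
  }
}
```

Publishing the same key twice overwrites the earlier value, and an unpublish for a key that was never published is a no-op.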

Step 4: Unit testing the sample Delivery Service

This chapter outlines the recommended approach for unit testing a Delivery Service. You will write unit tests for the service you’ve created in the previous step.

Maven dependencies

Add the following test dependencies to your pom.xml file:

    <dependency>
      <groupId>io.smallrye.reactive</groupId>
      <artifactId>smallrye-reactive-messaging-in-memory</artifactId> (1)
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.awaitility</groupId>
      <artifactId>awaitility</artifactId> (2)
      <version>4.2.2</version>
      <scope>test</scope>
    </dependency>
1 The smallrye-reactive-messaging-in-memory dependency provides support for testing channel communication with an in-memory connector.
2 Because the communication is asynchronous, you must implement cyclic polling of the destination channel with a library of your choice. In this example, we use org.awaitility for this purpose.
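
To make the idea of cyclic polling concrete, here is a minimal plain-Java sketch of what such a helper does: it re-evaluates a condition at a fixed interval until the condition holds or a timeout elapses. It is an illustration only and not how Awaitility is implemented; Poller and its parameter names are hypothetical.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper (illustration only)
final class Poller {

  // Re-checks the condition every `interval` until it is true or `timeout` has elapsed.
  // Returns true if the condition became true in time, false otherwise.
  static boolean pollUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() >= deadline) {
        return false;
      }
      try {
        Thread.sleep(interval.toMillis());
      } catch (InterruptedException e) {
        // Restore the interrupt flag and give up
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```

In the tests of this guide, the condition would be an assertion on the REST endpoint's response; a dedicated library adds readable failure messages and finer control over timing on top of this basic loop.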

Test properties file

Create an application.properties file in the src/test/resources directory of your project, with the following content:

mp.messaging.incoming.strings.connector=smallrye-in-memory (1)
quarkus.pulsar.devservices.enabled=false (2)
1 Configure the service’s incoming channel to operate in in-memory mode.
2 Prevent the Pulsar container from starting during tests, since the tests use an in-memory channel.

Unit test class

package dev.streamx.docs.sampledeliveryservice;

import static dev.streamx.quasar.reactive.messaging.metadata.Action.PUBLISH;
import static dev.streamx.quasar.reactive.messaging.metadata.Action.UNPUBLISH;
import static io.restassured.RestAssured.given;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.is;

import dev.streamx.quasar.reactive.messaging.metadata.Action;
import dev.streamx.quasar.reactive.messaging.metadata.EventTime;
import dev.streamx.quasar.reactive.messaging.metadata.Key;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySource;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import java.time.Duration;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@QuarkusTest
class DeliverStringsServiceTest {

  private InMemorySource<Message<StringWrapper>> stringsSource;

  (1)
  @Inject
  StringsRepository stringsRepository;

  @Inject
  @Any
  InMemoryConnector connector;

  (2)
  @BeforeEach
  void init() {
    stringsSource = connector.source("strings");
  }

  @AfterEach
  void removeStoredStrings() {
    stringsRepository.clear();
  }

  (3)
  @Test
  void shouldStoreString() {
    // given
    String key = "key-1.txt";
    String value = "Hello world!";

    // when
    publishString(key, value);

    // then
    assertStringIsAvailable(key, value);
  }

  (4)
  @Test
  void shouldDeleteUnpublishedString() {
    // given
    String key = "key-2.txt";
    String value = "Hello World.";

    publishString(key, value);
    assertStringIsAvailable(key, value);

    // when
    unpublishString(key);

    // then
    assertStringIsNotAvailable(key);
  }

  private void publishString(String key, String value) {
    stringsSource.send(Message.of(new StringWrapper(value), createMetadata(key, PUBLISH)));
  }

  private void unpublishString(String key) {
    stringsSource.send(Message.of(null, createMetadata(key, UNPUBLISH)));
  }

  private static Metadata createMetadata(String key, Action action) {
    return Metadata.of(
        Key.of(key),
        action,
        EventTime.of(System.currentTimeMillis())
    );
  }

  (5)
  private void assertStringIsAvailable(String key, String expectedValue) {
    await().atMost(Duration.ofSeconds(3)).untilAsserted(() ->
        given()
            .when().get("/api/strings/" + key)
            .then()
            .statusCode(200)
            .body(is(expectedValue)));
  }

  (5)
  private void assertStringIsNotAvailable(String key) {
    await().during(Duration.ofSeconds(3)).untilAsserted(() ->
        given()
            .when().get("/api/strings/" + key)
            .then()
            .statusCode(404));
  }
}
1 The published strings are stored in the StringsRepository, which is an @ApplicationScoped bean and can therefore be injected during @QuarkusTest execution. The tests use it to clear the stored strings after each test.
2 The init() method connects the service’s incoming channel to an in-memory source.
3 This test demonstrates the standard processing workflow, where the service receives an incoming message containing a string and stores it in a manner that makes it accessible through its REST API endpoint.
4 When the service receives a request to unpublish a string, it removes the string from its internal repository. As a result, any subsequent attempt to retrieve it through the endpoint will fail.
5 The tests use the rest-assured library to call the REST API endpoints and verify their responses. This library was automatically added to your pom.xml during project creation in Step 1: Create a Delivery Service project from template.

Step 5: Add Store to filter outdated data

To make your service keep only the most recent version of content identified by a given key, you can use Stores. Each Store is populated with messages from the topics associated with its channel. If outdated versions of, for example, a web page are present when your service starts, the Store ensures that only the latest version is passed to the @Incoming method and delivered to end users. Intermediate versions are skipped because they are outdated.
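
The "latest version wins" behavior can be illustrated with a small plain-Java sketch. It is not the Store implementation; LatestByKey and its Versioned record are hypothetical names, and the sketch assumes that each message carries a key and a numeric event time, with a greater event time meaning a newer version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical "keep only the newest value per key" container (illustration only)
final class LatestByKey<T> {

  // A value together with the event time of the message that carried it
  record Versioned<V>(V value, long eventTime) {}

  private final Map<String, Versioned<T>> latest = new LinkedHashMap<>();

  // Keeps the offered value only if it is newer than what is already held for the key
  void offer(String key, T value, long eventTime) {
    latest.merge(key, new Versioned<>(value, eventTime),
        (current, incoming) -> incoming.eventTime() > current.eventTime() ? incoming : current);
  }

  // Returns the newest value seen for the key, or null if the key is unknown
  T get(String key) {
    Versioned<T> versioned = latest.get(key);
    return versioned == null ? null : versioned.value();
  }

  // Number of distinct keys currently held
  int size() {
    return latest.size();
  }
}
```

Offering four prices for the same product leaves a single entry holding the price with the greatest event time, regardless of the order in which the prices arrive.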

Sample Delivery Service to deliver product prices

In this chapter, you will implement a sample Delivery Service that delivers product prices.

Data model

To begin, create the data model class:

package dev.streamx.docs.sampledeliveryservice;

import org.apache.avro.specific.AvroGenerated;

@AvroGenerated
public class ProductPrice {

  private String productName;
  private int price;

  public ProductPrice() {
    // needed for Avro serialization
  }

  public ProductPrice(String productName, int price) {
    this.productName = productName;
    this.price = price;
  }

  public String getProductName() {
    return productName;
  }

  public int getPrice() {
    return price;
  }
}

Service implementation

Here is the source code of the service:

package dev.streamx.docs.sampledeliveryservice;

import dev.streamx.quasar.reactive.messaging.Store;
import dev.streamx.quasar.reactive.messaging.annotations.FromChannel;
import dev.streamx.quasar.reactive.messaging.metadata.Action;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.LinkedList;
import java.util.List;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class DeliverPricesService {

  (1)
  private final List<ProductPrice> receivedPrices = new LinkedList<>();

  (2)
  @FromChannel("prices")
  Store<ProductPrice> pricesStore;

  (3)
  @Incoming("prices")
  public void handleIncomingPrice(ProductPrice productPrice, Action action) {
    String productName = productPrice.getProductName();
    if (action.equals(Action.PUBLISH)) {
      receivedPrices.add(productPrice);
    } else if (action.equals(Action.UNPUBLISH)) {
      receivedPrices.removeIf(item -> item.getProductName().equals(productName));
    }
  }

  List<ProductPrice> getReceivedPrices() {
    return receivedPrices;
  }

  Store<ProductPrice> getPricesStore() {
    return pricesStore;
  }
}
1 To keep the example straightforward, the service delivers the received data to an in-memory list.
2 The pricesStore will be automatically populated with messages from the prices channel.
3 The handleIncomingPrice method either adds the received price to its internal list or removes it, depending on the incoming Action.

Test properties file

To set up the service for unit test execution, add the following configuration entries to the existing src/test/resources/application.properties file:

mp.messaging.incoming.prices.connector=smallrye-in-memory (1)
quasar.messaging.store.prices_store-productprice.use-connector=false (2)
1 Configure the service’s incoming channel to operate in the in-memory mode
2 Turn off connector usage for the prices store, as described in the Connector reference

Unit test class

Now, let’s create a unit test to validate the behavior of the custom Store. Below is the source code:

package dev.streamx.docs.sampledeliveryservice;

import static dev.streamx.quasar.reactive.messaging.metadata.Action.PUBLISH;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

import dev.streamx.quasar.reactive.messaging.Store.Entry;
import dev.streamx.quasar.reactive.messaging.metadata.EventTime;
import dev.streamx.quasar.reactive.messaging.metadata.Key;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySource;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@QuarkusTest
class DeliverPricesServiceTest {

  (1)
  private static final AtomicLong eventTime = new AtomicLong(1L);

  private InMemorySource<Message<ProductPrice>> pricesSource;

  @Inject
  @Any
  InMemoryConnector connector;

  @Inject
  DeliverPricesService deliverPricesService;

  (2)
  @BeforeEach
  void init() {
    pricesSource = connector.source("prices");
  }

  (3)
  @Test
  void shouldKeepOnlyMostRecentPriceForProductInStore() {
    // given
    String productName = "product-1";

    ProductPrice price1 = new ProductPrice(productName, 123);
    ProductPrice price2 = new ProductPrice(productName, 345);
    ProductPrice price3 = new ProductPrice(productName, 567);
    ProductPrice price4 = new ProductPrice(productName, 789);

    // when
    publishPrices(price1, price2, price3, price4);

    // then
    assertPricesSavedToMemory(price1, price2, price3, price4);

    // and
    assertSinglePriceInStore(price4);
  }

  private void publishPrices(ProductPrice... productPrices) {
    for (ProductPrice productPrice : productPrices) {
      Metadata metadata = Metadata.of(
          Key.of(productPrice.getProductName()),
          PUBLISH,
          EventTime.of(eventTime.incrementAndGet())
      );
      pricesSource.send(Message.of(productPrice, metadata));
    }
  }

  private void assertPricesSavedToMemory(ProductPrice... expectedPrices) {
    await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
        assertEquals(expectedPrices.length, deliverPricesService.getReceivedPrices().size())
    );

    List<ProductPrice> savedPrices = deliverPricesService.getReceivedPrices();
    for (int i = 0; i < expectedPrices.length; i++) {
      assertSameProductPrice(expectedPrices[i], savedPrices.get(i));
    }
  }

  private void assertSinglePriceInStore(ProductPrice expectedPrice) {
    List<ProductPrice> storedPrices = deliverPricesService.getPricesStore().entries()
        .map(Entry::value).toList();
    assertEquals(1, storedPrices.size());
    assertSameProductPrice(expectedPrice, storedPrices.get(0));
  }

  private static void assertSameProductPrice(ProductPrice expectedPrice, ProductPrice storedPrice) {
    assertEquals(expectedPrice.getProductName(), storedPrice.getProductName());
    assertEquals(expectedPrice.getPrice(), storedPrice.getPrice());
  }
}
1 In this test, we will publish multiple price messages ensuring that each message has a later event time than the previous one.
2 The init() method connects the service’s incoming channel to an in-memory source
3 The test illustrates that while every price message is processed by the service, only the latest price for each product is saved in the Store. If the service is started with four messages available on the input channel, only the most recent version will be processed.

Step 6: Build a Docker Image for your Delivery Service

So far, you have only run the implemented services in unit tests. The next step is to integrate them into a StreamX Mesh, where they will operate in production mode.

Delivery Services are packaged and deployed in the form of Docker images. To enable building a Docker image for your service during the project build, open the pom.xml file of the sample-delivery-service project. Locate the quarkus-maven-plugin definition, and add the following execution goal to the existing list under executions/execution/goals:

<goal>image-build</goal>

Rebuild the project:

mvn clean install

Once the build is complete, verify that an image with sample-delivery-service in its name exists in your local Docker instance. Then, push the image to your Docker registry, making it available for inclusion in a StreamX Mesh YAML file.

Summary

Congratulations on successfully creating your first Delivery Service! You are now equipped to develop your own production-ready services.

Next steps

Now that you have a Docker image with your Delivery Service, it’s ready to be integrated into the StreamX Mesh, where services operate in production mode.

To run a StreamX Mesh, at least one Processing Service is required in addition to a Delivery Service. This is because a Processing Service sends processed data to a Pulsar topic, and it’s the Delivery Service that handles delivering that data to the final destinations.

Once you’ve completed both the Processing and Delivery Service tutorials, you are ready to create your first StreamX Mesh!