REST Ingestion Service

The REST Ingestion Service is the entry point to StreamX. You can ingest any data over the common HTTP protocol, using the JSON encoding of Avro that StreamX supports. The service allows you to:

  1. browse schemas - list the schemas registered for the inboxes channels in StreamX and fetch the schema for a given channel.

  2. ingest data - submit data for processing. Each ingestion is either a publish or an unpublish action. A publish action can be any event, such as a price change, a page update, or a trigger for some data processing. An unpublish action informs StreamX that the data identified by the given key is obsolete. Use it, for example, when a page should no longer be online, when a product is no longer available, or in similar scenarios.

The list of available schemas, one for each inboxes channel that accepts data, depends on the current state of the StreamX Mesh. Channels and schemas are resolved from the other services running in the mesh. The REST Ingestion Service accepts data in Avro format and validates it against the Avro schema registered for the given channel. StreamX rejects data that is incorrectly formatted and data that cannot be serialized to the channel’s Avro schema.

Available endpoints

GET: /ingestion/v1/channels

Refreshes the channel list and returns a map of the Avro schemas available for the channels.

Response codes:

200 OK: Schemas fetched successfully for refreshed channels.

500 Internal Server Error: An error occurred while fetching the channel schemas.

GET: /ingestion/v1/channels/{channel}/schema

Refreshes the channel list and returns the Avro schema for the given channel.

Path-params: channel - the channel whose schema is fetched.

Response codes:

200 OK: Schema fetched successfully for refreshed channel.

400 Bad Request: Requested channel is not supported.

403 Forbidden: You do not have access to this channel.

500 Internal Server Error: An error occurred while fetching the channel schema.
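The two schema endpoints can be called from any HTTP client. Below is a minimal Python sketch using only the standard library. The helper names and base URL are hypothetical, and fetch_json assumes the endpoints return a JSON body. The Authorization header is added only when a token is supplied, which matches the case where authentication is enabled.

```python
import json
import urllib.parse
import urllib.request


def build_get_request(base_url, path, token=None):
    """Build a GET request for a REST Ingestion Service path (hypothetical helper)."""
    headers = {}
    if token is not None:
        # Bearer JWT; only needed when authentication is enabled
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(base_url.rstrip("/") + path, headers=headers, method="GET")


def channels_request(base_url, token=None):
    # GET /ingestion/v1/channels: refreshes channels and returns the schema map
    return build_get_request(base_url, "/ingestion/v1/channels", token)


def channel_schema_request(base_url, channel, token=None):
    # GET /ingestion/v1/channels/{channel}/schema; the channel name is URL-encoded
    encoded = urllib.parse.quote(channel, safe="")
    return build_get_request(base_url, f"/ingestion/v1/channels/{encoded}/schema", token)


def fetch_json(request, timeout=10):
    # Assumes a JSON response body; urlopen raises urllib.error.HTTPError
    # for error statuses such as 400, 403 and 500.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
```

For example, fetch_json(channel_schema_request("http://localhost:8080", "pages")) would return the decoded schema of a hypothetical pages channel on a local instance.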

POST: /ingestion/v1/channels/{channel}/messages

Accepts concatenated JSON objects, one per message, written one after another rather than wrapped in a JSON array. See https://en.wikipedia.org/wiki/JSON_streaming#Concatenated_JSON.

Path-params: channel - the channel the data is ingested into.

Query-params: schema-name - optional. If defined, it is validated against the schema supported by the channel.
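A request with several messages therefore carries one JSON object after another in its body. The following minimal Python sketch builds such a request. It adds the optional schema-name query parameter and an optional bearer token. The build_ingest_request helper and the schema name used in the test are hypothetical.

```python
import json
import urllib.parse
import urllib.request


def build_ingest_request(base_url, channel, messages, schema_name=None, token=None):
    """Build a POST request whose body is concatenated JSON, one object per message."""
    url = (base_url.rstrip("/")
           + "/ingestion/v1/channels/" + urllib.parse.quote(channel, safe="") + "/messages")
    if schema_name is not None:
        # Optional schema-name query parameter, validated by the service
        url += "?" + urllib.parse.urlencode({"schema-name": schema_name})
    # Concatenated JSON: objects written one after another, no enclosing array
    body = "\n".join(json.dumps(message) for message in messages).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if token is not None:
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(url, data=body, headers=headers, method="POST")
```

Sending the request with urllib.request.urlopen returns a concatenated-JSON body with one result per message. Check that body, because the status code alone does not confirm success.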

Response codes:

202 Accepted: Ingestion request accepted for processing.

The response payload contains one object for each message in the request payload, describing the processing result of that message. Like the request, the results are concatenated JSON objects written one after another rather than a JSON array. See https://en.wikipedia.org/wiki/JSON_streaming#Concatenated_JSON

A 202 Accepted status does not imply that all messages were processed successfully. It only means that the system accepted the request for processing. It guarantees nothing about the outcome. The service can return 202 Accepted even if processing failed for one or all of the messages in the request. Clients are therefore strongly recommended to examine the response body to confirm the processing result of each message.

202 Accepted response body example:

{"success":{"eventTime":1732022731718,"key":"product1-key"}}
{"failure":{"errorCode":"INVALID_INGESTION_INPUT","errorMessage":"Data does not match the existing schema: Expected field name not found: key"}}
{"success":{"eventTime":1732022731718,"key":"product2-key"}}

400 Bad Request: Ingestion request rejected. Channel is not supported.

403 Forbidden: Ingestion request rejected. Requested channel is forbidden.

500 Internal Server Error: An error occurred while processing the ingestion request.
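Because a 202 Accepted response can still contain failures, a client needs to walk through the concatenated result objects. The minimal Python sketch below does this with json.JSONDecoder.raw_decode and separates successful keys from failure entries. The helper names are hypothetical. The field names (success, failure, key, errorCode) are taken from the 202 Accepted response body example.

```python
import json


def iter_concatenated_json(text):
    """Yield each JSON value from a concatenated-JSON string."""
    decoder = json.JSONDecoder()
    index, length = 0, len(text)
    while True:
        # raw_decode does not skip leading whitespace, so skip it here
        while index < length and text[index].isspace():
            index += 1
        if index >= length:
            return
        value, index = decoder.raw_decode(text, index)
        yield value


def split_results(text):
    """Separate per-message results into successful keys and failure objects."""
    succeeded, failed = [], []
    for result in iter_concatenated_json(text):
        if "success" in result:
            succeeded.append(result["success"]["key"])
        elif "failure" in result:
            failed.append(result["failure"])
    return succeeded, failed
```

Treat any entry in the failure list as a message that was not ingested, even though the HTTP status was 202.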

Resource Key Rewrite

StreamX provides a Key Rewrite mechanism that lets you define a pair of regular expressions (regexps): a matching pattern and a rewrite pattern.

When the key of the data being ingested matches the matching pattern, the key is modified according to the rewrite pattern. The endpoint returns the rewritten key in the response body, and StreamX Mesh uses the rewritten key for all further processing.

For more information about key rewrite, see the reference documentation.
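The match-then-rewrite idea can be illustrated with a minimal Python sketch. It is not StreamX's implementation, and the following are assumptions: the whole key must match, rewrite patterns use Python's group-reference syntax (the service's own replacement syntax may differ), and the example patterns are hypothetical. See the reference for the actual configuration.

```python
import re


def rewrite_key(key, match_pattern, rewrite_pattern):
    """Return the rewritten key, or the unchanged key if it does not match.

    Assumptions for this sketch: the whole key must match match_pattern, and
    rewrite_pattern refers to capture groups using Python template syntax.
    """
    match = re.fullmatch(match_pattern, key)
    if match is None:
        return key  # no match: the key is ingested as-is
    return match.expand(rewrite_pattern)
```

With these hypothetical patterns, rewrite_key("/content/site/en/home.html", r"/content/site/(.*)", r"/\g<1>") returns "/en/home.html", while a key outside /content/site/ passes through unchanged.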

Authentication

REST Ingestion Service endpoints can be secured so that only verified source systems can ingest data into StreamX. The service is built on the Quarkus framework, which provides the authentication mechanism. By default, the REST Ingestion Service enables JWT-based bearer authentication.

For more information, refer to the Quarkus Security JWT Configuration Reference.

Sources support

When the mesh definition has a sources section, each source has its own token. The REST Ingestion Service can be configured with the following source-related properties:

streamx.sources.<source-name>.channels
streamx.sources.<source-name>.accept-iat-from

The channels property defines the list of channels that the given source is allowed to ingest into. It must match the list in the mesh definition. Separate multiple values with a comma (,).

The accept-iat-from property sets the earliest JWT issued-at time (iat) that the REST Ingestion Service accepts. Tokens issued before this timestamp are invalid. The timestamp is UNIX epoch time in seconds.
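To illustrate the property format, the following minimal Python sketch groups streamx.sources.<source-name>.* entries by source. It splits channels on ',' and reads accept-iat-from as epoch seconds. The parse_sources helper and the cms source in the test data are hypothetical. The service itself loads these values through Quarkus configuration, which also supports other sources such as environment variables that this sketch ignores.

```python
def parse_sources(properties_text):
    """Group streamx.sources.<source-name>.* properties by source name.

    Simplified parsing: one key=value pair per line, '#' or '!' comments,
    no line continuations or ':' separators.
    """
    prefix = "streamx.sources."
    sources = {}
    for raw_line in properties_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        name, value = (part.strip() for part in line.split("=", 1))
        if not name.startswith(prefix):
            continue
        source_name, _, option = name[len(prefix):].rpartition(".")
        if not source_name:
            continue
        entry = sources.setdefault(source_name, {"channels": [], "accept_iat_from": None})
        if option == "channels":
            # Multiple channels are separated by ','
            entry["channels"] = [c.strip() for c in value.split(",") if c.strip()]
        elif option == "accept-iat-from":
            # UNIX epoch time in seconds
            entry["accept_iat_from"] = int(value)
    return sources
```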

Request authorization

The JWT token is used for both authentication and authorization in the system. It contains several claims that help identify and authorize the user.

  • JWT UPN (User Principal Name): This claim carries the name of the source system, which the system treats as the user.

  • JWT IAT (Issued at): The time at which the token was issued, as a numeric value (seconds since the UNIX epoch).

The REST Ingestion Service validates the provided source name against the list of configured sources and checks whether that source is allowed to ingest into the given channel. Whenever sources are modified, existing tokens whose iat is earlier than the modification time stop working.
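The checks described above can be sketched in Python, with the following assumptions. decode_claims reads the JWT payload without verifying the signature. The real service verifies tokens through Quarkus before trusting any claim. The sources dictionary is a hypothetical stand-in for the configured streamx.sources.* properties.

```python
import base64
import json


def decode_claims(token):
    """Decode the JWT payload WITHOUT verifying its signature (illustration only)."""
    # WARNING: no signature verification; never use this for real authorization
    payload_b64 = token.split(".")[1]
    # base64url without padding: restore '=' padding before decoding
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def is_authorized(claims, channel, sources):
    """Apply the upn, iat and channel checks.

    sources maps a source name to {"channels": [...], "accept_iat_from": int or None}.
    """
    source = sources.get(claims.get("upn"))
    if source is None:
        return False  # upn does not name a configured source
    cutoff = source.get("accept_iat_from")
    iat = claims.get("iat")
    if cutoff is not None and (iat is None or iat < cutoff):
        return False  # token issued before accept-iat-from
    return channel in source.get("channels", [])
```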

Auth configuration

quarkus.http.auth.permission.bearer.paths=/ingestion/v1/*
quarkus.http.auth.permission.bearer.policy=authenticated
quarkus.http.auth.permission.bearer.auth-mechanism=bearer

If authentication is enabled, requests to REST Ingestion Service endpoints must include an Authorization header that uses the Bearer scheme and carries the JWT token:
Authorization: Bearer <token>

Authentication can be disabled by setting quarkus.http.auth.permission.bearer.policy to permit.

Data ingestion clients

Clients communicate with the REST Ingestion Service over HTTP. You can call the service directly, or through the StreamX Ingestion API Java Client or the StreamX CLI.

Usage

The examples below assume that authentication is disabled. When authentication is enabled, each request must also contain the Authorization header.

Fetch schemas:

curl -s http://${DOMAIN}:${PORT}/ingestion/v1/channels

Fetch schema:

curl -s http://${DOMAIN}:${PORT}/ingestion/v1/channels/${CHANNEL_NAME}/schema

Ingest data:

curl -X POST http://${DOMAIN}:${PORT}/ingestion/v1/channels/${CHANNEL_NAME}/messages \
   -H 'Content-Type: application/json' \
   -d "{
     \"key\" : \"${KEY}\",
     \"action\" : \"${ACTION}\",
     \"eventTime\" : ${EVENT_TIME},
     \"properties\" : ${METADATA_PROPERTIES},
     \"payload\" : ${PAYLOAD}
   }"
Where:

${CHANNEL_NAME} - inboxes channel name

${KEY} - identifier of the ingested data

${ACTION} - publish/unpublish

${EVENT_TIME} - null to use the current time, or the number of milliseconds since 1970-01-01T00:00:00Z wrapped in a "long" object, as in the second example. Examples:

"eventTime": null,
"eventTime": {
    "long": 1729759830189
}

${METADATA_PROPERTIES} - object containing optional metadata properties; values must be strings. Example:

"properties": {
    "authorId": "534756348"
}

${PAYLOAD} - the ingestion payload. It can be null for the unpublish action and, if present, must match the channel schema. Examples:

"payload": null
"payload" : {
    "dev.streamx.blueprints.data.Page" : {
        "content" : {
            "bytes" : "<h1>Hello World!</h1>"
        }
    }
}
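The message fields described above can also be assembled programmatically. The following minimal Python sketch builds one message in the same JSON shape. eventTime and the payload use the object-wrapper form shown in the examples, which is the JSON encoding of Avro unions. The helper names are hypothetical, and page_payload only reproduces the dev.streamx.blueprints.data.Page example rather than any general schema.

```python
def build_message(key, action, event_time_ms=None, properties=None, payload=None):
    """Build one ingestion message in the JSON shape used by the examples.

    eventTime and payload follow the Avro JSON union encoding: either null,
    or an object whose single field names the chosen branch.
    """
    if action not in ("publish", "unpublish"):
        raise ValueError(f"unsupported action: {action}")
    properties = dict(properties or {})
    if not all(isinstance(value, str) for value in properties.values()):
        raise ValueError("metadata property values must be strings")
    return {
        "key": key,
        "action": action,
        # None lets StreamX use the current time
        "eventTime": None if event_time_ms is None else {"long": int(event_time_ms)},
        "properties": properties,
        "payload": payload,
    }


def page_payload(html):
    # Payload for the dev.streamx.blueprints.data.Page schema used in the examples
    return {"dev.streamx.blueprints.data.Page": {"content": {"bytes": html}}}
```

Serializing build_message("/homepage.html", "publish", payload=page_payload("<h1>Hello World!</h1>")) with json.dumps gives the same body as the curl example that follows.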
Example
curl -X POST http://${DOMAIN}:${PORT}/ingestion/v1/channels/pages/messages \
   -H 'Content-Type: application/json' \
   -H "Authorization: Bearer ${TOKEN}" \
   -d  '{
         "key" : "/homepage.html",
         "action" : "publish",
         "eventTime" : null,
         "properties" : { },
         "payload" : {
           "dev.streamx.blueprints.data.Page" : {
             "content" : {
               "bytes" : "<h1>Hello World!</h1>"
             }
           }
         }
       }'

StreamX Ingestion API Java Client

The StreamX Ingestion API Java client allows you to use the StreamX Ingestion API from your Java application.

StreamX CLI

Like the Java Client, the StreamX CLI calls the REST Ingestion Service. For more information, see the StreamX CLI documentation.

Usage

streamx publish -s 'content.bytes=file://site/index.html' pages index.html
streamx unpublish pages index.html

Observability

  1. Metrics - the REST Ingestion Service exposes Micrometer metrics through the Prometheus registry at the /q/metrics/ endpoint.

  2. Traceability - Publish/Unpublish requests are marked with a Pulsar trace, which is propagated through StreamX Mesh by using OpenTelemetry tracing.
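The metrics endpoint serves the Prometheus text exposition format. The minimal Python sketch below fetches that output and splits sample lines into series and value using a deliberately simplified parser. The helper names, the base URL, and the metric names in the test data are hypothetical. A dedicated parser, such as the one in the prometheus_client library, handles edge cases more robustly.

```python
import urllib.request


def fetch_metrics_text(base_url, timeout=10):
    # Prometheus text output served by Micrometer at /q/metrics/
    url = base_url.rstrip("/") + "/q/metrics/"
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read().decode("utf-8")


def parse_samples(text):
    """Parse Prometheus sample lines into (series, value) pairs.

    Comment lines (# HELP, # TYPE) are skipped; optional timestamps are ignored.
    """
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if "}" in line:
            # Labels may contain spaces, so split after the closing brace
            end = line.rindex("}") + 1
            series, rest = line[:end], line[end:].split()
        else:
            series, *rest = line.split()
        samples.append((series, float(rest[0])))
    return samples
```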

Configuration

The REST Ingestion Service communicates with StreamX Mesh through Apache Pulsar topics using SmallRye Reactive Messaging. mp.messaging.incoming.ingestion is the internal channel through which Pulsar client configuration can be passed. More information about Pulsar configuration can be found in the SmallRye Pulsar Reference.

Schema discovery in StreamX Mesh requires the Pulsar Admin client, which Quarkus does not provide. The Pulsar Admin client cannot be configured with MicroProfile properties, because those are intended for the SmallRye-managed client. Instead, the REST Ingestion Service requires pulsar.admin.serviceUrl to connect for schema discovery.

To distinguish different tenants in StreamX, streamx.tenant is required.

Property                                    Description                                            Required
streamx.tenant                              StreamX Mesh tenant                                    Yes
mp.messaging.incoming.ingestion.serviceUrl  Pulsar Client URL                                      Yes
pulsar.admin.serviceUrl                     Pulsar Admin Client URL for channels schema discovery  Yes