This version is still in development and is not considered stable yet. For the latest stable version, please use StreamX Guides 1.0.1! |
REST Ingestion Service
The REST Ingestion Service is the entry point to StreamX. You can ingest any data by using the common HTTP protocol with JSON Avro format which is supported by StreamX. It allows you to:
-
browse schema - lists schemas registered for inboxes channels in StreamX and fetches schema for given channel.
-
ingest data - ingests data to be processed. Ingestion could be
publish
orunpublish
action. Publish action can be any event like price change, page update or trigger for some data processing. Unpublish action is a trigger to inform StreamX that data identified with the given key is obsolete. Unpublishing can be used when a page is no longer desired online, or a product is no longer available, and in other similar scenarios.
Available list of schemas corresponding to the inboxes channels where data can be ingested, depends on the current state of the StreamX Mesh. The list of channels and schemas are resolved from the other services running in the mesh. The REST Ingestion Service accepts data in Avro format and validates it against the Avro schema registered for the given channel. StreamX rejects data with an incorrect format, along with data that can not be serialized to channel’s Avro schema.
Available endpoints
GET: /ingestion/v1/channels
Refreshes channels and returns map of available AVRO schemas for channels.
Response code:
200 OK
: Schemas fetched successfully for refreshed channels.
500 Error
: Schemas fetch for channels error.
GET: /ingestion/v1/channels/{channel}/schema
Refreshes channels and returns AVRO schema for given channel.
Path-params: channel
- channel to fetch schema.
Response code:
200 OK
: Schema fetched successfully for refreshed channel.
400 Bad Request
: Requested channel is not supported.
403 Forbidden
: You have not access to this channel.
500 Error
: Schema fetch for channel error.
POST: /ingestion/v1/channels/{channel}/messages
Accept JSON objects following each other and there is no JSON object array, see https://en.wikipedia.org/wiki/JSON_streaming#Concatenated_JSON.
Path-params: channel
- channel where data is ingested.
Query-params: schema-name
if defined will be validated against schema supported by channel.
Response:
202 Accepted
: Ingestion request accepted for processing.
The response payload contains individual objects that represent the processing result of each respective message provided in the request payload. The processing results are JSON objects following each other and there is no JSON object array, see https://en.wikipedia.org/wiki/JSON_streaming#Concatenated_JSON
Although the API responds with a 202 Accepted status, it does not necessarily imply that all messages are processed successfully. This status simply means that the system has accepted the request for processing, but it does not guarantee the outcome of processing. To confirm the processing results of the individual messages, clients are strongly recommended to examine the response body. This is because a 202 Accepted status could still be returned even if the processing has failed for one or all messages in the request.
|
202 Accepted response body example:
{"success":{"eventTime":1732022731718,"key":"product1-key"}}
{"failure":{"errorCode":"INVALID_INGESTION_INPUT","errorMessage":"Data does not match the existing schema: Expected field name not found: key"}}
{"success":{"eventTime":1732022731718,"key":"product2-key"}}
400 Bad Request
: Ingestion request rejected. Channel is not supported.
403 Forbidden
: Ingestion request rejected. Requested channel is forbidden.
500 Error
: Error while processing ingestion request.
Response headers:
X-Trace-ID
- contains traceId
. More about the traces.
Resource Key Rewrite
StreamX provides a Key Rewrite mechanism to enhance the functionality. This feature allows defining a pair of matching and rewrite regular expression (regexp).
When data being ingested matches a predefined pattern, the key is modified based on the rewrite pattern. The API endpoint then responds with the updated key included in the body of the response. The rewritten key is used during further data processing in StreamX Mesh.
More about key rewrite in reference.
Authentication
REST Ingestion Service endpoints can be secured to restrict StreamX ingestion access only to verified source systems. Quarkus framework used by REST Ingestion Service includes an authentication mechanism. By default, REST Ingestion Service enables authentication by using bearer authentication based on JWT.
For more information, refer to the Quarkus Security JWT Configuration Reference.
Sources support
When mesh definition has sources section each source has it’s own token. REST Ingestion can be configured with the following source related properties:
streamx.sources.<source-name>.channels
streamx.sources.<source-name>.accept-iat-from
Source channels
defines the list of the channels that can be ingested by given source. This must be equal to the list from mesh definition. For multiple values ',' separator must be used.
accept-iat-from
controls the time of the JWT tokens (iat) accepted by REST Ingestion Service. Tokens issued before this timestamp are invalid. Timestamp is a UNIX epoch time in seconds.
Request authorization
The JWT token is used for both authentication and authorization in the system. It contains several claims that help identify and authorize the user.
-
JWT UPN (User Principal Name): This claim carries information about the source system name, which is considered the user in the context of the system.
-
JWT IAT (Issued at): The numeric value of the time when the token has been issued.
REST Ingestion Service validates the provided source name with the list of configured sources and checks if the given source is allowed to ingest the given channel. Whenever sources are modified the old tokens with iat
lower than modification time stop to work.
Auth configuration
quarkus.http.auth.permission.bearer.paths=/ingestion/v1/*
quarkus.http.auth.permission.bearer.policy=authenticated
quarkus.http.auth.permission.bearer.auth-mechanism=bearer
If the auth is enabled requests to REST Ingestion Service endpoints must provide Authorization
header with barer authentication containing the JWT token.
Authorization: Bearer <token>
Authentication can be disabled by setting
quarkus.http.auth.permission.bearer.policy
to permit
.
Data ingestion clients
StreamX Ingestion REST Clients uses HTTP protocol. You can use it directly or by using Java Client or by using CLI.
Usage
Below examples assume authentication is disabled. For enabled authentication each request must contain Authentication Header
|
Ingest data:
curl -X POST http://${DOMAIN}:${PORT}/ingestion/v1/channels/${CHANNEL_NAME}/messages \
-H 'Content-Type: application/json' \
-d "{
\"key\" : \"${KEY}\",
\"action\" : \"${ACTION}\",
\"eventTime\" : \"${EVENT_TIME}\",
\"properties\" : \"${METADATA_PROPERTIES}\",
\"payload\" : \"${PAYLOAD}\"
}"
Where:
${CHANNEL_NAME}
- inboxes channel name
${KEY}
- identifier of the ingested data
${ACTION}
- publish/unpublish
${EVENT_TIME}
- use null for current time; or provide value representing number of milliseconds from 1970-01-01T00:00:00Z. Examples:
"eventTime": null,
"eventTime": { "long": 1729759830189 }
${METADATA_PROPERTIES}
- object containing optional metadata properties; values must be strings. Example:
"properties": { "authorId": "534756348" }
${PAYLOAD}
- ingestion payload; can be null for unpublish action; if present must match the channel schema. Examples:
"payload": null
"payload" : { "dev.streamx.blueprints.data.Page" : { "content" : { "bytes" : "<h1>Hello World!</h1>" } } }
Example
curl -X POST http://${DOMAIN}:${PORT}/ingestion/v1/channels/pages/messages \
-H 'Content-Type: application/json' \
-H "Authorization: Bearer ${TOKEN}" \
-d '{
"key" : "/homepage.html",
"action" : "publish",
"eventTime" : null,
"properties" : { },
"payload" : {
"dev.streamx.blueprints.data.Page" : {
"content" : {
"bytes" : "<h1>Hello World!</h1>"
}
}
}
}'
StreamX Ingestion API Java Client
The StreamX Ingestion API Java client allows you to use the StreamX Ingestion API from your Java application.
StreamX CLI
Similar to the Java Client, the StreamX CLI makes calls to the REST Ingestion Service. More about the CLI.
Observability
-
Metrics - Rest Ingestion Service exposes Micrometer metrics with Prometheus registry at
/q/metrics/
endpoint. -
Traceability - Publish/Unpublish requests are marked with a Pulsar trace, which is propagated through StreamX Mesh by using OpenTelemetry tracing.
Configuration
REST Ingestion Service communicate with StreamX Mesh through Apache Pulsar
topics by using
SmallRye
Reactive Messaging. mp.messaging.incoming.ingestion
is the internal
channel, which can be used to propagate Pulsar Client Configuration. More
information about Pulsar Configuration can be found in
SmallRye
Pulsar Reference.
Schema discovery in StreamX Mesh requires use of Pulsar Admin client
which is not provided by Quarkus. Pulsar Admin configuration is not
possible with MicroProfile properties, because it is intended for SmallRye managed client. REST Ingestion Service requires pulsar.admin.serviceUrl
to
establish connection for schema discovery.
To distinguish different tenants in StreamX, streamx.tenant
is
required.
Property |
Description |
Required |
|
StreamX Mesh tenant |
Yes |
|
Pulsar Client URL |
Yes |
|
Pulsar Admin Client URL for channels schema discovery |
Yes |