REST Ingestion Service

The REST Ingestion Service is the entry point to StreamX. You can ingest any data over HTTP, using the JSON Avro format supported by StreamX. The service allows you to:

  1. browse schemas - list the schemas registered for the inbox channels in StreamX.

  2. publish data - send data to be processed. A publication can be any event, such as a price change, a page update, or a trigger for data processing.

  3. unpublish data - send an unpublish trigger to inform StreamX that the data identified by the given key is obsolete. Use unpublishing when a page should no longer be online, when a product is no longer available, or in similar scenarios.

The list of available schemas, which correspond to the inbox channels where data can be published and unpublished, depends on the current state of the StreamX Mesh: channels and their schemas are resolved from the other services running in the mesh. The REST Ingestion Service accepts data in Avro format and validates it against the Avro schema registered for the given channel. StreamX rejects data with an incorrect format, as well as data that cannot be serialized to the channel's Avro schema.

Available endpoints

GET /publications/v1/schema
  Parameters: none
  Request body: none
  Responses:
    200 OK: Schema refreshed successfully
    500 Internal Server Error: Error while getting latest schemas

PUT /publications/v1/{channel}/{key}
  Parameters: channel, key (path)
  Request body: application/json
  Responses:
    202 Accepted: Publishing request accepted
    400 Bad Request: Publishing request rejected
    500 Internal Server Error: Error while processing publishing request

DELETE /publications/v1/{channel}/{key}
  Parameters: channel, key (path)
  Request body: none
  Responses:
    202 Accepted: Unpublishing request accepted
    400 Bad Request: Unpublishing request rejected
    500 Internal Server Error: Error while processing unpublishing request
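As a minimal sketch of these endpoints, the following Java code builds one request per endpoint with the JDK's java.net.http client (Java 11 or later). It is not the StreamX Ingestion API Java Client; the class and method names are hypothetical, and channel and key values are assumed to be URL-safe, so no escaping is applied.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: one HttpRequest per REST Ingestion Service endpoint.
// Assumes channel and key are already URL-safe (no escaping is applied).
public class IngestionRequests {

    // GET /publications/v1/schema: lists schemas registered for inbox channels.
    static HttpRequest fetchSchema(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/publications/v1/schema"))
                .GET()
                .build();
    }

    // PUT /publications/v1/{channel}/{key}: publishes a JSON Avro payload.
    static HttpRequest publish(String baseUrl, String channel, String key, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/publications/v1/" + channel + "/" + key))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // DELETE /publications/v1/{channel}/{key}: marks the data under this key as obsolete.
    static HttpRequest unpublish(String baseUrl, String channel, String key) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/publications/v1/" + channel + "/" + key))
                .DELETE()
                .build();
    }

    public static void main(String[] args) {
        String base = "http://localhost:8080"; // hypothetical DOMAIN and PORT
        System.out.println(fetchSchema(base).uri());
        System.out.println(publish(base, "pages", "page.html", "{\"content\": {\"bytes\": \"<h1>Hi</h1>\"}}").method());
        System.out.println(unpublish(base, "pages", "page.html").method());
    }
}
```

The requests are only built here, not sent; pass any of them to HttpClient.send to execute it.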

Authentication

REST Ingestion Service endpoints can be secured so that only verified parties can ingest data into StreamX. The service is built on the Quarkus framework, which provides the authentication mechanism. By default, the REST Ingestion Service enables bearer authentication based on JWT. For more information, refer to the Quarkus Security JWT Configuration Reference.

Auth configuration

quarkus.http.auth.permission.bearer.paths=/publications/v1/*
quarkus.http.auth.permission.bearer.policy=authenticated
quarkus.http.auth.permission.bearer.auth-mechanism=bearer

Authentication can be disabled by setting quarkus.http.auth.permission.bearer.policy to permit.
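With bearer authentication enabled, a client passes its JWT in the standard Authorization header using the Bearer scheme (RFC 6750). The sketch below assumes you already hold a valid token; how the token is issued is outside the scope of this page, and the STREAMX_JWT environment variable and class name are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: attaches a JWT as a bearer token to any ingestion request.
public class BearerAuth {

    static HttpRequest withBearer(HttpRequest.Builder builder, String jwt) {
        // Bearer scheme: "Authorization: Bearer <token>"
        return builder.header("Authorization", "Bearer " + jwt).build();
    }

    public static void main(String[] args) {
        // STREAMX_JWT is a hypothetical variable name; the fallback is a placeholder, not a valid token.
        String jwt = System.getenv().getOrDefault("STREAMX_JWT", "placeholder-token");
        HttpRequest request = withBearer(
                HttpRequest.newBuilder(URI.create("http://localhost:8080/publications/v1/pages/page.html")).DELETE(),
                jwt);
        System.out.println(request.headers().firstValue("Authorization").orElse("<none>"));
    }
}
```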

Data ingestion clients

Clients communicate with the REST Ingestion Service over HTTP. You can call the service directly, or use the StreamX Ingestion API Java Client or the StreamX CLI.

Usage

Fetch Schema:

curl -s http://${DOMAIN}:${PORT}/publications/v1/schema

Publish Data:

curl -X PUT "http://${DOMAIN}:${PORT}/publications/v1/pages/page.html" -H 'Content-Type: application/json' -d "{\"content\": {\"bytes\": \"${PAGE_CONTENT}\"}}"

Unpublish Data:

curl -X DELETE "http://${DOMAIN}:${PORT}/publications/v1/pages/page.html"
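The curl publish example interpolates ${PAGE_CONTENT} directly into the JSON string, so page content containing double quotes, backslashes, or newlines would produce invalid JSON. The following Java sketch (hypothetical class name, JDK HTTP client only) escapes the content before building the {"content": {"bytes": ...}} payload from the curl example, then maps the documented response codes to messages. It assumes ASCII content; how non-ASCII bytes must be represented depends on the JSON Avro encoding the service applies, which this sketch does not handle.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical sketch of "Publish Data" for the pages channel from the curl example.
public class PublishPage {

    // Escapes a string as a JSON string literal, surrounding quotes included.
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    // Other control characters must be escaped as hex escapes in JSON.
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
                }
            }
        }
        return sb.append('"').toString();
    }

    // Same payload shape as the curl example; other channels follow their own Avro schema.
    static String pageBody(String html) {
        return "{\"content\": {\"bytes\": " + jsonString(html) + "}}";
    }

    // Maps the documented publish responses to short messages.
    static String describe(int status) {
        return switch (status) {
            case 202 -> "accepted";
            case 400 -> "rejected (for example, payload does not match the channel's Avro schema)";
            case 500 -> "server error while processing the request";
            default -> "unexpected status " + status;
        };
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        String base = "http://localhost:8080"; // hypothetical DOMAIN and PORT
        HttpRequest request = HttpRequest.newBuilder(URI.create(base + "/publications/v1/pages/page.html"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(pageBody("<h1 class=\"title\">Hello</h1>")))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(describe(response.statusCode()));
    }
}
```

A 202 only means the request was accepted for processing; the data then flows asynchronously through the StreamX Mesh.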

StreamX Ingestion API Java Client

The StreamX Ingestion API Java client allows you to use the StreamX Ingestion API from your Java application.

StreamX CLI

Like the Java Client, the StreamX CLI makes calls to the REST Ingestion Service. For more details, see the StreamX CLI documentation.

Usage

streamx publish -s 'content.bytes=file://site/index.html' pages index.html
streamx unpublish pages index.html

Observability

  1. Metrics - the REST Ingestion Service exposes Micrometer metrics through the Prometheus registry at the /q/metrics endpoint.

  2. Traceability - publish and unpublish requests are marked with a Pulsar trace, which is propagated through the StreamX Mesh using OpenTelemetry tracing.
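Because the metrics use the Prometheus text format, any HTTP client can read them. This Java sketch assumes the service is reachable at localhost:8080 and keeps only sample lines whose metric name starts with a given prefix; the http_server_requests prefix is an assumption, so check the names your deployment actually exposes.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: scrape /q/metrics and filter sample lines by metric-name prefix.
public class MetricsScrape {

    static List<String> samples(String exposition, String prefix) {
        return exposition.lines()
                .filter(line -> !line.startsWith("#"))   // skip HELP and TYPE comment lines
                .filter(line -> line.startsWith(prefix))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8080/q/metrics"))
                .GET()
                .build();
        String body = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
        // Assumed metric name prefix; adjust to what your deployment exposes.
        samples(body, "http_server_requests").forEach(System.out::println);
    }
}
```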

Configuration

The REST Ingestion Service communicates with the StreamX Mesh through Apache Pulsar topics using SmallRye Reactive Messaging. mp.messaging.incoming.ingestion is the internal channel through which Pulsar client configuration can be passed. For more information about Pulsar configuration, see the SmallRye Reactive Messaging Pulsar reference.

Schema discovery in the StreamX Mesh requires the Pulsar Admin client, which Quarkus does not provide. Unlike the SmallRye-managed client, the Pulsar Admin client cannot be configured through the mp.messaging properties, so the REST Ingestion Service requires pulsar.admin.serviceUrl to connect for schema discovery.

The streamx.tenant property is required to distinguish between tenants in StreamX.

streamx.tenant
  Description: StreamX Mesh tenant
  Required: Yes

mp.messaging.incoming.ingestion.serviceUrl
  Description: Pulsar client URL
  Required: Yes

pulsar.admin.serviceUrl
  Description: Pulsar Admin client URL for channel schema discovery
  Required: Yes
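For illustration, the sketch below loads an application.properties fragment containing the three required properties and reports any that are missing or blank, a check a deployment script might run before starting the service. The class name and all property values (tenant name, Pulsar hosts) are hypothetical; 6650 and 8080 are Pulsar's default broker and HTTP admin ports.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical sketch: verify that the required REST Ingestion Service properties are set.
public class RequiredConfigCheck {

    static final List<String> REQUIRED = List.of(
            "streamx.tenant",
            "mp.messaging.incoming.ingestion.serviceUrl",
            "pulsar.admin.serviceUrl");

    // Returns the required keys that are absent or blank, in declaration order.
    static List<String> missing(Properties props) {
        return REQUIRED.stream()
                .filter(key -> props.getProperty(key, "").isBlank())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(String.join("\n",
                "streamx.tenant=my-tenant",                                        // hypothetical tenant
                "mp.messaging.incoming.ingestion.serviceUrl=pulsar://pulsar:6650", // hypothetical broker URL
                "pulsar.admin.serviceUrl=http://pulsar:8080")));                   // hypothetical admin URL
        System.out.println("missing: " + missing(props));
    }
}
```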