Pulsar Beam is an HTTP-based streaming and queueing system backed by Apache Pulsar.
- A message can be sent to Pulsar via an HTTP POST request, acting as a producer.
- A message can be pushed to a webhook or Cloud Function for consumption.
- A webhook or Cloud Function receives a message, processes it, and can reply with another message in the response body, which Pulsar Beam sends to another Pulsar topic.
- Messages can be streamed via HTTP Server-Sent Events (SSE).
- Batches of messages can be polled over HTTP.
Issues and PRs are welcome! Please email contact@kafkaesque.io for any inquiry or a demo.
- Since Beam speaks HTTP, it is language and OS independent. You can take advantage of the power of Apache Pulsar without being limited by client library or OS support. Pulsar can immediately be used on Windows and from any language with HTTP support.
- It has a very small footprint, with a 15MB Docker image.
- It supports HTTP SSE streaming.
The Swagger documentation for the REST API and its endpoints is published at this link.
This is the endpoint to POST a message to Pulsar:
/v2/firehose/{persistent}/{tenant}/{namespace}/{topic}
Valid values of {persistent} are p, persistent, np, and nonpersistent.
These HTTP headers may be required to map the request to a Pulsar topic:
- Authorization -> the Pulsar token, sent as a Bearer token
- PulsarUrl -> (optional) a fully qualified pulsar or pulsar+ssl URL where the message should be sent. If it is absent, the message is sent to the Pulsar URL specified under PulsarBrokerURL in the pulsar-beam.yml file.
This is the endpoint to GET messages from Pulsar as a consumer subscription:
/v2/sse/{persistent}/{tenant}/{namespace}/{topic}
Valid values of {persistent} are p, persistent, np, and nonpersistent.
These HTTP headers may be required to map the request to a Pulsar topic:
- Authorization -> the Pulsar token, sent as a Bearer token
- PulsarUrl -> (optional) a fully qualified pulsar or pulsar+ssl URL of the Pulsar cluster to consume from. If it is absent, the Pulsar URL specified under PulsarBrokerURL in the pulsar-beam.yml file is used.
Query parameters:
- SubscriptionType -> supported type strings are exclusive (the default), shared, and failover.
- SubscriptionInitialPosition -> supported values are latest (the default) and earliest.
- SubscriptionName -> must be at least 5 characters long. If it is absent, an auto-generated name is provided. Only auto-generated subscriptions are unsubscribed.
This endpoint polls a batch of messages from a topic, always starting from the earliest subscription position:
/v2/poll/{persistent}/{tenant}/{namespace}/{topic}
These HTTP headers may be required to map the request to a Pulsar topic:
- Authorization -> the Pulsar token, sent as a Bearer token
- PulsarUrl -> (optional) a fully qualified pulsar or pulsar+ssl URL of the Pulsar cluster to consume from. If it is absent, the Pulsar URL specified under PulsarBrokerURL in the pulsar-beam.yml file is used.
Query parameters:
- SubscriptionType -> supported type strings are exclusive (the default), shared, and failover.
- SubscriptionName -> must be at least 5 characters long. If it is absent, an auto-generated name is provided. Only auto-generated subscriptions are unsubscribed.
- batchSize -> the server replies to the client once this many messages have been collected. The default is 10 messages per batch.
- perMessageTimeoutMs -> the time, in milliseconds, to wait for the next message to arrive from the Pulsar topic. The default is 300 ms.
Webhook registration is done via a REST API backed by a database of your choice, such as MongoDB, an in-memory cache, or Pulsar itself. Yes, you can use a compacted Pulsar topic as a database table to perform CRUD. The database is selected with the configuration parameter PbDbType, for example "PbDbType": "inmemory", in the pulsar_beam.yml file or with the env variable PbDbType.
The management REST API has this endpoint (see the Swagger document for details):
/v2/topic
Pulsar Beam can decode and authenticate JWTs generated by Pulsar. Webhook management requires the JWT subject to match the tenant name in the topic's fully qualified name. pulsar-admin token can be used to generate such a token.
Pulsar Beam requires the same public and private keys that Pulsar uses to generate and verify JWTs. These keys must be specified in the config so that they can be loaded.
To disable JWT authentication, set the parameter HTTPAuthImpl to noauth in the config file or as an env variable.
If a webhook's response contains a body and three headers, Authorization for the Pulsar JWT, TopicFn for a topic's fully qualified name, and PulsarUrl, the Beam server sends the body as a new message to the Pulsar topic specified by TopicFn and PulsarUrl.
Both JSON and YAML formats are supported for the configuration file. The configuration parameters are defined in config.go. Every parameter can be overridden by an environment variable with the same name.
For high performance and separation of responsibilities, the webhook broker and the receiver endpoint can run independently, with -mode broker or -mode receiver. By default, the server runs in a hybrid mode with all features running in the same process.
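A hypothetical sketch of how a single binary might choose what to start from the -mode flag. Only broker and receiver appear in this README; the hybrid flag value, the component names, and grouping the REST API with the receiver (loosely based on the Helm chart layout, where the receiver endpoint and REST API share a container) are assumptions.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// components maps a -mode value to the parts of the server it would start.
// The "hybrid" value name and the component names are illustrative.
func components(mode string) ([]string, error) {
	switch mode {
	case "broker":
		return []string{"webhook-broker"}, nil
	case "receiver":
		return []string{"http-receiver", "rest-api"}, nil
	case "hybrid":
		return []string{"webhook-broker", "http-receiver", "rest-api"}, nil
	}
	return nil, fmt.Errorf("unknown mode %q", mode)
}

func main() {
	mode := flag.String("mode", "hybrid", "broker, receiver, or hybrid")
	flag.Parse()
	comps, err := components(*mode)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
	fmt.Println("starting:", comps)
}
```

For example:
$ go run main.go -mode receiver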
The Docker image can be pulled from Docker Hub.
$ sudo docker pull kafkaesqueio/pulsar-beam
Here are the steps to build the Docker image and run the Docker container with a file-based configuration.
- Build the Docker image
$ sudo docker build -t pulsar-beam .
- Run the Docker container
This example uses the default configuration with an in-memory database. A customized pulsar_beam.yml and the private and public key files can be mounted, with the config file passed in via the env variable PULSAR_BEAM_CONFIG. The certificate is required to connect to Pulsar with TLS enabled.
$ sudo docker run -d -it -v /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem:/etc/ssl/certs/ca-bundle.crt -p 8085:8085 --name=pbeam-server pulsar-beam
gops is built into the Docker image for troubleshooting purposes.
Pulsar Beam can be deployed within the same cluster as Pulsar. This Helm chart deploys the webhook broker in its own pod. The HTTP receiver endpoint and the REST API are deployed as a container within the Pulsar proxy pod, which offers scalability through multiple replicas.
Clone the repo into the src/github.com/kafkaesque-io/pulsar-beam folder under your GOPATH.
Install golint.
$ go install golang.org/x/lint/golint@latest
$ cd src
$ golint ./...
There are two scripts used for CI. You might want to run them locally before submitting a PR. The CI script does linting, go vet, and go build. The code coverage script runs the unit tests and tallies up the code coverage.
Steps to start the web server:
$ cd src
$ go run main.go
Scripts under the ./scripts folder can run code analysis, vetting, compilation, unit tests, and code coverage manually; all of these are part of the CI checks run by GitHub Actions.
One end-to-end test, ./src/e2e/e2etest.go, performs the following steps in order:
- Create a topic and its webhook via the RESTful API. The webhook URL can be an HTTP-triggered Cloud Function. The CI process uses a GCP Cloud Function.
- Send a message to Pulsar Beam's v1 ingestion endpoint
- Wait on the sink topic: the first message is sent to a GCP Cloud Function (in CI), which in turn replies to Pulsar Beam to forward a message to the second sink topic
- Verify the replied message on the sink topic
- Delete the topic and its webhook document via RESTful API
Since the setup is non-trivial, involving Pulsar Beam, a Cloud Function or webhook, the test tool, and Pulsar itself with SSL, we recommend using the free plan at kesque.com as the Pulsar server together with a Cloud Function; we have verified that a GCP Function, Azure Function, or AWS Lambda will suffice in the e2e flow.
Steps to run the unit tests:
$ cd src/unit-test
$ go test -v .