Elixir Client for RabbitMQ Streams Protocol.
RabbitMQ 3.9 introduced Streams as an alternative to Queues. They differ mainly by implementing "non-destructive consumer semantics": a consumer can read messages starting at any offset while continuing to receive new messages.
While this feature is available when using the existing Queues, it shines when used with its dedicated protocol, which allows messages to be consumed much faster than with Queues.
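The non-destructive semantics described above can be pictured with a small in-memory model. This is only a sketch: StreamSketch and its functions are hypothetical names invented for illustration, and nothing here talks to RabbitMQ or uses this library.

```elixir
defmodule StreamSketch do
  @moduledoc """
  Toy in-memory model of a stream: an append-only log read by offset.
  Illustration only; not part of rabbitmq_stream.
  """

  # A stream is modelled as a list of messages, oldest first.
  def new, do: []

  # Appending returns the updated log; an offset is a position in the list.
  def append(log, message), do: log ++ [message]

  # Reading from an offset returns every message at or after it.
  # The log itself is never modified, so any reader can re-read at will.
  def read_from(log, offset) when is_integer(offset) and offset >= 0 do
    Enum.drop(log, offset)
  end
end

log =
  StreamSketch.new()
  |> StreamSketch.append("a")
  |> StreamSketch.append("b")
  |> StreamSketch.append("c")

# Two independent readers, each starting at its own offset.
IO.inspect(StreamSketch.read_from(log, 0)) # ["a", "b", "c"]
IO.inspect(StreamSketch.read_from(log, 2)) # ["c"]
```

Reading never removes anything from the log, which is the key difference from a queue, where a consumed message is gone for every other consumer.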
This library aims to be a client for the Streams Protocol, managing connections and providing an idiomatic way of interacting with all the features available for this functionality.
- Producing and Consuming from Streams
- Offset Tracking and Flow Control (Credits) helpers
- Stream Filtering
- Single Active Consumer
- Super Streams
The package can be installed by adding rabbitmq_stream to your list of dependencies in mix.exs:
    def deps do
      [
        {:rabbitmq_stream, "~> 0.4.1"},
        # ...
      ]
    end
The RabbitMQ Streams protocol needs a static :reference_name per producer, which is used to prevent message duplication. For this reason, each stream needs, for now, a static module to publish messages, which keeps track of its own publishing_id.
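To see why a stable :reference_name and an increasing publishing_id together prevent duplicates, consider the following toy model. It is a sketch of the idea only: DedupSketch is a hypothetical module, and neither the broker nor this library is implemented this way.

```elixir
defmodule DedupSketch do
  @moduledoc "Toy model of per-producer deduplication; illustration only."

  # State: a map of reference_name => highest publishing_id accepted so far.
  def new, do: %{}

  # Accept a message only if its publishing_id is higher than the last one
  # seen for the same reference_name; otherwise report it as a duplicate.
  def publish(state, reference_name, publishing_id) do
    last = Map.get(state, reference_name, -1)

    if publishing_id > last do
      {:accepted, Map.put(state, reference_name, publishing_id)}
    else
      {:duplicate, state}
    end
  end
end

state = DedupSketch.new()
{:accepted, state} = DedupSketch.publish(state, "producer-a", 1)
{:accepted, state} = DedupSketch.publish(state, "producer-a", 2)
# A retry of publishing_id 2 (for example after a reconnect) is dropped.
{:duplicate, state} = DedupSketch.publish(state, "producer-a", 2)
# A different reference_name has its own independent sequence.
{:accepted, _state} = DedupSketch.publish(state, "producer-b", 1)
```

If the reference name changed on every restart, the retried message would look like it came from a new producer and would be accepted again, which is why the name has to be static.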
You can define a Producer module like this:
    defmodule MyApp.MyProducer do
      use RabbitMQStream.Producer,
        stream_name: "stream-01",
        connection: MyApp.MyConnection
    end
Then you can publish messages to the stream:
MyApp.MyProducer.publish("Hello World")
To consume from a stream, first define a connection:
    defmodule MyApp.MyConnection do
      use RabbitMQStream.Connection
    end
You can then declare a consumer module with RabbitMQStream.Consumer:
    defmodule MyApp.MyConsumer do
      use RabbitMQStream.Consumer,
        connection: MyApp.MyConnection,
        stream_name: "my_stream",
        initial_offset: :first

      @impl true
      def handle_message(_message) do
        :ok
      end
    end
Or you can consume from the stream manually with:
{:ok, _subscription_id} = MyApp.MyConnection.subscribe("stream-01", self(), :next, 999)
The caller process will start receiving messages with the format {:deliver, %RabbitMQStream.Message.Types.DeliverData{} = deliver_data}:
    def handle_info({:deliver, %RabbitMQStream.Message.Types.DeliverData{} = deliver_data}, state) do
      # do something with message
      {:noreply, state}
    end
A super stream is a logical stream made of individual, regular streams.
You can declare SuperStreams with:
:ok = MyApp.MyConnection.create_super_stream("my_super_stream", "route-A": ["stream-01", "stream-02"], "route-B": ["stream-03"])
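As a rough illustration of what the bindings in that call express, the sketch below resolves a routing key to the regular streams bound to it. SuperStreamSketch and streams_for/1 are hypothetical names; the actual routing is done by the broker and the library and may work differently.

```elixir
defmodule SuperStreamSketch do
  # Bindings mirror the keyword list passed to create_super_stream above.
  @bindings ["route-A": ["stream-01", "stream-02"], "route-B": ["stream-03"]]

  # Return the regular streams bound to a routing key, or [] if none match.
  def streams_for(routing_key) when is_binary(routing_key) do
    Enum.find_value(@bindings, [], fn {key, streams} ->
      if Atom.to_string(key) == routing_key, do: streams
    end)
  end
end

IO.inspect(SuperStreamSketch.streams_for("route-A")) # ["stream-01", "stream-02"]
IO.inspect(SuperStreamSketch.streams_for("route-B")) # ["stream-03"]
IO.inspect(SuperStreamSketch.streams_for("unknown")) # []
```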
And you can consume from it with:
    defmodule MyApp.MySuperConsumer do
      use RabbitMQStream.SuperConsumer,
        initial_offset: :next,
        super_stream: "my_super_stream"

      @impl true
      def handle_message(_message) do
        # ...
        :ok
      end
    end
The configuration for the connection can be set in your config.exs file:
    config :rabbitmq_stream, MyApp.MyConnection,
      username: "guest",
      password: "guest"
      # ...
You can configure a default Serializer module by passing it to the defaults configuration option:
    config :rabbitmq_stream, :defaults,
      serializer: Jason
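If JSON is not a good fit, a custom module could presumably be used instead. The sketch below assumes, as the use of Jason suggests, that a serializer is any module exposing encode!/1 and decode!/1; that contract is an assumption here and should be checked against the documentation. MyApp.TermSerializer is a hypothetical name.

```elixir
defmodule MyApp.TermSerializer do
  # Hypothetical serializer. Assumes the library calls encode!/1 and decode!/1,
  # the same two functions Jason exposes.

  # Encode any Erlang term into a binary payload.
  def encode!(term), do: :erlang.term_to_binary(term)

  # Decode a payload; :safe refuses to create new atoms from untrusted input.
  def decode!(binary) when is_binary(binary) do
    :erlang.binary_to_term(binary, [:safe])
  end
end

payload = MyApp.TermSerializer.encode!(%{id: 1, body: "Hello World"})
%{id: 1, body: "Hello World"} = MyApp.TermSerializer.decode!(payload)
```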
You can configure RabbitMQStream to use TLS connections:
    config :rabbitmq_stream, :defaults,
      connection: [
        transport: :ssl,
        ssl_opts: [
          keyfile: "services/cert/client_box_key.pem",
          certfile: "services/cert/client_box_certificate.pem",
          cacertfile: "services/cert/ca_certificate.pem"
        ]
      ]
For more information, check the documentation.