RTMB is a lightweight, high-performance, real-time message broker written in Go. It implements a publish-subscribe (pub-sub) messaging pattern similar to NATS, allowing clients to communicate asynchronously by publishing messages to subjects and subscribing to them.
- Features
- Architecture
- Installation
- Usage
- Code Structure
- Real-Time Messaging
- Comparison with Other Message Brokers
- Testing
- Contributing
- License
- Publish-Subscribe Messaging: Clients can publish messages to subjects and subscribe to them to receive messages.
- Subject Hierarchy with Wildcards: Supports hierarchical subjects with single-level (`*`) and multi-level (`>`) wildcards for flexible subscription patterns.
- Real-Time Communication: Designed for low-latency, real-time messaging without persisting data to disk.
- Concurrency Support: Handles multiple clients concurrently with proper synchronization.
- Thread-Safe: Implements a consistent locking mechanism to ensure thread safety.
- Lightweight and Efficient: Uses efficient data structures and minimizes locking overhead for high performance.
- Extensive Testing: Includes comprehensive unit tests covering various scenarios.
RTMB consists of a server that handles client connections, parses incoming commands, and manages message delivery using a trie-based topic system. The main components are:
- Server: Accepts client connections and spawns goroutines to handle each client.
- Commands Package: Parses and handles commands sent by clients.
- Topic Package: Manages subjects, subscriptions, and message delivery.
- Client Connection: Represents a client connection and handles message sending.
To build and run RTMB, you need to have Go installed (version 1.16 or later is recommended).
- Clone the repository:

        git clone https://github.com/islamghany/rtmb.git
        cd rtmb

- Build the server:

        go build -o rtmb

Run the RTMB server:

    ./rtmb

By default, the server listens on port 8080. You can specify a different port using the `-port` flag:

    ./rtmb -port 4222

Clients can connect to the RTMB server using TCP and communicate using simple text-based commands. You can use tools like `telnet` or `nc` (netcat), or write your own client in any programming language that supports TCP sockets.
- INFO: Provides information about the server.
- CONNECT: Establish a connection with the server.
- PING / PONG: Heartbeat mechanism to keep the connection alive.
- SUB: Subscribe to a subject to receive messages.
- UNSUB: Unsubscribe from a subject.
- PUB: Publish a message to a subject.
Below is a detailed description of each command, including its syntax and usage.
When a client connects, the server may send an `INFO` message containing information about the server.
- Example:

        INFO {"server_id":"rtmb-1.0","version":"1.0.0","proto":"tcp","host":"localhost","port":4222,"auth_required":false,"ssl_required":false,"ssl_verify":false,"max_payload":1048576}

Establishes a connection with the server.
- Syntax: `CONNECT {}`
- Description:
  - The `CONNECT` command initiates a connection with the server.
  - The `{}` represents an optional JSON payload for future extensions but is currently unused.
- Example: `CONNECT {}`
- Server Response:
  - No response is sent upon successful connection.
Heartbeat mechanism to keep the connection alive.
- `PING`
  - Syntax: `PING`
  - Description:
    - `PING`/`PONG` implement a simple keep-alive mechanism. The server sends a `PING` to the client at a regular interval and expects a `PONG` in return within a certain time frame.
    - If the client does not respond with `PONG`, the connection is closed.
- `PONG`
  - Syntax: `PONG`
  - Description:
    - Sent by the client in response to a `PING` command from the server.
    - Indicates that the client is still connected and responsive.
- Example (sent by the server): `PING`
- Client Response: `PONG`
Subscribe to a subject to receive messages.
- Syntax: `SUB <subject> <sid>`
  - `<subject>`: The subject to subscribe to (e.g., `foo.bar`, `qux.*`, `baz.>`, `foo.*.bar`).
  - `<sid>`: A unique subscription identifier (string).
- Description:
  - The `SUB` command registers the client to receive messages published to the specified subject.
  - Supports wildcards (`*` for single-level, `>` for multi-level).
- Example: `SUB foo.bar sid1`
- Server Response:
  - If the subscription succeeds, the server responds with `+OK`.
  - If the subscription fails, the server responds with an error message. For example: `-ERR 'SUB command: insufficient arguments'`
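To make the `SUB` syntax and its two possible replies concrete, here is a minimal parsing sketch. `parseSub` and `reply` are hypothetical helpers, not RTMB's actual parser; only the `+OK` reply and the "insufficient arguments" error text are taken from the description above, and the other error messages are invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errSubArgs mirrors the error text shown in the SUB description.
var errSubArgs = errors.New("SUB command: insufficient arguments")

// parseSub (hypothetical) splits a "SUB <subject> <sid>" line into its parts.
func parseSub(line string) (subject, sid string, err error) {
	fields := strings.Fields(line)
	if len(fields) == 0 || fields[0] != "SUB" {
		return "", "", errors.New("not a SUB command") // illustrative message
	}
	if len(fields) < 3 {
		return "", "", errSubArgs
	}
	if len(fields) > 3 {
		return "", "", errors.New("SUB command: too many arguments") // illustrative message
	}
	return fields[1], fields[2], nil
}

// reply (hypothetical) renders the server response for a command result.
func reply(err error) string {
	if err != nil {
		return "-ERR '" + err.Error() + "'"
	}
	return "+OK"
}

func main() {
	subject, sid, err := parseSub("SUB foo.bar sid1")
	fmt.Println(subject, sid, reply(err))

	_, _, err = parseSub("SUB foo.bar")
	fmt.Println(reply(err))
}
```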
Unsubscribe from a subject.
- Syntax: `UNSUB <sid> [max_msgs]`
  - `<sid>`: The subscription ID to unsubscribe.
  - `[max_msgs]` (optional): If provided, the server unsubscribes the client after sending `max_msgs` additional messages.
- Description:
  - The `UNSUB` command removes the subscription associated with the given `sid`.
  - If `max_msgs` is specified, the subscription is removed only after `max_msgs` more messages have been sent.
- Examples:
  - Unsubscribe immediately: `UNSUB sid1`
  - Unsubscribe after receiving 5 more messages: `UNSUB sid1 5`
- Server Response:
  - If the unsubscription succeeds, the server responds with `+OK`.
  - If the unsubscription fails, the server responds with an error message. For example: `-ERR 'UNSUB command: subscription not found'`
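The delayed form of `UNSUB` can be pictured as a per-subscription countdown. The sketch below is one possible way to model it, not RTMB's implementation: the `subscription` type and its `setLimit` and `deliver` methods are hypothetical, and a limit of zero is assumed to mean "no limit".

```go
package main

import "fmt"

// subscription is a hypothetical record tracking deliveries for one SID.
type subscription struct {
	sid       string
	maxMsgs   int // delivery count at which the subscription expires; 0 means no limit
	delivered int // messages delivered so far
}

// setLimit models "UNSUB <sid> <max_msgs>": allow maxMsgs more deliveries from now on.
func (s *subscription) setLimit(maxMsgs int) {
	s.maxMsgs = s.delivered + maxMsgs
}

// deliver records one delivery and reports whether the subscription
// has reached its limit and should be removed.
func (s *subscription) deliver() (expired bool) {
	s.delivered++
	return s.maxMsgs > 0 && s.delivered >= s.maxMsgs
}

func main() {
	sub := &subscription{sid: "sid1"}
	sub.deliver()   // delivered before any limit was set
	sub.setLimit(2) // as if the client sent: UNSUB sid1 2
	fmt.Println(sub.deliver())
	fmt.Println(sub.deliver())
}
```

In this model the caller removes the subscription as soon as `deliver` returns true; an `UNSUB` without `max_msgs` would skip the countdown and remove it right away.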
Publish a message to a subject.
- Syntax:

        PUB <subject> <msg_len>
        <message>

  - `<subject>`: The subject to publish the message to.
  - `<msg_len>`: The length of the message payload in bytes.
  - `<message>`: The message payload.
- Description:
  - The `PUB` command publishes a message to the specified subject.
  - The message payload follows the command, separated by a newline.
  - The payload length must match `<msg_len>`.
- Example:

        PUB foo.bar 11
        Hello World

  - In this example, `Hello World` is the message payload, which is 11 bytes long.
- Server Response:
  - If the publication succeeds, the server responds with `+OK`.
  - If the publication fails, the server responds with an error message. For example: `-ERR 'PUB command: insufficient arguments'`
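Because `<msg_len>` counts bytes rather than characters, it is easy to get wrong for non-ASCII payloads. The sketch below builds a `PUB` frame from a subject and payload; `buildPub` is a hypothetical helper, and the use of `\n` as the line terminator, including after the payload, is an assumption based on the description above rather than a confirmed detail of the wire format.

```go
package main

import (
	"fmt"
	"strconv"
)

// buildPub (hypothetical) renders "PUB <subject> <msg_len>\n<payload>\n".
// len(payload) is a byte count, which is what <msg_len> requires.
func buildPub(subject string, payload []byte) []byte {
	frame := []byte("PUB " + subject + " " + strconv.Itoa(len(payload)) + "\n")
	frame = append(frame, payload...)
	return append(frame, '\n')
}

func main() {
	fmt.Printf("%q\n", buildPub("foo.bar", []byte("Hello World")))
	// A two-byte UTF-8 character counts as 2 toward <msg_len>.
	fmt.Printf("%q\n", buildPub("foo.bar", []byte("é")))
}
```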
- Subject Format:
  - Subjects are strings separated by dots (`.`), forming a hierarchy (e.g., `foo.bar.baz`).
  - Each token in the subject must be a non-empty string.
- Single-Level Wildcard (`*`):
  - Matches any token at a single level in the subject hierarchy.
  - Example:
    - Subscription: `foo.*.baz`
    - Matches: `foo.bar.baz`, `foo.qux.baz`
- Multi-Level Wildcard (`>`):
  - Matches any number of tokens at or beyond its position.
  - Must be the last token in the subject.
  - Example:
    - Subscription: `foo.>`
    - Matches: `foo.bar`, `foo.bar.baz`, `foo.bar.baz.qux`
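The wildcard rules above can be expressed as a small token-by-token comparison. This is a sketch only: `matchSubject` is a hypothetical function, it does not validate that tokens are non-empty, and it assumes that `>` needs at least one token to match (so `foo.>` does not match `foo`), which the examples suggest but the rules do not state outright.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject (hypothetical) reports whether a published subject matches
// a subscription pattern that may contain "*" and a trailing ">".
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and (by assumption)
			// must cover at least one remaining subject token.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // subject is shorter than the pattern
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	// Without ">", both sides must have the same number of tokens.
	return len(p) == len(s)
}

func main() {
	fmt.Println(matchSubject("foo.*.baz", "foo.bar.baz"))
	fmt.Println(matchSubject("foo.>", "foo.bar.baz.qux"))
	fmt.Println(matchSubject("foo.*", "foo.bar.baz"))
}
```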
Below is an example session using `telnet`.
- Connect to the Server: `telnet localhost 8080`
- Establish a Connection: `CONNECT {}`
- Subscribe to a Subject: `SUB foo.bar sid1`
- Publish a Message: Open another terminal and connect to the server with `telnet localhost 8080`, then send:

        CONNECT {}
        PUB foo.bar 12
        Hello World!

- Receive the Message: The client subscribed to `foo.bar` will receive: `MSG foo.bar sid1 12 Hello World!`
- Unsubscribe from a Subject: `UNSUB sid1`
- Ping-Pong to Keep Alive: When the server sends `PING`, the client responds with `PONG`.
The codebase is organized into several packages:
- `main.go`: Entry point of the server.
- `commands`: Handles parsing and execution of client commands.
- `topic`: Manages subjects, subscriptions, and message delivery.
- `parser`: Parses client commands.
- `client_connection`: Manages individual client connections.
The `commands` package is responsible for handling parsed commands from clients and interacting with the `topic` package to manage subscriptions and message delivery.
Key components:
- `Commander`: Struct that encapsulates state and methods to handle commands for a client.
- `HandleCommand`: Dispatches commands based on their names.
- Command Handlers: `handleConnect`, `handleSub`, `handlePub`, `handleUnsub`.
- Client ID Management: Assigns a unique client ID using UUIDs.
Example of the `Commander` struct:

    type Commander struct {
        conn       net.Conn
        topic      *topic.Topic
        clientID   string
        clientSubs map[string]string // map of SID to subject
        port       string
    }

The `topic` package implements a trie data structure to manage subjects and subscriptions. It handles adding and removing subscribers, matches subjects for message delivery, and ensures thread safety through a consistent locking mechanism.
- Trie Structure: Represents subjects hierarchically, allowing efficient matching and wildcard support.
- Subscription Management: Adds and removes subscribers based on subjects and subscription IDs.
- Message Delivery: Matches published subjects to subscribers and delivers messages.
To ensure thread safety, the `topic` package uses a per-node lock (`sync.RWMutex`) for each node in the trie. This approach allows concurrent read access and safe modifications.
Locking Strategy:
- Traversal and Modification:
  - Locks are acquired in a consistent order: the parent node is locked, then the child node is locked before the parent lock is released (hand-over-hand locking).
  - This prevents deadlocks and ensures thread safety.
- Use of `defer`:
  - The lock on the final node is released with `defer`, registered once traversal has reached that node.
- Avoiding Deadlocks:
  - A goroutine holds at most two locks at a time (a parent and its child) and always acquires them from the root downward, so no two goroutines can end up waiting on each other in a cycle.
Example of Adding a Subscriber:

    func (t *Topic) AddSubscriber(sub *Subscription) error {
        // Validate subject and initialize traversal
        parts := splitSubject(sub.Subject)
        current := t.root
        current.mu.Lock()
        for _, part := range parts {
            child, exists := current.children[part]
            if !exists {
                child = NewTrieNode()
                current.children[part] = child
            }
            // Lock the child before unlocking the parent
            child.mu.Lock()
            current.mu.Unlock()
            current = child
        }
        // At this point, current.mu is locked for the target node
        defer current.mu.Unlock()
        if _, exists := current.subscribers[sub.SID]; exists {
            return fmt.Errorf("subscription with SID %s already exists for subject %s", sub.SID, sub.Subject)
        }
        current.subscribers[sub.SID] = sub
        return nil
    }

The `ClientConnection` type manages an individual client connection, including sending messages asynchronously through a buffered channel.
Key features:
- Asynchronous Message Sending: Uses a write loop to send messages without blocking the caller.
- Thread Safety: Uses a mutex to protect access to the connection when writing messages.
- Graceful Shutdown: Implements a `Close` method to properly shut down the connection.
Example of the `ClientConnection` struct:

    type ClientConnection struct {
        ID        string
        Conn      io.Writer
        mu        sync.Mutex
        writeChan chan *Message // Channel for async message writing
        done      chan struct{} // Channel to signal shutdown
    }

In the context of RTMB, "real-time" refers to the system's ability to deliver messages with minimal latency, ensuring that subscribers receive messages as soon as they are published. This is achieved by:
- In-Memory Operation: RTMB operates entirely in memory and does not persist data to disk. This eliminates disk I/O overhead, resulting in faster message processing and delivery.
- No Message Persistence: Messages are not stored persistently. If the server restarts or crashes, messages that have not been delivered are lost. This design choice prioritizes low latency over durability.
- Optimized for Speed: The server uses efficient data structures and minimizes locking overhead to handle high-throughput, low-latency messaging.
- High Performance: By not persisting messages to disk, RTMB can achieve higher throughput and lower latency compared to brokers that write messages to disk.
- No Message Durability: Messages are transient. If a subscriber is not connected at the time of publishing, it will not receive the message later.
- Use Cases: Suitable for applications where real-time data delivery is critical, and message loss in the event of failures is acceptable (e.g., live data feeds, real-time analytics, gaming).
RabbitMQ is a robust message broker that supports multiple messaging protocols and offers features like message persistence, acknowledgments, and complex routing.
- Message Persistence:
- RabbitMQ: Supports durable queues and message persistence to disk, ensuring messages are not lost if the broker restarts.
- RTMB: Does not persist messages to disk, prioritizing low latency over durability.
- Features:
- RabbitMQ: Offers a wide range of features, including transactions, message acknowledgments, and advanced routing with exchanges.
- RTMB: Focuses on simplicity and high-speed pub-sub messaging without additional overhead.
- Use Cases:
- RabbitMQ: Suitable for applications requiring reliable message delivery, complex routing, and where durability is important.
- RTMB: Ideal for real-time applications where speed is critical, and occasional message loss is acceptable.
Apache Kafka is a distributed streaming platform designed for high-throughput, scalable messaging with strong durability guarantees.
- Message Persistence:
- Kafka: Persists all messages to disk and replicates them across multiple brokers for fault tolerance.
- RTMB: Operates in-memory without persisting messages.
- Data Retention:
- Kafka: Allows configuring retention policies to keep messages for a specified duration or size.
- RTMB: Messages are transient and only exist in memory until delivered.
- Throughput and Latency:
- Kafka: Optimized for high throughput, capable of handling large volumes of data with some latency.
- RTMB: Prioritizes low latency over throughput, delivering messages in real-time.
- Use Cases:
- Kafka: Suitable for event streaming, log aggregation, and scenarios where message durability and ordering are essential.
- RTMB: Best for real-time messaging needs where immediate delivery is more important than durability.
| Feature | RTMB | RabbitMQ | Apache Kafka |
|---|---|---|---|
| Message Persistence | No (in-memory only) | Yes (optional) | Yes (always) |
| Latency | Low (real-time delivery) | Moderate | Moderate |
| Throughput | High | Moderate | Very High |
| Durability | No | Yes (configurable) | Yes (built-in) |
| Complexity | Simple | Moderate | Complex |
| Use Cases | Real-time messaging | Reliable message delivery | Event streaming and processing |
The project includes a comprehensive set of unit tests in `topic/topic_test.go` that cover various scenarios:
- Adding subscribers
- Removing subscribers
- Matching and delivering messages
- Wildcard subscriptions
- Handling unsubscriptions with `MaxMsgs`
- Concurrent operations
To run the tests, navigate to the project directory and execute:

    go test -race ./...

Using the `-race` flag enables the race detector to catch any data races during concurrent operations.

    func TestMatchAndDeliver(t *testing.T) {
        topic := NewTopic()
        client1 := NewMockClientConnection("client1")
        client2 := NewMockClientConnection("client2")
        sub1 := &Subscription{
            ClientID: client1.ID,
            SID:      "sid1",
            Subject:  "foo.bar",
            Client:   client1,
        }
        sub2 := &Subscription{
            ClientID: client2.ID,
            SID:      "sid2",
            Subject:  "foo.*",
            Client:   client2,
        }
        err := topic.AddSubscriber(sub1)
        if err != nil {
            t.Errorf("Failed to add subscriber 1: %v", err)
        }
        err = topic.AddSubscriber(sub2)
        if err != nil {
            t.Errorf("Failed to add subscriber 2: %v", err)
        }
        msg := []byte("hello world")
        subscribers, err := topic.MatchAndDeliver("foo.bar", msg)
        if err != nil {
            t.Errorf("Failed to match and deliver: %v", err)
        }
        if len(subscribers) != 2 {
            t.Errorf("Expected 2 subscribers, got %d", len(subscribers))
        }
        // Check if clients received the message
        if len(client1.Messages) != 1 {
            t.Errorf("Client 1 should have received 1 message, got %d", len(client1.Messages))
        }
        if len(client2.Messages) != 1 {
            t.Errorf("Client 2 should have received 1 message, got %d", len(client2.Messages))
        }
    }

Contributions are welcome! Please follow these steps:
- Fork the repository.
- Create a new branch: `git checkout -b feature/your-feature-name`
- Make your changes and commit them: `git commit -m "Add your message here"`
- Push to your fork: `git push origin feature/your-feature-name`
- Open a pull request.
Please ensure that your code adheres to the existing coding style and includes tests where appropriate.
This project is licensed under the terms of the MIT License.