Fast, Lightweight Pub/Sub over TCP, QUIC - powered by Async I/O
A few days back I worked on pubsub, an embeddable, fast, lightweight Pub/Sub system for Go projects. It is built using only native Go functionality and, as its title suggests, you can embed it in your application for in-app message passing using any of the following patterns
- Single Publisher Single Subscriber
- Single Publisher Multiple Subscriber
- Multiple Publisher Single Subscriber
- Multiple Publisher Multiple Subscriber
This lets multiple goroutines talk to each other over topics. Since no network I/O is involved, all operations are low-latency.
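To make the idea of goroutines talking over topics concrete, here's a minimal sketch using only channels and a mutex. The `hub` type and its `subscribe`/`publish` methods are hypothetical names made up for illustration; this is not how pubsub is actually implemented, just one simple way to get topic-based fan-out in-process.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a hypothetical in-process topic registry; it is not pubsub's actual type.
type hub struct {
	mu   sync.RWMutex
	subs map[string][]chan []byte
}

func newHub() *hub {
	return &hub{subs: make(map[string][]chan []byte)}
}

// subscribe registers a buffered channel for topic and returns its receive side.
func (h *hub) subscribe(topic string, capacity int) <-chan []byte {
	ch := make(chan []byte, capacity)
	h.mu.Lock()
	h.subs[topic] = append(h.subs[topic], ch)
	h.mu.Unlock()
	return ch
}

// publish sends data to every subscriber of topic whose buffer has room
// and returns how many subscribers received it.
func (h *hub) publish(topic string, data []byte) int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	delivered := 0
	for _, ch := range h.subs[topic] {
		select {
		case ch <- data:
			delivered++
		default: // inbox full: drop rather than block the publisher
		}
	}
	return delivered
}

func main() {
	h := newHub()
	a := h.subscribe("topic_1", 1)
	b := h.subscribe("topic_1", 1)

	var wg sync.WaitGroup
	for i, ch := range []<-chan []byte{a, b} {
		wg.Add(1)
		go func(id int, ch <-chan []byte) {
			defer wg.Done()
			fmt.Printf("subscriber %d got %q\n", id, <-ch)
		}(i, ch)
	}

	n := h.publish("topic_1", []byte("hello"))
	wg.Wait()
	fmt.Printf("delivered to %d subscriber(s)\n", n)
}
```

Dropping on a full inbox is a design choice of this sketch: it keeps publishers from stalling behind a slow subscriber, at the cost of lost messages.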
If you're interested, take a look at pubsub.
Now I'm interested in extending the aforementioned pubsub architecture to a more generic form, so that clients, i.e. publishers and subscribers, can talk to the Pub/Sub Hub over the network, i.e. TCP or QUIC.
This gives us the ability to publish messages to topics over the network, where the Pub/Sub Hub may sit somewhere else, and to subscribe to topics of interest and keep receiving messages as soon as they're published.
QUIC is to be the preferred choice of network I/O, due to the benefits it brings to the table.
⭐️ Primary implementation is on top of TCP.
Add pub0sub to your project ( GOMOD enabled )
go get -u github.com/itzmeanjan/pub0sub/...
pub0sub has three components: Hub, Publisher and Subscriber.
For running the Hub, you'll probably want to use 0hub.
Default Port | Default Interface |
---|---|
13000 | 127.0.0.1 |
Build using
make build_hub
Run using
./0hub -help
./0hub # run
Single step build-and-run with
make hub
If interested, you can check the 0hub implementation here
Dockerised 0hub
You may prefer a Dockerised setup for running the Hub
make docker_hub # assuming docker daemon running
List images
docker images # find `0hub` in list
Remove <none>-tagged images [ optional ]
docker images -a | grep none | awk '{ print $3}' | xargs docker rmi --force
Run 0hub as a container. You may want to change ENV variables; check the defaults in 0hub.env
make run_hub
docker ps # must list `hub`
Or, if you want 0hub to be reachable on localhost:13000
docker run --name hub -p 13000:13000 --env-file 0hub.env -d 0hub # reachable at localhost:13000
Check container log
docker logs hub
If you want to inspect container
docker inspect hub
Stop when done
make stop_hub
make remove_hub
You can interact with the Hub using the minimalistic publisher CLI client 0pub. Implementation can be found here
Build using
make build_pub
Run using
./0pub -help
./0pub # run
Single step build-and-run, using defaults
make pub
You're probably interested in publishing messages programmatically.
- Let's first create a publisher, which will establish a TCP connection with the Hub
ctx, cancel := context.WithCancel(context.Background())
pub, err := publisher.New(ctx, "tcp", "127.0.0.1:13000")
if err != nil {
	return
}
- Construct message you want to publish
data := []byte("hello")
topics := []string{"topic_1", "topic_2"}
msg := ops.Msg{Topics: topics, Data: data}
- Publish message
n, err := pub.Publish(&msg)
if err != nil {
	return
}
log.Printf("Approximate %d receiver(s)\n", n)
- When done using publisher instance, cancel context, which will tear down network connection gracefully
cancel()
<-time.After(time.Second) // just wait a second
- You can always check whether the network connection with the Hub is still intact
if pub.Connected() {
	log.Println("Yes, still connected")
}
You're encouraged to first test out 0sub, a minimalistic CLI subscriber client for interacting with the Hub.
Build using
make build_sub
Run using
./0sub -help
./0sub # run
Take a look at implementation here
Single step build-and-run, with default config
make sub
But you probably want to interact with the Hub programmatically, to subscribe to topics of interest and receive messages as soon as they're published
- Start by creating a subscriber instance, which will establish a long-lived TCP connection with the Hub and initially subscribe to the provided topics
ctx, cancel := context.WithCancel(context.Background())
capacity := 128 // pending message inbox capacity
topics := []string{"topic_1", "topic_2"}
sub, err := subscriber.New(ctx, "tcp", "127.0.0.1:13000", capacity, topics...)
if err != nil {
	return
}
- As soon as a new message is available for consumption ( queued in inbox ), the subscriber is notified over a Go channel. It's better to listen on that channel & pull messages from the inbox
for {
	select {
	case <-ctx.Done():
		return
	// watch to get notified
	case <-sub.Watch():
		if msg := sub.Next(); msg != nil {
			// consume message
		}
	}
}
- You can add more topic subscriptions on-the-fly
n, err := sub.AddSubscription("topic_3")
if err != nil {
	return
}
log.Printf("Subscribed to %d topic(s)\n", n)
- You might need to unsubscribe from topics
n, err := sub.Unsubscribe("topic_1")
if err != nil {
	return
}
log.Printf("Unsubscribed from %d topic(s)\n", n)
- You can unsubscribe from all topics
n, err := sub.UnsubscribeAll()
if err != nil {
	return
}
log.Printf("Unsubscribed from %d topic(s)\n", n)
- At any time you can check whether unconsumed, buffered messages exist in the inbox
if sub.Queued() {
	log.Println("We've messages to consume")
	if msg := sub.Next(); msg != nil {
		// act on message
	}
}
- Or you may need to check whether the client is still connected to the Hub over TCP
if sub.Connected() {
	log.Println("Yes, still connected")
}
- When done, it's better to gracefully tear down the TCP connection
if err := sub.Disconnect(); err != nil {
	// may happen when already torn down
	log.Println(err.Error())
}
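The watch-then-pull pattern used by the subscriber can be pictured as a bounded queue paired with a notification channel. Below is a rough sketch under that assumption; `inbox`, `put`, `watch`, `next` and `queued` are hypothetical names, and pub0sub's actual subscriber may well work differently. It also hints at why checking for nil after pulling is a good habit: a notification can outlive the message it announced.

```go
package main

import (
	"fmt"
	"sync"
)

// inbox is a hypothetical bounded message queue with a notification channel.
type inbox struct {
	mu       sync.Mutex
	buf      [][]byte
	capacity int
	notify   chan struct{}
}

func newInbox(capacity int) *inbox {
	return &inbox{capacity: capacity, notify: make(chan struct{}, capacity)}
}

// put enqueues msg and tries to emit a notification; it reports false when full.
func (in *inbox) put(msg []byte) bool {
	in.mu.Lock()
	defer in.mu.Unlock()
	if len(in.buf) >= in.capacity {
		return false
	}
	in.buf = append(in.buf, msg)
	select {
	case in.notify <- struct{}{}:
	default: // enough notifications already pending, never block while locked
	}
	return true
}

// watch returns the channel on which a token arrives when a message is queued.
func (in *inbox) watch() <-chan struct{} { return in.notify }

// next pops the oldest message, or returns nil when the inbox is empty.
func (in *inbox) next() []byte {
	in.mu.Lock()
	defer in.mu.Unlock()
	if len(in.buf) == 0 {
		return nil
	}
	msg := in.buf[0]
	in.buf = in.buf[1:]
	return msg
}

// queued reports whether unconsumed messages remain.
func (in *inbox) queued() bool {
	in.mu.Lock()
	defer in.mu.Unlock()
	return len(in.buf) > 0
}

func main() {
	in := newInbox(2)
	in.put([]byte("first"))
	in.put([]byte("second"))
	fmt.Println("accepted third:", in.put([]byte("third")))

	for in.queued() {
		<-in.watch()
		if msg := in.next(); msg != nil {
			fmt.Printf("consumed %q\n", msg)
		}
	}
}
```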
For running all test cases
go test -v -race -covermode=atomic ./... # excludes `stress` testing, check 👇
For running stress testing with 1k, 2k, 4k, 8k simultaneous TCP connections
go test -v -tags stress -run=8k # also try 1k/ 2k/ 4k
Make sure your system is able to open more than 8k file handles at a time, or you'll get a `too many open files` error
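If you'd like to check the limit from Go before kicking off the 8k run, something like the following may help. It uses `syscall.Getrlimit`, so it assumes a Unix-like system (Linux, macOS); `openFileLimit` is a helper name made up for this sketch, and the 8192 threshold simply mirrors the 8k connection count. It reports the limits as seen by the running process, which may differ from what your shell shows.

```go
package main

import (
	"fmt"
	"syscall"
)

// openFileLimit returns the soft and hard RLIMIT_NOFILE values of this process.
// Hypothetical helper for illustration; Unix-like systems only.
func openFileLimit() (soft, hard uint64, err error) {
	var rl syscall.Rlimit
	if err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		return 0, 0, err
	}
	return uint64(rl.Cur), uint64(rl.Max), nil
}

func main() {
	soft, hard, err := openFileLimit()
	if err != nil {
		fmt.Println("failed to read limit:", err)
		return
	}
	fmt.Printf("open file limit: soft=%d hard=%d\n", soft, hard)
	if soft <= 8192 {
		fmt.Println("soft limit may be too low for the 8k stress test")
	}
}
```

The soft limit of the current shell can usually be raised with `ulimit -n`, up to the hard limit.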
Publisher's message publish flow
go test -run=XXX -tags stress -bench Publisher
Subscriber's message consumption flow
go test -run=XXX -tags stress -bench Subscriber
Subscriber's topic subscription flow
go test -run=XXX -tags stress -bench TopicSubscription
I wrote a simulator-visualiser tool, which can be used for testing 0hub with flexible configuration. You can ask it to use N concurrent clients, all sending/receiving messages to/from M topics. Finally, it generates some simple visuals depicting performance.
Check here
More coming soon