A wrapper intended to (eventually) cover all kinds of message brokers (Go v1.16.x)
- RabbitMQ (fanout)
- Google Cloud Pub/Sub
- NSQ
- Generic terms & functions to use message brokers
- Auto reconnection
- Limit & requeue messages*
- Concurrent subscribers
- Support for mockgen unit-testing
# go get
$ go get github.com/febytanzil/gobroker
Complete examples are provided in the `examples` folder/package.
// Initialize a RabbitMQ publisher and publish one message to the "test.fanout" topic.
// NOTE(review): t is not defined in this snippet — presumably a time.Time; confirm against the examples package.
p := pubsub.NewPublisher(gobroker.RabbitMQ, pubsub.RabbitMQAMQP("amqp://guest:guest@localhost:5672/", "vhost"))
p.Publish("test.fanout", "msg"+t.String())
// Register the RabbitMQ subscriber(s) and run them.
// The subscriber is created with the same RabbitMQAMQP arguments as the publisher above.
s := pubsub.NewSubscriber(gobroker.RabbitMQ, []*pubsub.SubHandler{
{
Name: "test.consumer",
Topic: "test.fanout", // same topic the publisher above publishes to
Handler: testRMQ, // handler function; see the subscriber function format further down
MaxRequeue: 10, // presumably the requeue limit applied when the handler returns an error — TODO confirm
Concurrent: 2, // presumably the number of concurrent subscribers for this handler — TODO confirm
MaxInFlight: 3,
},
}, pubsub.RabbitMQAMQP("amqp://guest:guest@localhost:5672/", "vhost"))
s.Start()
// Initialize a Google Cloud Pub/Sub publisher from a JSON credentials file and publish one message.
// NOTE(review): t is not defined in this snippet — presumably a time.Time; confirm against the examples package.
// NOTE(review): this publishes to "test" while the subscriber below listens on "test-topic" — confirm whether they are meant to match.
p := pubsub.NewPublisher(gobroker.Google, pubsub.GoogleJSONFile("gcp-project-id", "cluster-name", "/path/to/google/application/credentials/cred.json"))
p.Publish("test", "msg"+t.String())
// Register the Google Pub/Sub subscriber(s) and run them.
// The subscriber is created with the same GoogleJSONFile arguments as the publisher above.
s := pubsub.NewSubscriber(gobroker.Google, []*pubsub.SubHandler{
{
Name: "consumer-test",
Topic: "test-topic",
Handler: testGoogle, // handler function; see the subscriber function format further down
MaxRequeue: 10, // presumably the requeue limit applied when the handler returns an error — TODO confirm
Concurrent: 3, // presumably the number of concurrent subscribers for this handler — TODO confirm
Timeout: 10 * time.Minute,
MaxInFlight: 1,
},
},
pubsub.GoogleJSONFile("gcp-project-id", "cluster-name", "/path/to/google/application/credentials/cred.json"))
s.Start()
// Subscriber function format:
//   - returning nil acks the message as a success
//   - returning an error requeues the message based on the handler config
//
// testRMQ decodes the message body into a string, logs it, and acks the message.
func testRMQ(msg *gobroker.Message) error {
	payload := ""
	// NOTE(review): whatever Decode reports is not checked in this example.
	gobroker.StdJSONCodec.Decode(msg.Body, &payload)
	log.Println("consume rabbitmq:", payload)
	return nil
}
// testGoogle decodes the message body into a string, logs it, and always
// returns an error carrying the decoded body, so the message is requeued
// based on the handler config.
func testGoogle(msg *gobroker.Message) error {
	payload := ""
	// NOTE(review): whatever Decode reports is not checked in this example.
	gobroker.StdJSONCodec.Decode(msg.Body, &payload)
	log.Println("consume google pubsub", payload)
	requeueErr := errors.New("requeue msg body: " + payload)
	return requeueErr
}
Due to the requeue limiter, the requeue behavior in both RabbitMQ and Google Pub/Sub is changed: a requeued message is republished to the topic with an additional header containing a counter, which is what makes the limit possible.
To contribute, please fork the repository and create a pull request from your fork.