Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub/rabbitpubsub: add query string set the qos prefetch count #3431

Merged
merged 7 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pubsub/rabbitpubsub/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type amqpChannel interface {
QueueDeclareAndBind(qname, ename string) error
ExchangeDelete(string) error
QueueDelete(qname string) error
Qos(prefetchCount, prefetchSize int, global bool) error
}

// connection adapts an *amqp.Connection to the amqpConnection interface.
Expand All @@ -79,6 +80,7 @@ func (c *connection) Channel() (amqpChannel, error) {
if err := ch.Confirm(wait); err != nil {
return nil, err
}

return &channel{ch}, nil
}

Expand Down Expand Up @@ -168,3 +170,7 @@ func (ch *channel) QueueDelete(qname string) error {
_, err := ch.ch.QueueDelete(qname, false, false, false)
return err
}

func (ch *channel) Qos(prefetchCount, prefetchSize int, global bool) error {
return ch.ch.Qos(prefetchCount, prefetchSize, global)
}
8 changes: 8 additions & 0 deletions pubsub/rabbitpubsub/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,14 @@ func (ch *fakeChannel) QueueDelete(name string) error {
return nil
}

func (ch *fakeChannel) Qos(_, _ int, _ bool) error {
if ch.isClosed() {
return amqp.ErrClosed
}

return nil
}

// Assumes nothing is ever written to the channel.
func chanIsClosed(ch chan struct{}) bool {
select {
Expand Down
65 changes: 59 additions & 6 deletions pubsub/rabbitpubsub/rabbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"os"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -96,7 +97,9 @@ const Scheme = "rabbit"
//
// For subscriptions, the URL's host+path is used as the queue name.
//
// No query parameters are supported.
// An optional query string can be used to set the Qos consumer prefetch on subscriptions
// like "rabbit://myqueue?qos=1000" to set the consumer prefetch count to 1000
// see also https://www.rabbitmq.com/docs/consumer-prefetch
type URLOpener struct {
// Connection to use for communication with the server.
Connection *amqp.Connection
Expand All @@ -118,8 +121,25 @@ func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic

// OpenSubscriptionURL opens a pubsub.Subscription based on u.
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) {
for param := range u.Query() {
return nil, fmt.Errorf("open subscription %v: invalid query parameter %q", u, param)
peczenyj marked this conversation as resolved.
Show resolved Hide resolved
for param, value := range u.Query() {
switch param {
case "qos":
peczenyj marked this conversation as resolved.
Show resolved Hide resolved
if len(value) == 0 {
return nil, fmt.Errorf("open subscription %v: invalid query parameter %q", u, param)
}
count := value[0]
if o.SubscriptionOptions.Qos == nil {
o.SubscriptionOptions.Qos = new(Qos)
}
prefetchCount, err := strconv.Atoi(count)
if err != nil {
return nil, fmt.Errorf("open subscription %v: invalid query parameter %q", u, count)
}

o.SubscriptionOptions.Qos.PrefetchCount = prefetchCount
default:
return nil, fmt.Errorf("open subscription %v: invalid query parameter %q", u, param)
}
}
queueName := path.Join(u.Host, u.Path)
return OpenSubscription(o.Connection, queueName, &o.SubscriptionOptions), nil
Expand All @@ -142,7 +162,15 @@ type TopicOptions struct{}

// SubscriptionOptions sets options for constructing a *pubsub.Subscription
// backed by RabbitMQ.
type SubscriptionOptions struct{}
type SubscriptionOptions struct {
// Qos properties
Qos *Qos
}

// Qos options to be used when we create a subscription.
peczenyj marked this conversation as resolved.
Show resolved Hide resolved
type Qos struct {
PrefetchCount int
}

// OpenTopic returns a *pubsub.Topic corresponding to the named exchange.
// See the package documentation for an example.
Expand Down Expand Up @@ -515,14 +543,16 @@ func (*topic) Close() error { return nil }
// The documentation of the amqp package recommends using separate connections for
// publishing and subscribing.
func OpenSubscription(conn *amqp.Connection, name string, opts *SubscriptionOptions) *pubsub.Subscription {
return pubsub.NewSubscription(newSubscription(&connection{conn}, name), nil, nil)
return pubsub.NewSubscription(newSubscription(&connection{conn}, name, opts), nil, nil)
}

type subscription struct {
conn amqpConnection
queue string // the AMQP queue name
consumer string // the client-generated name for this particular subscriber

opts *SubscriptionOptions

mu sync.Mutex
ch amqpChannel // AMQP channel used for all communication.
delc <-chan amqp.Delivery
Expand All @@ -533,11 +563,12 @@ type subscription struct {

var nextConsumer int64 // atomic

func newSubscription(conn amqpConnection, name string) *subscription {
func newSubscription(conn amqpConnection, name string, opts *SubscriptionOptions) *subscription {
return &subscription{
peczenyj marked this conversation as resolved.
Show resolved Hide resolved
conn: conn,
queue: name,
consumer: fmt.Sprintf("c%d", atomic.AddInt64(&nextConsumer, 1)),
opts: opts,
receiveBatchHook: func() {},
}
}
Expand All @@ -564,15 +595,37 @@ func (s *subscription) establishChannel(ctx context.Context) error {
if err != nil {
return err
}
// applay subscription options to channel.
peczenyj marked this conversation as resolved.
Show resolved Hide resolved
err = applyOptionsToChannel(s.opts, ch)
peczenyj marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
// Subscribe to messages from the queue.
s.delc, err = ch.Consume(s.queue, s.consumer)
return err
})
if err != nil {
return err
}

s.ch = ch
s.closec = ch.NotifyClose(make(chan *amqp.Error, 1)) // closec will get at most one element

return nil
}

func applyOptionsToChannel(opts *SubscriptionOptions, ch amqpChannel) error {
if opts == nil {
peczenyj marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

if opts.Qos != nil {
err := ch.Qos(opts.Qos.PrefetchCount, 0, false)
if err != nil {
return fmt.Errorf("unable to set channel Qos: %w", err)
}
}

return nil
}

Expand Down
64 changes: 62 additions & 2 deletions pubsub/rabbitpubsub/rabbit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const rabbitURL = "amqp://guest:guest@localhost:5672/"
var logOnce sync.Once

func mustDialRabbit(t testing.TB) amqpConnection {
t.Helper()

if !setup.HasDockerTestEnvironment() {
logOnce.Do(func() {
t.Log("using the fake because the RabbitMQ server is not available")
Expand Down Expand Up @@ -138,12 +140,12 @@ func (h *harness) CreateSubscription(_ context.Context, dt driver.Topic, testNam
}
ch.QueueDelete(queue)
}
ds = newSubscription(h.conn, queue)
ds = newSubscription(h.conn, queue, nil)
return ds, cleanup, nil
}

func (h *harness) MakeNonexistentSubscription(_ context.Context) (driver.Subscription, func(), error) {
return newSubscription(h.conn, "nonexistent-subscription"), func() {}, nil
return newSubscription(h.conn, "nonexistent-subscription", nil), func() {}, nil
}

func (h *harness) Close() {
Expand Down Expand Up @@ -425,6 +427,8 @@ func TestOpenSubscriptionFromURL(t *testing.T) {
{"rabbit://myqueue", true},
// Invalid parameter.
{"rabbit://myqueue?param=value", true},
// Invalid value for an existing parameter.
{"rabbit://myqueue?qos=value", true},
}

ctx := context.Background()
Expand All @@ -438,3 +442,59 @@ func TestOpenSubscriptionFromURL(t *testing.T) {
}
}
}

func TestOpenSubscriptionFromURLViaRealServer(t *testing.T) {
t.Setenv("RABBIT_SERVER_URL", rabbitURL)

tests := []struct {
label string
URL string
WantErr bool
}{

{"url with no qos", "rabbit://%s", false},
{"invalid parameters", "rabbit://%s?param=value", true},
{"valid url with qos", "rabbit://%s?qos=1024", false},
{"invalid url with qos", "rabbit://%s?qos=value", true},
}

for _, test := range tests {
t.Run(test.label, func(t *testing.T) {
conn := mustDialRabbit(t)
_, isFake := conn.(*fakeConnection)
if isFake {
t.Skip("test requires real rabbitmq")
}

h := &harness{conn: conn}

ctx := context.Background()

dt, cleanupTopic, err := h.CreateTopic(ctx, t.Name())
if err != nil {
t.Fatalf("unable to create topic: %v", err)
}

t.Cleanup(cleanupTopic)

ds, cleanupSubscription, err := h.CreateSubscription(ctx, dt, t.Name())
if err != nil {
t.Fatalf("unable to create subscription: %v", err)
}

t.Cleanup(cleanupSubscription)

queue := ds.(*subscription).queue
url := fmt.Sprintf(test.URL, queue)

sub, err := pubsub.OpenSubscription(ctx, url)
if (err != nil) != test.WantErr {
t.Errorf("%s: got error %v, want error %v", test.URL, err, test.WantErr)
}

if sub != nil {
sub.Shutdown(ctx)
}
})
}
}