-
Notifications
You must be signed in to change notification settings - Fork 52
Conversation
@@ -216,24 +214,14 @@ func (r *Receiver) Listen(ctx context.Context, handler Handler) *ListenerHandle | |||
ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.Listen") | |||
defer span.End() | |||
|
|||
messages := make(chan *amqp.Message) | |||
go r.listenForMessages(ctx, messages) | |||
go r.handleMessages(ctx, messages, handler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the handleMessages is now folded into the amqpAdapterHandler
that wraps the user handler
@@ -216,24 +214,14 @@ func (r *Receiver) Listen(ctx context.Context, handler Handler) *ListenerHandle | |||
ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.Listen") | |||
defer span.End() | |||
|
|||
messages := make(chan *amqp.Message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no more channel to manage. we don't need an implicit buffer. prefetch messages are already buffered in go-amqp
|
||
return &ListenerHandle{ | ||
r: r, | ||
ctx: ctx, | ||
} | ||
} | ||
|
||
func (r *Receiver) handleMessages(ctx context.Context, messages chan *amqp.Message, handler Handler) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need since we pass it down to the handler without the buffer channel
} | ||
receiver = r.receiver | ||
r.clientMu.RUnlock() | ||
msg, err := receiver.Receive(ctx) | ||
err := receiver.HandleMessage(ctx, func(message *amqp.Message) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call HandleMessage instead of Receive which we deprecate
I would prefer this PR to merge in go-amqp : Azure/go-amqp#23 |
89edcc8
to
9f2d739
Compare
Using this version instead of More info here: google/go-cloud#2980 |
@Segflow are you calling a disposition action after processing the message? This is a requirement per the AMQP protocol. Earlier versions didn't honor this and caused the problems described in Azure/go-amqp#20 |
Later the user decides if they want to Ack or Nack the message. |
this was the implementation before, and it did not allow to fulfil flow control in AMQP, which needs to communicate back to the upstream peer. |
Make use of the new HandleMessage api in go-amqp
This bumps the version of go-amqp to include the following bug fixes:
fixes #189