Skip to content

Commit

Permalink
avoid changing Receive behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
serbrech committed Feb 10, 2021
1 parent 5b92b35 commit f1f2eda
Showing 1 changed file with 12 additions and 15 deletions.
27 changes: 12 additions & 15 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1953,6 +1953,9 @@ type Receiver struct {

// HandleMessage takes in a func to handle the incoming message.
// Blocks until a message is received, ctx completes, or an error occurs.

// Note: You must take an action on the message in the provided handler (Accept/Reject/Release/Modify)
// or the pending message tracker will get out of sync, and reduce the flow.
func (r *Receiver) HandleMessage(ctx context.Context, handle func(*Message) error) error {
debug(3, "Entering link %s Receive()", r.link.key.name)

Expand Down Expand Up @@ -2002,6 +2005,7 @@ func (r *Receiver) HandleMessage(ctx context.Context, handle func(*Message) erro
}
}

// Deprecated: prefer HandleMessage
// Receive returns the next message from the sender.
//
// Blocks until a message is received, ctx completes, or an error occurs.
Expand All @@ -2013,24 +2017,14 @@ func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
}
}

trackCompletion := func(msg *Message) {
<-msg.doneSignal
r.link.deletePending(msg)
debug(3, "Receive() deleted pending %d", msg.deliveryID)
if atomic.LoadUint32(&r.link.paused) == 1 {
select {
case r.link.receiverReady <- struct{}{}:
debug(3, "Receive() unpause link on completion")
default:
}
}
}

// non-blocking receive to ensure buffered messages are
// delivered regardless of whether the link has been closed.
select {
case msg := <-r.link.messages:
go trackCompletion(&msg)
// we remove the message from pending as soon as it's popped off the channel
// This makes the pending count the same as messages buffer count
// and keeps the behavior the same as before the pending messages tracking was introduced
defer r.link.deletePending(&msg)
debug(3, "Receive() non blocking %d", msg.deliveryID)
msg.receiver = r
return &msg, nil
Expand All @@ -2042,7 +2036,10 @@ func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
// wait for the next message
select {
case msg := <-r.link.messages:
go trackCompletion(&msg)
// we remove the message from pending as soon as it's popped off the channel
// This makes the pending count the same as messages buffer count
// and keeps the behavior the same as before the pending messages tracking was introduced
defer r.link.deletePending(&msg)
debug(3, "Receive() blocking %d", msg.deliveryID)
msg.receiver = r
return &msg, nil
Expand Down

0 comments on commit f1f2eda

Please sign in to comment.