
fix: emit amqp Flow on message handling completion #20

Merged Feb 12, 2021 (11 commits)

Conversation

@serbrech (Member) commented Feb 10, 2021

see Azure/azure-service-bus-go#189 for details of the problem this fixes.

To fix the flow control, it is necessary to be aware of when a message is terminally handled by the consumer. We should not emit a Flow frame before the message has been disposed.

Implementation

debug logs

Lots of debug logs were added to the code. I would like to keep them in, though they should probably be reviewed and trimmed.

receiver's pendingMessages map[uint32]struct{}

The receiver previously looked at its messages channel to decide whether a Flow frame should be emitted. This channel is drained by the consumer as soon as message handling starts: messages are pulled off the channel and passed to the downstream handler. While the handler is processing them, the messages channel is empty, which caused the receiver to emit a Flow frame right away and buffer new messages well before the consumer was ready.

This PR introduces a pendingMessages map to keep track of messages being handled.
The message's deliveryID is used to identify and track it; it is added to the pending map right before the message is pushed into the receiver's messages channel.

go-amqp/client.go (lines 875-876 in f1f2eda):

```go
pendingMessages     map[uint32]struct{} // used to send completed messages to receiver
pendingMessagesLock sync.RWMutex        // lock to protect concurrent access to pendingMessages
```

The map is accessed concurrently, so I provide add and delete funcs on the receiver to always respect the locking sequence:

go-amqp/client.go (lines 1029-1039 in f1f2eda):

```go
func (l *link) addPending(msg *Message) {
	l.pendingMessagesLock.Lock()
	l.pendingMessages[msg.deliveryID] = struct{}{}
	l.pendingMessagesLock.Unlock()
}

func (l *link) deletePending(msg *Message) {
	l.pendingMessagesLock.Lock()
	delete(l.pendingMessages, msg.deliveryID)
	l.pendingMessagesLock.Unlock()
}
```

Message doneSignal

We need to remove messages from the pending map after they are handled.
Since the downstream handler can be asynchronous, we cannot rely on code flow; the message itself must emit a signal.
I added a private doneSignal channel that is triggered via the msg.done() func, which is called from each of the disposition funcs.

go-amqp/types.go (lines 1765-1766 in f1f2eda):

```go
// doneSignal is a channel that indicate when a message is considered acted upon by downstream handler
doneSignal chan struct{}
```

go-amqp/types.go (lines 1781-1787 in f1f2eda):

```go
// done closes the internal doneSignal channel to let the receiver know that this message has been acted upon
func (m *Message) done() {
	// TODO: move initialization in ctor and use ctor everywhere?
	if m.doneSignal != nil {
		close(m.doneSignal)
	}
}
```

go-amqp/types.go (lines 1800-1806 in f1f2eda):

```go
func (m *Message) Accept(ctx context.Context) error {
	defer m.done()
	if !m.shouldSendDisposition() {
		return nil
	}
	return m.receiver.messageDisposition(ctx, m.deliveryID, &stateAccepted{})
}
```

We use this signal to trigger a delete from the pending map, and tell the link to resume the flow:

go-amqp/client.go (lines 1963-1964 in f1f2eda):

```go
<-msg.doneSignal
r.link.deletePending(msg)
```

HandleMessage()

HandleMessage takes in a func handler. It follows the same pattern as the Receive implementation to drain the buffer if the link is closed, ensures the message's doneSignal isn't nil, and starts a tracking goroutine that awaits the doneSignal.

The same behavior could be achieved with the Receive signature, since we use an asynchronous signal. However, that would change behavior for existing users, and could cause a user's message processor to stall by not terminating messages correctly.
The requirement that the consumer explicitly send a disposition for each message becomes stricter in order to keep the Flow in sync.
This can certainly be mitigated, but it is not addressed in this PR.

The HandleMessage() signature also allows for middleware, plugging in functionality without impacting existing code, which is a nice added benefit.

Deprecate receiver Receive()

The receiver.Receive() func is marked as deprecated in favor of receiver.HandleMessage().
The intent is to keep the receiver.Receive() behavior unchanged, while providing a safer API that controls the flow of messages accurately via HandleMessage.

Consideration

The current implementation of HandleMessage requires the user to call a disposition on the message.
If no disposition is called, the message never signals the receiver that it was handled, and it stays in the receiver's tracking map.
As a result, the flow will stop once n messages are leaked in the tracking map, where n > linkCredit/2. This is reasonable in my opinion, provided that it is well documented and discoverable.

I don't think we have a reliable way to ensure the message is removed from the pending map if the user fails to do it.
We can make it actionable though:

  • Expose message.Ignore(), which would only trigger the doneSignal. This allows downstream code to release the message in a handler, in a defer statement, or on errors, in case something goes wrong before it gets a chance to invoke a disposition.

  • We can log spans that warn about likely message leaks.

  • Consumers can be smarter about message TTL and peek-lock timeouts, and clean up after themselves.

```diff
@@ -1264,7 +1289,9 @@ func (l *link) muxReceive(fr performTransfer) error {
 	// discard message if it's been aborted
 	if fr.Aborted {
 		l.buf.reset()
-		l.msg = Message{}
+		l.msg = Message{
+			doneSignal: make(chan struct{}),
```

@serbrech (Feb 10, 2021): Init the signal channel to avoid nil. Probably unnecessary, since we take care of it in Receive()/HandleMessage().

client.go (outdated):

```go
	return nil
}

func uuidFromLockTokenBytes(bytes []byte) (*uuid.UUID, error) {
```

@serbrech: Brought this in for better logging. Might be better in a different file, but none really stood out :)

```go
debug(3, "Receive() unpause link on completion")
default:
}
}
```

@serbrech: Allow the link to check for flow each time we complete a message.

```go
msg.receiver = r
if msg.doneSignal == nil {
	msg.doneSignal = make(chan struct{})
}
```

@serbrech: Init doneSignal; it's used only once, by getting closed.

```go
if msg.doneSignal == nil {
	msg.doneSignal = make(chan struct{})
}
go trackCompletion(msg)
```

@serbrech: Block on doneSignal in a separate goroutine.

```go
// tracks messages until exiting handler
if err := handle(msg); err != nil {
	debug(3, "Receive() blocking %d - error: %s", msg.deliveryID, err.Error())
	return err
```

@serbrech: Should we call msg.done() here? I think it's not necessary, since a handler error closes the connection and stops the listener entirely.

client.go (outdated):

```go
// we remove the message from pending as soon as it's popped off the channel
// This makes the pending count the same as messages buffer count
// and keeps the behavior the same as before the pending messages tracking was introduced
defer r.link.deletePending(&msg)
```

@serbrech (Feb 10, 2021): Deferring the deletePending call effectively makes the Receive() func behave almost the same as before: it removes the message from the map as soon as it's given to the caller.

@serbrech commented:

Local integration test logs, with no concurrency and linkCredit = 1 (no prefetch):

```
17:15:35.900990 RX: Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: "C\xeaŜvo\bB\x86\x9bv^%\xe1\x91\xf7", MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: true, Payload [size]: 393}
17:15:35.901069 9cc5ea43-6f76-4208-869b-765e25e191f7 before push to receiver - deliveryCount : 0 - linkCredit: 1, len(messages): 0, len(inflight): 0
17:15:35.901089 9cc5ea43-6f76-4208-869b-765e25e191f7 after push to receiver - deliveryCount : 0 - linkCredit: 1, len(messages): 0, len(inflight): 0
17:15:35.901095 9cc5ea43-6f76-4208-869b-765e25e191f7 before exit - deliveryCount : 1 - linkCredit: 0, len(messages): 0
17:15:35.901102 PAUSE Link Mux pause: inflight: 0, credit: 0, deliveryCount: 1, messages: 0, pending: 1, maxCredit : 1, settleMode: second
17:15:35.901111 Receive() blocking 0
17:15:35.901406 Entering link yYFKKRUEZMN6iOiuNYfXc3J8sKt5RXfKzw9PPJmZ49Z7eCeMBlZOJg Receive()
[2021-02-10T17:15:35+11:00] handling message ID 2faa5ec1-5a48-4b1d-b3ad-01a05ed7f512 - DeliveryTag : 9cc5ea43-6f76-4208-869b-765e25e191f7
[2021-02-10T17:15:55+11:00] trying to complete ID: 2faa5ec1-5a48-4b1d-b3ad-01a05ed7f512 - Tag: 9cc5ea43-6f76-4208-869b-765e25e191f7 !
17:15:55.906006 RX: add %!s(uint32=0) to inflight
17:15:55.906056 TX: Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: false, State: Accepted, Batchable: false}
17:15:56.217774 RX(Session): Disposition{Role: Sender, First: 0, Last: <nil>, Settled: true, State: Accepted, Batchable: false}
17:15:56.217813 RX: Disposition{Role: Sender, First: 0, Last: <nil>, Settled: true, State: Accepted, Batchable: false}
17:15:56.217827 PAUSE Link Mux pause: inflight: 0, credit: 0, deliveryCount: 1, messages: 0, pending: 1, maxCredit : 1, settleMode: second
17:15:56.217898 Receive() deleted pending 0
17:15:56.217922 Receive() unpause link on completion
17:15:56.217940 FLOW Link Mux half: source: conn-issue189tag1ckhg/Subscriptions/issue189, inflight: 0, credit: 0, deliveryCount: 1, messages: 0, pending: 0, maxCredit : 1, settleMode: second
    issue_test.go:143: successfully processed msg 1
17:15:56.217950 link.muxFlow(): len(l.messages):0 - linkCredit: 0 - deliveryCount: 1, inFlight: 0
17:15:56.218006 TX: Flow{NextIncomingID: <nil>, IncomingWindow: 0, NextOutgoingID: 0, OutgoingWindow: 0, Handle: 0, DeliveryCount: 1, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
17:15:56.218039 TX(Session) - tx: Flow{NextIncomingID: 2, IncomingWindow: 100, NextOutgoingID: 0, OutgoingWindow: 100, Handle: 0, DeliveryCount: 1, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
17:15:56.395722 RX(Session): Transfer{Handle: 0, DeliveryID: 1, DeliveryTag: "/\x12\x9c\xe3\x18\x88\x9bM\xad\xe8\xec0;Cv\xe1", MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: true, Payload [size]: 393}
17:15:56.395743 TX: adding handle to handlesByRemoteDeliveryID. linkCredit: 1
17:15:56.395755 RX: Transfer{Handle: 0, DeliveryID: 1, DeliveryTag: "/\x12\x9c\xe3\x18\x88\x9bM\xad\xe8\xec0;Cv\xe1", MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: true, Payload [size]: 393}
17:15:56.395786 e39c122f-8818-4d9b-ade8-ec303b4376e1 before push to receiver - deliveryCount : 1 - linkCredit: 1, len(messages): 0, len(inflight): 0
17:15:56.395800 e39c122f-8818-4d9b-ade8-ec303b4376e1 after push to receiver - deliveryCount : 1 - linkCredit: 1, len(messages): 0, len(inflight): 0
17:15:56.395806 e39c122f-8818-4d9b-ade8-ec303b4376e1 before exit - deliveryCount : 2 - linkCredit: 0, len(messages): 0
17:15:56.395811 PAUSE Link Mux pause: inflight: 0, credit: 0, deliveryCount: 2, messages: 0, pending: 1, maxCredit : 1, settleMode: second
17:15:56.395818 Receive() blocking 1
17:15:56.395938 Entering link yYFKKRUEZMN6iOiuNYfXc3J8sKt5RXfKzw9PPJmZ49Z7eCeMBlZOJg Receive()
[2021-02-10T17:15:56+11:00] handling message ID 5a0e3016-b68f-40e3-b95c-5f00748b67a4 - DeliveryTag : e39c122f-8818-4d9b-ade8-ec303b4376e1
```

@serbrech changed the title from "fix: base amqp Flow on message handling completion" to "fix: emit amqp Flow on message handling completion" on Feb 10, 2021
@serbrech commented:

For future reference, here are some links to the relevant .NET client implementation of the same behavior:

This is the Receive from Azure Service Bus. If the message is settled on delivery, it calls link.Dispose right away; if it's mode second (wait for an ack to settle a message), this is not called:
https://github.com/Azure/azure-sdk-for-net/blob/f222afc71c4416af52693f6bfcb5c33edf1a29e8/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs#L313-L316

In the Azure AMQP library, here is the implementation of link.Dispose:

https://github.com/Azure/azure-amqp/blob/e5cc469ec333904d18f1ec7cbc950a4cf463b6f1/src/AmqpLink.cs#L928-L972

You can also see just below that OnSettled is where the link credit is incremented. The behavior is the same in go-amqp, except that the link credit calculation includes the unsettledMessages map length. Which makes me think that unsettledMessages would be a much better name :) Naming is hard.
