Skip to content

Commit

Permalink
feat(pubsub): use Streaming Pull response for ordering check (#9682)
Browse files Browse the repository at this point in the history
* feat(pubsub): remove subscription config check with ordering

* remove old ordering check

* put back comment, save properties variable
  • Loading branch information
hongalex authored Apr 9, 2024
1 parent 53a1a75 commit 7bf4904
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 26 deletions.
3 changes: 0 additions & 3 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,9 +1434,6 @@ func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) {
msg.Ack()
atomic.AddInt32(&numAcked, 1)
})
if sub.enableOrdering != enableMessageOrdering {
t.Fatalf("enableOrdering mismatch: got: %v, want: %v", sub.enableOrdering, enableMessageOrdering)
}
// If the messages were received on a subscription with the EnableMessageOrdering=true,
// total processing would exceed the timeout and only one message would be processed.
if numAcked < 2 {
Expand Down
30 changes: 27 additions & 3 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ type messageIterator struct {
eoMu sync.RWMutex
enableExactlyOnceDelivery bool
sendNewAckDeadline bool

orderingMu sync.RWMutex
// enableOrdering determines if messages should be processed in order. This is populated
// by the response in StreamingPull and can change mid Receive. Must be accessed
// with the lock held.
enableOrdering bool
}

// newMessageIterator starts and returns a new messageIterator.
Expand Down Expand Up @@ -352,12 +358,30 @@ func (it *messageIterator) recvMessages() ([]*pb.ReceivedMessage, error) {
if err != nil {
return nil, err
}
it.eoMu.Lock()
if got := res.GetSubscriptionProperties().GetExactlyOnceDeliveryEnabled(); got != it.enableExactlyOnceDelivery {

// If the new exactly once settings are different than the current settings, update it.
it.eoMu.RLock()
enableEOD := it.enableExactlyOnceDelivery
it.eoMu.RUnlock()

subProp := res.GetSubscriptionProperties()
if got := subProp.GetExactlyOnceDeliveryEnabled(); got != enableEOD {
it.eoMu.Lock()
it.sendNewAckDeadline = true
it.enableExactlyOnceDelivery = got
it.eoMu.Unlock()
}

// Also update the subscriber's ordering setting if stale.
it.orderingMu.RLock()
enableOrdering := it.enableOrdering
it.orderingMu.RUnlock()

if got := subProp.GetMessageOrderingEnabled(); got != enableOrdering {
it.orderingMu.Lock()
it.enableOrdering = got
it.orderingMu.Unlock()
}
it.eoMu.Unlock()
return res.ReceivedMessages, nil
}

Expand Down
25 changes: 5 additions & 20 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ type Subscription struct {

mu sync.Mutex
receiveActive bool

enableOrdering bool
}

// Subscription creates a reference to a subscription.
Expand Down Expand Up @@ -1238,8 +1236,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
s.mu.Unlock()
defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()

s.checkOrdering(ctx)

// TODO(hongalex): move settings check to a helper function to make it more testable
maxCount := s.ReceiveSettings.MaxOutstandingMessages
if maxCount == 0 {
Expand Down Expand Up @@ -1392,11 +1388,14 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
iter.eoMu.RUnlock()

wg.Add(1)
// Make sure the subscription has ordering enabled before adding to scheduler.
// Only schedule messages in order if an ordering key is present and the subscriber client
// received the ordering flag from a Streaming Pull response.
var key string
if s.enableOrdering {
iter.orderingMu.RLock()
if iter.enableOrdering {
key = msg.OrderingKey
}
iter.orderingMu.RUnlock()
msgLen := len(msg.Data)
if err := sched.Add(key, msg, func(msg interface{}) {
defer wg.Done()
Expand Down Expand Up @@ -1436,20 +1435,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
return group.Wait()
}

// checkOrdering calls Config to check theEnableMessageOrdering field.
// If this call fails (e.g. because the service account doesn't have
// the roles/viewer or roles/pubsub.viewer role) we will assume
// EnableMessageOrdering to be true.
// See: https://github.com/googleapis/google-cloud-go/issues/3884
func (s *Subscription) checkOrdering(ctx context.Context) {
cfg, err := s.Config(ctx)
if err != nil {
s.enableOrdering = true
} else {
s.enableOrdering = cfg.EnableMessageOrdering
}
}

type pullOptions struct {
maxExtension time.Duration // the maximum time to extend a message's ack deadline in total
maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
Expand Down

0 comments on commit 7bf4904

Please sign in to comment.