Skip to content

Commit

Permalink
feat(pubsub): make lease management RPCs concurrent (#10238)
Browse files Browse the repository at this point in the history
* feat(pubsub): batch receipt modacks

* make ack/modack calls concurrent

* fix streaming pull test

* add waitgroups to ack/modack

* revert change to streaming pull retry test

* remove SplitRequestIDs, add test for makeBatches

* fix comment for makeBatches

* reduce duplicate code in sendack/modack func
  • Loading branch information
hongalex authored Jun 18, 2024
1 parent ba9711f commit 426a8c2
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 86 deletions.
139 changes: 69 additions & 70 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,46 +533,63 @@ func (it *messageIterator) handleKeepAlives() {
it.checkDrained()
}

// sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
// enabled, we'll retry these messages for a short duration in a goroutine.
func (it *messageIterator) sendAck(m map[string]*AckResult) {
type ackFunc = func(ctx context.Context, subName string, ackIds []string) error
type ackRecordStat = func(ctx context.Context, toSend []string)
type retryAckFunc = func(toRetry map[string]*ipubsub.AckResult)

func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackFunc, retryAckFunc retryAckFunc, ackRecordStat ackRecordStat) {
ackIDs := make([]string, 0, len(m))
for k := range m {
ackIDs = append(ackIDs, k)
}
it.eoMu.RLock()
exactlyOnceDelivery := it.enableExactlyOnceDelivery
it.eoMu.RUnlock()
batches := makeBatches(ackIDs, ackIDBatchSize)
wg := sync.WaitGroup{}

for _, batch := range batches {
wg.Add(1)
go func(toSend []string) {
defer wg.Done()
ackRecordStat(it.ctx, toSend)
// Use context.Background() as the call's context, not it.ctx. We don't
// want to cancel this RPC when the iterator is stopped.
cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel2()
err := ackFunc(cctx, it.subName, toSend)
if exactlyOnceDelivery {
resultsByAckID := make(map[string]*AckResult)
for _, ackID := range toSend {
resultsByAckID[ackID] = m[ackID]
}

var toSend []string
for len(ackIDs) > 0 {
toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)
st, md := extractMetadata(err)
_, toRetry := processResults(st, resultsByAckID, md)
if len(toRetry) > 0 {
// Retry modacks/nacks in a separate goroutine.
go func() {
retryAckFunc(toRetry)
}()
}
}
}(batch)
}
wg.Wait()
}

recordStat(it.ctx, AckCount, int64(len(toSend)))
addAcks(toSend)
// Use context.Background() as the call's context, not it.ctx. We don't
// want to cancel this RPC when the iterator is stopped.
cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel2()
err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
// sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
// enabled, we'll retry these messages for a short duration in a goroutine.
func (it *messageIterator) sendAck(m map[string]*AckResult) {
it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error {
return it.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{
Subscription: it.subName,
AckIds: toSend,
AckIds: ackIds,
})
if exactlyOnceDelivery {
resultsByAckID := make(map[string]*AckResult)
for _, ackID := range toSend {
resultsByAckID[ackID] = m[ackID]
}
st, md := extractMetadata(err)
_, toRetry := processResults(st, resultsByAckID, md)
if len(toRetry) > 0 {
// Retry acks in a separate goroutine.
go func() {
it.retryAcks(toRetry)
}()
}
}
}
}, it.retryAcks, func(ctx context.Context, toSend []string) {
recordStat(it.ctx, AckCount, int64(len(toSend)))
addAcks(toSend)
})
}

// sendModAck is used to extend the lease of messages or nack them.
Expand All @@ -583,47 +600,22 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
// enabled, we retry it in a separate goroutine for a short duration.
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
deadlineSec := int32(deadline / time.Second)
ackIDs := make([]string, 0, len(m))
for k := range m {
ackIDs = append(ackIDs, k)
}
it.eoMu.RLock()
exactlyOnceDelivery := it.enableExactlyOnceDelivery
it.eoMu.RUnlock()
var toSend []string
for len(ackIDs) > 0 {
toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)
it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error {
return it.subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
Subscription: it.subName,
AckDeadlineSeconds: deadlineSec,
AckIds: ackIds,
})
}, func(toRetry map[string]*ipubsub.AckResult) {
it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
}, func(ctx context.Context, toSend []string) {
if deadline == 0 {
recordStat(it.ctx, NackCount, int64(len(toSend)))
} else {
recordStat(it.ctx, ModAckCount, int64(len(toSend)))
}
addModAcks(toSend, deadlineSec)
// Use context.Background() as the call's context, not it.ctx. We don't
// want to cancel this RPC when the iterator is stopped.
cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel2()
err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{
Subscription: it.subName,
AckDeadlineSeconds: deadlineSec,
AckIds: toSend,
})
if exactlyOnceDelivery {
resultsByAckID := make(map[string]*AckResult)
for _, ackID := range toSend {
resultsByAckID[ackID] = m[ackID]
}

st, md := extractMetadata(err)
_, toRetry := processResults(st, resultsByAckID, md)
if len(toRetry) > 0 {
// Retry modacks/nacks in a separate goroutine.
go func() {
it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
}()
}
}
}
})
}

// retryAcks retries the ack RPC with backoff. This must be called in a goroutine
Expand Down Expand Up @@ -751,13 +743,20 @@ func calcFieldSizeInt(fields ...int) int {
return overhead
}

// makeBatches takes a slice of ackIDs and returns a slice of ackID batches.
// Each batch holds at most maxBatchSize IDs, so it can be used in a request
// whose payload does not exceed the batch limit. Batches alias the input
// slice's backing array; no copies are made. An empty input yields nil.
func makeBatches(ids []string, maxBatchSize int) [][]string {
	if len(ids) == 0 {
		return nil
	}
	if maxBatchSize <= 0 {
		// Defensive: a non-positive batch size would otherwise loop forever
		// appending empty batches. Fall back to a single batch.
		return [][]string{ids}
	}
	var batches [][]string
	for len(ids) > 0 {
		if len(ids) <= maxBatchSize {
			batches = append(batches, ids)
			ids = nil
		} else {
			batches = append(batches, ids[:maxBatchSize])
			ids = ids[maxBatchSize:]
		}
	}
	return batches
}

// The deadline to ack is derived from a percentile distribution based
Expand Down
29 changes: 13 additions & 16 deletions pubsub/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,22 @@ var (
fullyQualifiedSubName = fmt.Sprintf("projects/%s/subscriptions/%s", projName, subName)
)

func TestSplitRequestIDs(t *testing.T) {
func TestMakeBatches(t *testing.T) {
t.Parallel()
ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"}
for _, test := range []struct {
ids []string
splitIndex int
ids := []string{"a", "b", "c", "d", "e"}
for i, test := range []struct {
ids []string
want [][]string
}{
{[]string{}, 0}, // empty slice, no split
{ids, 2}, // slice of size 5, split at index 2
{ids[:2], 2}, // slice of size 3, split at index 2
{ids[:1], 1}, // slice of size 1, split at index 1
{[]string{}, [][]string{}}, // empty slice
{ids, [][]string{{"a", "b"}, {"c", "d"}, {"e"}}}, // slice of size 5
{ids[:3], [][]string{{"a", "b"}, {"c"}}}, // slice of size 3
{ids[:1], [][]string{{"a"}}}, // slice of size 1
} {
got1, got2 := splitRequestIDs(test.ids, 2)
want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:]
if !testutil.Equal(len(got1), len(want1)) {
t.Errorf("%v, 1: got %v, want %v", test, got1, want1)
}
if !testutil.Equal(len(got2), len(want2)) {
t.Errorf("%v, 2: got %v, want %v", test, got2, want2)
got := makeBatches(test.ids, 2)
want := test.want
if !testutil.Equal(len(got), len(want)) {
t.Errorf("test %d: %v, got %v, want %v", i, test, got, want)
}
}
}
Expand Down

0 comments on commit 426a8c2

Please sign in to comment.