Skip to content

Commit

Permalink
fix(pubsublite): fixes for background partition count updates (#4293)
Browse files Browse the repository at this point in the history
- When refreshing topic partition counts in the background for Publishers, retry slowly upon resource exhausted errors.
- partitionCountWatcher ignores errors after the first update.
  • Loading branch information
tmdiep authored Jun 22, 2021
1 parent 598f5b9 commit 634847b
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 21 deletions.
7 changes: 6 additions & 1 deletion pubsublite/internal/wire/partition_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newPartitionCountWatcher(ctx context.Context, adminClient *vkit.AdminClient
adminClient: adminClient,
topicPath: topicPath,
receiver: receiver,
callOption: retryableReadOnlyCallOption(),
callOption: resourceExhaustedRetryer(),
}

// Polling the topic partition count can be disabled in settings if the period
Expand Down Expand Up @@ -100,6 +100,11 @@ func (p *partitionCountWatcher) updatePartitionCount() {
return p.partitionCount, nil
}
if err != nil {
if p.partitionCount > 0 {
// Ignore errors after the first update.
// TODO: Log the error.
return p.partitionCount, nil
}
err = fmt.Errorf("pubsublite: failed to update topic partition count: %v", err)
p.unsafeInitiateShutdown(err)
return 0, err
Expand Down
23 changes: 23 additions & 0 deletions pubsublite/internal/wire/partition_count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,26 @@ func TestPartitionCountWatcherPartitionCountUnchanged(t *testing.T) {
watcher.VerifyCounts([]int{wantPartitionCount1, wantPartitionCount2})
watcher.StopVerifyNoError()
}

// TestPartitionCountWatcherIgnoreUpdateErrors covers the case where the first
// partition count fetch succeeds and a later update fails: the watcher is
// expected to keep running rather than shut down with the error.
func TestPartitionCountWatcherIgnoreUpdateErrors(t *testing.T) {
const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
wantPartitionCount := 4

// Queue two mock responses for the same request: a successful partition count
// followed by a FailedPrecondition error.
verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(wantPartitionCount), nil)
verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, status.Error(codes.FailedPrecondition, ""))

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

watcher := newTestPartitionCountWatcher(t, topic, testPublishSettings())
if gotErr := watcher.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
watcher.VerifyCounts([]int{wantPartitionCount}) // Initial count

// Although the next update is a permanent error, do not terminate.
// NOTE(review): the expected counts are unchanged from the initial check,
// which presumably means no new count is delivered for the failed update —
// confirm against VerifyCounts.
watcher.UpdatePartitionCount()
watcher.VerifyCounts([]int{wantPartitionCount})
watcher.StopVerifyNoError()
}
10 changes: 6 additions & 4 deletions pubsublite/internal/wire/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,14 +1004,16 @@ func TestRoutingPublisherPartitionCountUpdateFails(t *testing.T) {
t.Run("Failed update", func(t *testing.T) {
pub.pub.partitionWatcher.updatePartitionCount()

// Failed background update terminates the routingPublisher.
if gotErr := pub.WaitStopped(); !test.ErrorHasMsg(gotErr, serverErr.Error()) {
t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr)
}
// Failed update ignored.
if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
t.Errorf("Num partition publishers: got %d, want %d", got, want)
}
})

pub.Stop()
if gotErr := pub.WaitStopped(); gotErr != nil {
t.Errorf("Stop() got err: (%v)", gotErr)
}
}

func TestNewPublisherValidatesSettings(t *testing.T) {
Expand Down
57 changes: 41 additions & 16 deletions pubsublite/internal/wire/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,24 +110,49 @@ func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool {
return isEligible(s.Code())
}

// retryableReadOnlyCallOption returns a call option that retries with backoff
// for ResourceExhausted in addition to other default retryable codes for
// Pub/Sub. Suitable for read-only operations which are subject to only QPS
// compositeRetryer wraps an ordered list of retryers. Earlier retryers take
// precedence.
type compositeRetryer struct {
	retryers []gax.Retryer
}

// Retry asks each wrapped retryer, in order, whether err should be retried and
// returns the pause chosen by the first one that says yes. If no retryer wants
// to retry, it returns a zero pause and false.
func (cr *compositeRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
	for i := range cr.retryers {
		if delay, ok := cr.retryers[i].Retry(err); ok {
			return delay, true
		}
	}
	return 0, false
}

// resourceExhaustedRetryer returns a call option that retries ResourceExhausted
// errors slowly (backoff starting at 1s, tripling up to 60s) and retries the
// other default retryable codes for Pub/Sub with a faster backoff (starting at
// 100ms). Suitable for read-only operations which are subject to only QPS
// quota limits.
func retryableReadOnlyCallOption() gax.CallOption {
func resourceExhaustedRetryer() gax.CallOption {
return gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Aborted,
codes.DeadlineExceeded,
codes.Internal,
codes.ResourceExhausted,
codes.Unavailable,
codes.Unknown,
}, gax.Backoff{
Initial: 100 * time.Millisecond,
Max: 60 * time.Second,
Multiplier: 1.3,
})
return &compositeRetryer{
retryers: []gax.Retryer{
gax.OnCodes([]codes.Code{
codes.ResourceExhausted,
}, gax.Backoff{
Initial: time.Second,
Max: 60 * time.Second,
Multiplier: 3,
}),
gax.OnCodes([]codes.Code{
codes.Aborted,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable,
codes.Unknown,
}, gax.Backoff{
Initial: 100 * time.Millisecond,
Max: 60 * time.Second,
Multiplier: 1.3,
}),
},
}
})
}

Expand Down

0 comments on commit 634847b

Please sign in to comment.