Skip to content

Commit

Permalink
feat(pubsub): add client ID to initial streaming pull request (#10436)
Browse files Browse the repository at this point in the history
* feat(pubsub): add randomly generated UUID to outgoing initial streaming pull requests

* run go mod tidy

* fix issue with subscription name argument
  • Loading branch information
hongalex authored Jun 26, 2024
1 parent b76b4ee commit a3d70ed
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 5 deletions.
1 change: 1 addition & 0 deletions pubsub/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
cloud.google.com/go/iam v1.1.8
cloud.google.com/go/kms v1.17.1
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.12.5
go.einride.tech/aip v0.67.1
go.opencensus.io v0.24.0
Expand Down
1 change: 1 addition & 0 deletions pubsub/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.12.5 h1:8gw9KZK8TiVKB6q3zHY3SBzLnrGp6HQjyfYBYGmXdxA=
Expand Down
2 changes: 1 addition & 1 deletion pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
maxMessages = 0
maxBytes = 0
}
ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes, po.maxExtensionPeriod)
ps = newPullStream(context.Background(), subc.StreamingPull, subName, po.clientID, maxMessages, maxBytes, po.maxExtensionPeriod)
}
// The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending
// the first keepAlive halfway towards the minimum ack deadline.
Expand Down
3 changes: 2 additions & 1 deletion pubsub/pullstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type pullStream struct {
// for testing
type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error)

func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream {
func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName, clientID string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream {
ctx = withSubscriptionKey(ctx, subName)
hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(subName))}
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
Expand All @@ -62,6 +62,7 @@ func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName
}
err = spc.Send(&pb.StreamingPullRequest{
Subscription: subName,
ClientId: clientID,
StreamAckDeadlineSeconds: streamAckDeadline,
MaxOutstandingMessages: int64(maxOutstandingMessages),
MaxOutstandingBytes: int64(maxOutstandingBytes),
Expand Down
2 changes: 1 addition & 1 deletion pubsub/pullstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestPullStreamGet(t *testing.T) {
test.errors = test.errors[1:]
return &testStreamingPullClient{sendError: err}, nil
}
ps := newPullStream(context.Background(), streamingPull, "", 100, 1000, 0)
ps := newPullStream(context.Background(), streamingPull, "", "", 100, 1000, 0)
_, err := ps.get(nil)
if got := status.Code(err); got != test.wantCode {
t.Errorf("%s: got %s, want %s", test.desc, got, test.wantCode)
Expand Down
13 changes: 11 additions & 2 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"cloud.google.com/go/internal/optional"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/internal/scheduler"
"github.com/google/uuid"
gax "github.com/googleapis/gax-go/v2"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
Expand All @@ -50,6 +51,11 @@ type Subscription struct {

mu sync.Mutex
receiveActive bool

// clientID to be used across all streaming pull connections that are created.
// This indicates to the server that any guarantees made for a stream that
// disconnected will be made for the stream that is created to replace it.
clientID string
}

// Subscription creates a reference to a subscription.
Expand All @@ -60,8 +66,9 @@ func (c *Client) Subscription(id string) *Subscription {
// SubscriptionInProject creates a reference to a subscription in a given project.
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
return &Subscription{
c: c,
name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
c: c,
name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
clientID: uuid.NewString(),
}
}

Expand Down Expand Up @@ -1280,6 +1287,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
maxOutstandingMessages: maxCount,
maxOutstandingBytes: maxBytes,
useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl,
clientID: s.clientID,
}
fc := newSubscriptionFlowController(FlowControlSettings{
MaxOutstandingMessages: maxCount,
Expand Down Expand Up @@ -1446,4 +1454,5 @@ type pullOptions struct {
maxOutstandingMessages int
maxOutstandingBytes int
useLegacyFlowControl bool
clientID string
}

0 comments on commit a3d70ed

Please sign in to comment.