diff --git a/pubsub/doc.go b/pubsub/doc.go index 61840aed8329..6107d7a6474a 100644 --- a/pubsub/doc.go +++ b/pubsub/doc.go @@ -64,7 +64,7 @@ Messages are then consumed from a subscription via callback. log.Printf("Got message: %s", m.Data) m.Ack() }) - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { // Handle error. } diff --git a/pubsub/example_test.go b/pubsub/example_test.go index 07567521582a..7015f454a5cd 100644 --- a/pubsub/example_test.go +++ b/pubsub/example_test.go @@ -16,6 +16,7 @@ package pubsub_test import ( "context" + "errors" "fmt" "time" @@ -303,7 +304,7 @@ func ExampleSubscription_Receive() { // NOTE: May be called concurrently; synchronize access to shared memory. m.Ack() }) - if err != nil && err != context.Canceled { + if err != nil && !errors.Is(err, context.Canceled) { // TODO: Handle error. } } @@ -324,7 +325,7 @@ func ExampleSubscription_Receive_maxExtension() { // TODO: Handle message. m.Ack() }) - if err != nil && err != context.Canceled { + if err != nil && !errors.Is(err, context.Canceled) { // TODO: Handle error. } } @@ -345,7 +346,7 @@ func ExampleSubscription_Receive_maxOutstanding() { // TODO: Handle message. m.Ack() }) - if err != nil && err != context.Canceled { + if err != nil && !errors.Is(err, context.Canceled) { // TODO: Handle error. } } diff --git a/pubsub/flow_controller_test.go b/pubsub/flow_controller_test.go index 459663adb62a..9dae6b6f0825 100644 --- a/pubsub/flow_controller_test.go +++ b/pubsub/flow_controller_test.go @@ -43,7 +43,7 @@ func TestFlowControllerCancel(t *testing.T) { // Experiment: a context that times out should always return an error. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) defer cancel() - if err := fc.acquire(ctx, 6); err != context.DeadlineExceeded { + if err := fc.acquire(ctx, 6); !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("got %v, expected DeadlineExceeded", err) } // Control: a context that is not done should always return nil. @@ -78,7 +78,7 @@ func TestFlowControllerNoStarve(t *testing.T) { go func() { for { if err := fc.acquire(ctx, 1); err != nil { - if err != context.Canceled { + if !errors.Is(err, context.Canceled) { t.Error(err) } return diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 6c830553f308..554b0a63a8e3 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1200,10 +1200,8 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) { } received <- string(msg.Data) - }); err != nil { - if c := status.Code(err); c != codes.Canceled { - t.Error(err) - } + }); err != nil && !errors.Is(err, context.Canceled) { + t.Error(err) } } diff --git a/pubsub/internal/longtest/endtoend_test.go b/pubsub/internal/longtest/endtoend_test.go index b0a8d7e3531e..fdfc49c292d0 100644 --- a/pubsub/internal/longtest/endtoend_test.go +++ b/pubsub/internal/longtest/endtoend_test.go @@ -16,6 +16,7 @@ package longtest_test import ( "context" + "errors" "fmt" "log" "math/rand" @@ -29,8 +30,6 @@ import ( "cloud.google.com/go/pubsub" "google.golang.org/api/iterator" "google.golang.org/api/option" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) const ( @@ -303,7 +302,7 @@ func (c *consumer) consume(ctx context.Context, t *testing.T, sub *pubsub.Subscr prev := c.totalRecvd err := sub.Receive(ctx2, c.process) t.Logf("%s: end receive; read %d", id, c.totalRecvd-prev) - if serr, _ := status.FromError(err); err != nil && serr.Code() != codes.Canceled { + if err != nil && !errors.Is(err, context.Canceled) { panic(err) } select { diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go index 2360d6345008..ddd1a442428d 100644 --- a/pubsub/iterator_test.go +++ b/pubsub/iterator_test.go @@ -261,10 +261,8 @@ func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg recvdWg.Done() } }) - if err != nil { - if status.Code(err) != codes.Canceled { - t.Error(err) - } + if err != nil && !errors.Is(err, context.Canceled) { + t.Error(err) } } @@ -397,7 +395,7 @@ func TestIterator_ModifyAckContextDeadline(t *testing.T) { err = s.Receive(cctx, func(ctx context.Context, m *Message) { m.Ack() }) - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { t.Fatalf("Got error in Receive: %v", err) } diff --git a/pubsub/pstest_test.go b/pubsub/pstest_test.go index 3114ef92af49..a9f16aa6bd44 100644 --- a/pubsub/pstest_test.go +++ b/pubsub/pstest_test.go @@ -16,6 +16,7 @@ package pubsub_test import ( "context" + "errors" "strconv" "sync" "testing" @@ -89,7 +90,7 @@ func TestPSTest(t *testing.T) { mu.Unlock() m.Ack() }) - if err != nil { - panic(err) + if err != nil && !errors.Is(err, context.Canceled) { + t.Fatal(err) } } diff --git a/pubsub/streaming_pull_test.go b/pubsub/streaming_pull_test.go index de832f1c7152..015e3186d1bf 100644 --- a/pubsub/streaming_pull_test.go +++ b/pubsub/streaming_pull_test.go @@ -20,6 +20,7 @@ package pubsub import ( "context" + "errors" "io" "strconv" "sync" @@ -78,7 +79,7 @@ func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer m.Nack() } }) - if c := status.Convert(err); err != nil && c.Code() != codes.Canceled { + if err != nil && !errors.Is(err, context.Canceled) { t.Fatalf("Pull: %v", err) } gotMap := map[string]*Message{} diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index 6208b408daf8..e98f5c9f0f7d 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -318,7 +318,7 @@ func testReceive(t *testing.T, synchronous, exactlyOnceDelivery bool) { m.Ack() } }) - if c := status.Convert(err); err != nil && c.Code() != codes.Canceled { + if err != nil && !errors.Is(err, context.Canceled) { t.Fatalf("Pull: %v", err) } var seen [256]bool @@ -607,7 +607,7 @@ func TestExactlyOnceDelivery_AckSuccess(t *testing.T) { } cancel() }) - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { t.Fatalf("s.Receive err: %v", err) } } @@ -654,7 +654,7 @@ func TestExactlyOnceDelivery_AckFailureErrorPermissionDenied(t *testing.T) { } cancel() }) - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { t.Fatalf("s.Receive err: %v", err) } } @@ -706,7 +706,7 @@ func TestExactlyOnceDelivery_AckRetryDeadlineExceeded(t *testing.T) { } cancel() }) - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { t.Fatalf("s.Receive err: %v", err) } } @@ -748,7 +748,7 @@ func TestExactlyOnceDelivery_NackSuccess(t *testing.T) { } cancel() }) - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { t.Fatalf("s.Receive err: %v", err) } } @@ -833,7 +833,7 @@ func TestSubscribeMessageExpirationFlowControl(t *testing.T) { if deliveryCount != 2 { t.Fatalf("expected 2 iterations of the callback, got %d", deliveryCount) } - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { t.Fatalf("s.Receive err: %v", err) } } diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 974e0c487d65..0cfd5f52ca03 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -254,7 +254,7 @@ func TestPublishTimeout(t *testing.T) { select { case <-r.Ready(): _, err = r.Get(ctx) - if err != context.DeadlineExceeded { + if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("got %v, want context.DeadlineExceeded", err) } case <-time.After(2 * topic.PublishSettings.Timeout):