Skip to content

Commit

Permalink
test(pubsub): make error checking of receive less brittle (#10165)
Browse files Browse the repository at this point in the history
* test(pubsub): make error checking of receive less brittle

* use errors.Is for comparisons

* replace more instances
  • Loading branch information
hongalex authored May 14, 2024
1 parent fa3bfdb commit 7371665
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pubsub/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Messages are then consumed from a subscription via callback.
log.Printf("Got message: %s", m.Data)
m.Ack()
})
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
// Handle error.
}
Expand Down
7 changes: 4 additions & 3 deletions pubsub/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pubsub_test

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -303,7 +304,7 @@ func ExampleSubscription_Receive() {
// NOTE: May be called concurrently; synchronize access to shared memory.
m.Ack()
})
if err != nil && err != context.Canceled {
if err != nil && !errors.Is(err, context.Canceled) {
// TODO: Handle error.
}
}
Expand All @@ -324,7 +325,7 @@ func ExampleSubscription_Receive_maxExtension() {
// TODO: Handle message.
m.Ack()
})
if err != nil && err != context.Canceled {
if err != nil && !errors.Is(err, context.Canceled) {
// TODO: Handle error.
}
}
Expand All @@ -345,7 +346,7 @@ func ExampleSubscription_Receive_maxOutstanding() {
// TODO: Handle message.
m.Ack()
})
if err != nil && err != context.Canceled {
if err != nil && !errors.Is(err, context.Canceled) {
// TODO: Handle error.
}
}
Expand Down
4 changes: 2 additions & 2 deletions pubsub/flow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestFlowControllerCancel(t *testing.T) {
// Experiment: a context that times out should always return an error.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()
if err := fc.acquire(ctx, 6); err != context.DeadlineExceeded {
if err := fc.acquire(ctx, 6); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("got %v, expected DeadlineExceeded", err)
}
// Control: a context that is not done should always return nil.
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestFlowControllerNoStarve(t *testing.T) {
go func() {
for {
if err := fc.acquire(ctx, 1); err != nil {
if err != context.Canceled {
if !errors.Is(err, context.Canceled) {
t.Error(err)
}
return
Expand Down
6 changes: 2 additions & 4 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,10 +1200,8 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) {
}

received <- string(msg.Data)
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
}
}); err != nil && !errors.Is(err, context.Canceled) {
t.Error(err)
}
}

Expand Down
5 changes: 2 additions & 3 deletions pubsub/internal/longtest/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package longtest_test

import (
"context"
"errors"
"fmt"
"log"
"math/rand"
Expand All @@ -29,8 +30,6 @@ import (
"cloud.google.com/go/pubsub"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -303,7 +302,7 @@ func (c *consumer) consume(ctx context.Context, t *testing.T, sub *pubsub.Subscr
prev := c.totalRecvd
err := sub.Receive(ctx2, c.process)
t.Logf("%s: end receive; read %d", id, c.totalRecvd-prev)
if serr, _ := status.FromError(err); err != nil && serr.Code() != codes.Canceled {
if err != nil && !errors.Is(err, context.Canceled) {
panic(err)
}
select {
Expand Down
8 changes: 3 additions & 5 deletions pubsub/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,8 @@ func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg
recvdWg.Done()
}
})
if err != nil {
if status.Code(err) != codes.Canceled {
t.Error(err)
}
if err != nil && !errors.Is(err, context.Canceled) {
t.Error(err)
}
}

Expand Down Expand Up @@ -397,7 +395,7 @@ func TestIterator_ModifyAckContextDeadline(t *testing.T) {
err = s.Receive(cctx, func(ctx context.Context, m *Message) {
m.Ack()
})
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatalf("Got error in Receive: %v", err)
}

Expand Down
5 changes: 3 additions & 2 deletions pubsub/pstest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pubsub_test

import (
"context"
"errors"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -89,7 +90,7 @@ func TestPSTest(t *testing.T) {
mu.Unlock()
m.Ack()
})
if err != nil {
panic(err)
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatal(err)
}
}
3 changes: 2 additions & 1 deletion pubsub/streaming_pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package pubsub

import (
"context"
"errors"
"io"
"strconv"
"sync"
Expand Down Expand Up @@ -78,7 +79,7 @@ func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer
m.Nack()
}
})
if c := status.Convert(err); err != nil && c.Code() != codes.Canceled {
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatalf("Pull: %v", err)
}
gotMap := map[string]*Message{}
Expand Down
12 changes: 6 additions & 6 deletions pubsub/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func testReceive(t *testing.T, synchronous, exactlyOnceDelivery bool) {
m.Ack()
}
})
if c := status.Convert(err); err != nil && c.Code() != codes.Canceled {
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatalf("Pull: %v", err)
}
var seen [256]bool
Expand Down Expand Up @@ -607,7 +607,7 @@ func TestExactlyOnceDelivery_AckSuccess(t *testing.T) {
}
cancel()
})
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatalf("s.Receive err: %v", err)
}
}
Expand Down Expand Up @@ -654,7 +654,7 @@ func TestExactlyOnceDelivery_AckFailureErrorPermissionDenied(t *testing.T) {
}
cancel()
})
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatalf("s.Receive err: %v", err)
}
}
Expand Down Expand Up @@ -706,7 +706,7 @@ func TestExactlyOnceDelivery_AckRetryDeadlineExceeded(t *testing.T) {
}
cancel()
})
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatalf("s.Receive err: %v", err)
}
}
Expand Down Expand Up @@ -748,7 +748,7 @@ func TestExactlyOnceDelivery_NackSuccess(t *testing.T) {
}
cancel()
})
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatalf("s.Receive err: %v", err)
}
}
Expand Down Expand Up @@ -833,7 +833,7 @@ func TestSubscribeMessageExpirationFlowControl(t *testing.T) {
if deliveryCount != 2 {
t.Fatalf("expected 2 iterations of the callback, got %d", deliveryCount)
}
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatalf("s.Receive err: %v", err)
}
}
2 changes: 1 addition & 1 deletion pubsub/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func TestPublishTimeout(t *testing.T) {
select {
case <-r.Ready():
_, err = r.Get(ctx)
if err != context.DeadlineExceeded {
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("got %v, want context.DeadlineExceeded", err)
}
case <-time.After(2 * topic.PublishSettings.Timeout):
Expand Down

0 comments on commit 7371665

Please sign in to comment.