Skip to content

Commit

Permalink
Revert "spanner: Retry "Session not found" for read-only transactions"
Browse files Browse the repository at this point in the history
This reverts commit 5d17c75.

Reason for revert: breakings TestIntegration_SingleUse

Change-Id: Ib7fde8b667603a2aa15aedc96a257f1a018650e0
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/45190
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
  • Loading branch information
jeanbza committed Sep 10, 2019
1 parent 91af92f commit 090ee50
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 98 deletions.
43 changes: 20 additions & 23 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,14 @@ func TestClient_Single_InvalidArgument(t *testing.T) {
}

func testSingleQuery(t *testing.T, serverError error) error {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
if serverError != nil {
server.TestSpanner.SetError(serverError)
}
tx := client.Single()
return executeTestQuery(t, tx)
}

func executeTestQuery(t *testing.T, tx *ReadOnlyTransaction) error {
ctx := context.Background()
iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
var rowCount int64
for {
row, err := iter.Next()
if err == iterator.Done {
Expand All @@ -139,10 +133,6 @@ func executeTestQuery(t *testing.T, tx *ReadOnlyTransaction) error {
if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
return err
}
rowCount++
}
if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
t.Fatalf("Row count mismatch\ngot: %v\nwant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
}
return nil
}
Expand Down Expand Up @@ -210,16 +200,6 @@ func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndInvalidArgument
}
}

func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction(t *testing.T) {
t.Parallel()
exec := map[string]SimulatedExecutionTime{
MethodBeginTransaction: {Errors: []error{gstatus.Error(codes.NotFound, "Session not found")}},
}
if err := testReadOnlyTransaction(t, exec); err != nil {
t.Fatal(err)
}
}

func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) error {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
Expand All @@ -228,7 +208,24 @@ func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedEx
}
tx := client.ReadOnlyTransaction()
defer tx.Close()
return executeTestQuery(t, tx)
ctx := context.Background()
iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
for {
row, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
var singerID, albumID int64
var albumTitle string
if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
return err
}
}
return nil
}

func TestClient_ReadWriteTransaction(t *testing.T) {
Expand Down
9 changes: 6 additions & 3 deletions spanner/internal/testutil/inmem_spanner_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ const (
MethodGetSession string = "GET_SESSION"
MethodExecuteSql string = "EXECUTE_SQL"
MethodExecuteStreamingSql string = "EXECUTE_STREAMING_SQL"
MethodStreamingRead string = "EXECUTE_STREAMING_READ"
)

// StatementResult represents a mocked result on the test server. Th result can
Expand Down Expand Up @@ -704,9 +703,13 @@ func (s *inMemSpannerServer) Read(ctx context.Context, req *spannerpb.ReadReques
}

func (s *inMemSpannerServer) StreamingRead(req *spannerpb.ReadRequest, stream spannerpb.Spanner_StreamingReadServer) error {
if err := s.simulateExecutionTime(MethodStreamingRead, req); err != nil {
return err
s.mu.Lock()
if s.stopped {
s.mu.Unlock()
return gstatus.Error(codes.Unavailable, "server has been stopped")
}
s.receivedRequests <- req
s.mu.Unlock()
return gstatus.Error(codes.Unimplemented, "Method not yet implemented")
}

Expand Down
45 changes: 11 additions & 34 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,41 +363,18 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
sh.recycle()
}
}()

// Create transaction options.
readOnlyOptions := buildTransactionOptionsReadOnly(t.getTimestampBound(), true)
transactionOptions := &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadOnly_{
ReadOnly: readOnlyOptions,
},
}
// Retry TakeSession and BeginTransaction on Session not found.
retryOnNotFound := gax.OnCodes([]codes.Code{codes.NotFound}, gax.Backoff{})
beginTxWithRetry := func(ctx context.Context) (*sppb.Transaction, error) {
for {
sh, err = t.sp.take(ctx)
if err != nil {
return nil, err
}
client := sh.getClient()
ctx := contextWithOutgoingMetadata(ctx, sh.getMetadata())
res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: transactionOptions,
})
if err == nil {
return res, nil
}
// We should not wait before retrying.
if _, shouldRetry := retryOnNotFound.Retry(err); !shouldRetry {
return nil, err
}
// Delete session and then retry with a new one.
sh.destroy()
}
sh, err = t.sp.take(ctx)
if err != nil {
return err
}

res, err := beginTxWithRetry(ctx)
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadOnly_{
ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
},
},
})
if err == nil {
tx = res.Id
if res.ReadTimestamp != nil {
Expand Down
58 changes: 20 additions & 38 deletions spanner/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,64 +195,46 @@ func TestApply_RetryOnAbort(t *testing.T) {
}
}

// Tests that NotFound errors cause failures, and aren't retried, except for
// BeginTransaction.
// Tests that NotFound errors cause failures, and aren't retried.
func TestTransaction_NotFound(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()

errSessionNotFound := spannerErrorf(codes.NotFound, "Session not found")
// BeginTransaction should retry automatically.
wantErr := spannerErrorf(codes.NotFound, "Session not found")
server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
SimulatedExecutionTime{
Errors: []error{errSessionNotFound},
Errors: []error{wantErr, wantErr, wantErr},
})
txn := client.ReadOnlyTransaction()
if _, _, got := txn.acquire(ctx); got != nil {
t.Fatalf("Expect acquire to succeed, got %v, want nil.", got)
}
txn.Close()

// Query should fail with Session not found.
server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
SimulatedExecutionTime{
Errors: []error{errSessionNotFound},
Errors: []error{wantErr, wantErr, wantErr},
})
txn = client.ReadOnlyTransaction()
iter := txn.Query(ctx, NewStatement("SELECT 1"))
_, got := iter.Next()
if !testEqual(errSessionNotFound, got) {
t.Fatalf("Expect Query to fail\ngot: %v\nwant: %v", got, errSessionNotFound)

txn := client.ReadOnlyTransaction()
defer txn.Close()

if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) {
t.Fatalf("Expect acquire to fail, got %v, want %v.", got, wantErr)
}
iter.Stop()

// Read should fail with Session not found.
server.TestSpanner.PutExecutionTime(MethodStreamingRead,
SimulatedExecutionTime{
Errors: []error{errSessionNotFound},
})
txn = client.ReadOnlyTransaction()
iter = txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"})
_, got = iter.Next()
if !testEqual(errSessionNotFound, got) {
t.Fatalf("Expect Read to fail\ngot: %v\nwant: %v", got, errSessionNotFound)
// The failure should recycle the session, we expect it to be used in
// following requests.
if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) {
t.Fatalf("Expect Query to fail, got %v, want %v.", got.err, wantErr)
}
iter.Stop()

// Commit should fail with Session not found.
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
SimulatedExecutionTime{
Errors: []error{errSessionNotFound},
})
if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) {
t.Fatalf("Expect Read to fail, got %v, want %v.", got.err, wantErr)
}

ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
if _, got := client.Apply(ctx, ms, ApplyAtLeastOnce()); !testEqual(errSessionNotFound, got) {
t.Fatalf("Expect Apply to fail\ngot: %v\nwant: %v", got, errSessionNotFound)
if _, got := client.Apply(ctx, ms, ApplyAtLeastOnce()); !testEqual(wantErr, got) {
t.Fatalf("Expect Apply to fail, got %v, want %v.", got, wantErr)
}
}

Expand Down

0 comments on commit 090ee50

Please sign in to comment.