Skip to content

Commit

Permalink
Recreate session if transaction is aborted (#99)
Browse files Browse the repository at this point in the history
* Recreate session if transaction is aborted.

* Change session MaxOpened to 10 since integration_test requires more than a single session.
  • Loading branch information
yfuruyama authored Dec 18, 2020
1 parent 051eb4b commit 3dbf87d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 25 deletions.
20 changes: 10 additions & 10 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/grpc/codes"
"io"
"os"
"regexp"
Expand Down Expand Up @@ -68,14 +69,6 @@ type command struct {
Vertical bool
}

var defaultClientConfig = spanner.ClientConfig{
NumChannels: 1,
SessionPoolConfig: spanner.SessionPoolConfig{
MaxOpened: 1,
MinOpened: 1,
},
}

func NewCli(projectId, instanceId, databaseId string, prompt string, credential []byte, inStream io.ReadCloser, outStream io.Writer, errStream io.Writer, verbose bool) (*Cli, error) {
ctx := context.Background()
session, err := createSession(ctx, projectId, instanceId, databaseId, credential)
Expand Down Expand Up @@ -184,6 +177,13 @@ func (c *Cli) RunInteractive() int {
elapsed := time.Since(t0).Seconds()
stop()
if err != nil {
if spanner.ErrCode(err) == codes.Aborted {
// Once the transaction is aborted, the underlying session gains higher lock priority for the next transaction.
// This makes the result of subsequent transaction in spanner-cli inconsistent, so we recreate the client to replace
// the Cloud Spanner's session with new one to revert the lock priority of the session.
// See: https://cloud.google.com/spanner/docs/reference/rest/v1/TransactionOptions#retrying-aborted-transactions
c.Session.RecreateClient()
}
c.PrintInteractiveError(err)
continue
}
Expand Down Expand Up @@ -293,9 +293,9 @@ func (c *Cli) getInterpolatedPrompt() string {
func createSession(ctx context.Context, projectId string, instanceId string, databaseId string, credential []byte) (*Session, error) {
if credential != nil {
credentialOption := option.WithCredentialsJSON(credential)
return NewSession(ctx, projectId, instanceId, databaseId, defaultClientConfig, credentialOption)
return NewSession(ctx, projectId, instanceId, databaseId, credentialOption)
} else {
return NewSession(ctx, projectId, instanceId, databaseId, defaultClientConfig)
return NewSession(ctx, projectId, instanceId, databaseId)
}
}

Expand Down
4 changes: 1 addition & 3 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ func setup(t *testing.T, ctx context.Context, dmls []string) (*Session, string,
if testCredential != "" {
options = append(options, option.WithCredentialsJSON([]byte(testCredential)))
}
session, err := NewSession(ctx, testProjectId, testInstanceId, testDatabaseId, spanner.ClientConfig{
SessionPoolConfig: spanner.SessionPoolConfig{WriteSessions: 0.2},
}, options...)
session, err := NewSession(ctx, testProjectId, testInstanceId, testDatabaseId, options...)
if err != nil {
t.Fatalf("failed to create test session: err=%s", err)
}
Expand Down
39 changes: 27 additions & 12 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ package main
import (
"context"
"fmt"
"os"
"time"

"cloud.google.com/go/spanner"
adminapi "cloud.google.com/go/spanner/admin/database/apiv1"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

Expand All @@ -34,13 +32,25 @@ type txnFinishResult struct {
Err error
}

var clientConfig = spanner.ClientConfig{
SessionPoolConfig: spanner.SessionPoolConfig{
MinOpened: 1,
MaxOpened: 10, // FIXME: integration_test requires more than a single session
},
}

var defaultClientOpts = []option.ClientOption{
option.WithGRPCConnectionPool(1),
}

type Session struct {
ctx context.Context
projectId string
instanceId string
databaseId string
client *spanner.Client
adminClient *adminapi.DatabaseAdminClient
clientOpts []option.ClientOption

// for read-write transaction
rwTxn *spanner.ReadWriteStmtBasedTransaction
Expand All @@ -50,22 +60,15 @@ type Session struct {
roTxn *spanner.ReadOnlyTransaction
}

func NewSession(ctx context.Context, projectId string, instanceId string, databaseId string, clientConfig spanner.ClientConfig, opts ...option.ClientOption) (*Session, error) {
func NewSession(ctx context.Context, projectId string, instanceId string, databaseId string, opts ...option.ClientOption) (*Session, error) {
dbPath := fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, instanceId, databaseId)
opts = append(opts, defaultClientOpts...)

client, err := spanner.NewClientWithConfig(ctx, dbPath, clientConfig, opts...)
if err != nil {
return nil, err
}

if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
emulatorOpts := []option.ClientOption{
option.WithEndpoint(emulatorAddr),
option.WithGRPCDialOption(grpc.WithInsecure()),
option.WithoutAuthentication(),
}
opts = append(opts, emulatorOpts...)
}

adminClient, err := adminapi.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return nil, err
Expand All @@ -77,6 +80,7 @@ func NewSession(ctx context.Context, projectId string, instanceId string, databa
instanceId: instanceId,
databaseId: databaseId,
client: client,
clientOpts: opts,
adminClient: adminClient,
}, nil
}
Expand Down Expand Up @@ -141,6 +145,17 @@ func (s *Session) DatabaseExists() (bool, error) {
}
}

// RecreateClient closes the current client and creates a new client for the session.
func (s *Session) RecreateClient() error {
c, err := spanner.NewClientWithConfig(s.ctx, s.DatabasePath(), clientConfig, s.clientOpts...)
if err != nil {
return err
}
s.client.Close()
s.client = c
return nil
}

// StartHeartbeat starts heartbeat for read-write transaction.
//
// If no reads or DMLs happen within 10 seconds, the rw-transaction is considered idle at Cloud Spanner server.
Expand Down

0 comments on commit 3dbf87d

Please sign in to comment.