Skip to content

Commit

Permalink
Make sure transactions only return connection once
Browse files Browse the repository at this point in the history
Before this fix, users could run into situation when a transaction
would execute their `onClosed` callback more than once, meaning
the referenced connection could be returned to the idle list of
its server's connection several times (i.e. the connection ref
could be duplicated and borrowed by more than 1 transaction at a
time, which breaks the fundamental invariant of connections).

This could occur when a transaction was committed or
rolled back* and then, users would run a query with the same transaction
again. This is obviously a misuse of the transaction API but the driver
could fail in a more graceful and more understandable manner.

When that subsequent run executed, a transaction ID mismatch error
would then be returned (since the connection was reset upon the
return to pool after commit/rollback, thus forgetting about its former
transaction ID).
The tx ID mismatch error would then lead to `onClosed` being called
a second time, and the same connection would end up listed a second time
in its server's tracked idle connection list.

One fix could be to make the pool return idempotent.
However, we felt that the better fix is to respect the invariant
according to which a connection is never returned more than once, thus
making the pool return idempotency irrelevant in practice.

*rollback also happens when closing the transaction or closing the
enclosing session and the transaction is the currently active workload
in that session

Fixes #487

Signed-off-by: Florent Biville <florent.biville@neo4j.com>
  • Loading branch information
robsdedude authored and fbiville committed May 17, 2023
1 parent 0eccfa3 commit 8c1268b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 23 deletions.
11 changes: 7 additions & 4 deletions neo4j/session_with_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ func (s *sessionWithContext) BeginTransaction(ctx context.Context, configurers .
return nil, errorutil.WrapError(err)
}

// Begin transaction
beginBookmarks, err := s.getBookmarks(ctx)
if err != nil {
_ = s.pool.Return(ctx, conn)
Expand Down Expand Up @@ -337,10 +336,14 @@ func (s *sessionWithContext) BeginTransaction(ctx context.Context, configurers .
fetchSize: s.fetchSize,
txHandle: txHandle,
onClosed: func(tx *explicitTransaction) {
// On transaction closed (rolled back or committed)
bookmarkErr := s.retrieveBookmarks(ctx, conn, beginBookmarks)
poolErr := s.pool.Return(ctx, conn)
if tx.conn == nil {
return
}
// On run failure, transaction closed (rolled back or committed)
bookmarkErr := s.retrieveBookmarks(ctx, tx.conn, beginBookmarks)
poolErr := s.pool.Return(ctx, tx.conn)
tx.err = errorutil.CombineAllErrors(tx.err, bookmarkErr, poolErr)
tx.conn = nil
s.explicitTx = nil
},
}
Expand Down
75 changes: 75 additions & 0 deletions neo4j/session_with_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ func TestSession(outer *testing.T) {
sess.Close(context.Background())
wg.Wait()
})

ct.Run("Cleans up router async", func(t *testing.T) {
router, _, sess := createSession()
wg := sync.WaitGroup{}
Expand All @@ -665,7 +666,81 @@ func TestSession(outer *testing.T) {
sess.Close(context.Background())
wg.Wait()
})

ct.Run("Does not put back connection twice to the pool", func(inner *testing.T) {
type testCase struct {
name string
completeTx func(context.Context, SessionWithContext, ExplicitTransaction) error
}
cases := []testCase{
{
name: "session close",
completeTx: func(ctx context.Context, session SessionWithContext, _ ExplicitTransaction) error {
return session.Close(ctx)
},
},
{
name: "tx commit",
completeTx: func(ctx context.Context, _ SessionWithContext, transaction ExplicitTransaction) error {
return transaction.Commit(ctx)
},
},
{
name: "tx rollback",
completeTx: func(ctx context.Context, _ SessionWithContext, transaction ExplicitTransaction) error {
return transaction.Rollback(ctx)
},
},
{
name: "tx close",
completeTx: func(ctx context.Context, _ SessionWithContext, transaction ExplicitTransaction) error {
return transaction.Close(ctx)
},
},
}

for _, test := range cases {
inner.Run(fmt.Sprintf("after %s", test.name), func(t *testing.T) {
_, pool, session := createSession()
conn := &ConnFake{Alive: true, RunTxErr: errors.New("invalid transaction handle")}
poolReturnsCalls := 0
pool.BorrowConn = conn
pool.ReturnHook = func() {
poolReturnsCalls++
}
tx, err := session.BeginTransaction(ctx)

AssertNoError(t, err)
AssertNoError(t, test.completeTx(ctx, session, tx))
AssertIntEqual(t, poolReturnsCalls, 1)
_, err = tx.Run(ctx, "RETURN 42", nil)
AssertErrorMessageContains(t, err, "cannot use this transaction")
AssertIntEqual(t, poolReturnsCalls, 1) // pool.Return must not be called again
})
}
})

ct.Run("Does not put back connection twice to the pool after second failed run", func(t *testing.T) {
_, pool, session := createSession()
runTxErr := errors.New("oopsie")
conn := &ConnFake{Alive: true, RunTxErr: runTxErr}
poolReturnsCalls := 0
pool.BorrowConn = conn
pool.ReturnHook = func() {
poolReturnsCalls++
}
tx, err := session.BeginTransaction(ctx)

AssertNoError(t, err)
_, err = tx.Run(ctx, "RETURN 42", nil)
AssertDeepEquals(t, err, runTxErr)
AssertIntEqual(t, poolReturnsCalls, 1)
_, err = tx.Run(ctx, "RETURN 42", nil)
AssertErrorMessageContains(t, err, "cannot use this transaction")
AssertIntEqual(t, poolReturnsCalls, 1) // pool.Return must not be called again
})
})

}

func assertTokenExpiredError(t *testing.T, err error) {
Expand Down
40 changes: 21 additions & 19 deletions neo4j/transaction_with_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,15 @@ type explicitTransaction struct {
conn db.Connection
fetchSize int
txHandle db.TxHandle
done bool
runFailed bool
err error
onClosed func(*explicitTransaction)
}

func (tx *explicitTransaction) Run(ctx context.Context, cypher string,
params map[string]any) (ResultWithContext, error) {
func (tx *explicitTransaction) Run(ctx context.Context, cypher string, params map[string]any) (ResultWithContext, error) {
if tx.conn == nil {
return nil, transactionAlreadyCompletedError()
}
stream, err := tx.conn.RunTx(ctx, tx.txHandle, db.Command{Cypher: cypher, Params: params, FetchSize: tx.fetchSize})
if err != nil {
tx.err = err
Expand All @@ -79,46 +80,47 @@ func (tx *explicitTransaction) Run(ctx context.Context, cypher string,
}

func (tx *explicitTransaction) Commit(ctx context.Context) error {
if tx.runFailed {
tx.runFailed, tx.done = false, true
return tx.err
}
if tx.done {
return transactionAlreadyCompletedError()
if err := tx.checkCompleted(); err != nil {
return err
}
tx.err = tx.conn.TxCommit(ctx, tx.txHandle)
tx.done = true
tx.onClosed(tx)
return errorutil.WrapError(tx.err)
}

func (tx *explicitTransaction) Close(ctx context.Context) error {
if tx.done {
if tx.conn == nil {
// repeated calls to Close => NOOP
return nil
}
return tx.Rollback(ctx)
}

func (tx *explicitTransaction) Rollback(ctx context.Context) error {
if tx.runFailed {
tx.done, tx.runFailed = true, false
return nil
}
if tx.done {
return transactionAlreadyCompletedError()
if err := tx.checkCompleted(); err != nil {
return err
}
if !tx.conn.IsAlive() || tx.conn.HasFailed() {
// tx implicitly rolled back by having failed
tx.err = nil
} else {
tx.err = tx.conn.TxRollback(ctx, tx.txHandle)
}
tx.done = true
tx.onClosed(tx)
return errorutil.WrapError(tx.err)
}

func (tx *explicitTransaction) checkCompleted() error {
if tx.runFailed {
tx.runFailed = false
return tx.err
}
if tx.conn == nil {
return transactionAlreadyCompletedError()
}
return nil
}

func (tx *explicitTransaction) legacy() Transaction {
return &transaction{
delegate: tx,
Expand Down Expand Up @@ -189,5 +191,5 @@ func (tx *autocommitTransaction) discard(ctx context.Context) {
}

func transactionAlreadyCompletedError() *UsageError {
return &UsageError{Message: "commit or rollback already called once on this transaction"}
return &UsageError{Message: "cannot use this transaction, because it has been committed or rolled back either because of an error or explicit termination"}
}

0 comments on commit 8c1268b

Please sign in to comment.