Skip to content

Commit

Permalink
Merge #69224
Browse files Browse the repository at this point in the history
69224: sql: implement SET LOCAL r=rafiss a=otan

Resolves: #32562
Release justification: high priority bug fix

First 3 commits from #69394

Co-authored-by: Oliver Tan <otan@cockroachlabs.com>
  • Loading branch information
craig[bot] and otan committed Aug 27, 2021
2 parents 2b8d42d + 0c87ad9 commit cae3152
Show file tree
Hide file tree
Showing 35 changed files with 1,215 additions and 224 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/set_local_stmt.bnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
set_local_stmt ::=
'SET' 'LOCAL' set_rest
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/set_var.bnf
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
preparable_set_stmt ::=
'SET' 'SESSION' set_rest
| 'SET' set_rest
| set_local_stmt
| set_csetting_stmt
| use_stmt
22 changes: 13 additions & 9 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ select_stmt ::=

preparable_set_stmt ::=
set_session_stmt
| set_local_stmt
| set_csetting_stmt
| use_stmt

Expand Down Expand Up @@ -603,6 +604,9 @@ set_session_stmt ::=
| 'SET' set_rest_more
| 'SET' 'SESSION' 'CHARACTERISTICS' 'AS' 'TRANSACTION' transaction_mode_list

set_local_stmt ::=
'SET' 'LOCAL' set_rest

set_csetting_stmt ::=
'SET' 'CLUSTER' 'SETTING' var_name to_or_eq var_value

Expand Down Expand Up @@ -1566,6 +1570,9 @@ opt_for_locking_clause ::=
set_rest_more ::=
set_rest

set_rest ::=
generic_set

to_or_eq ::=
'='
| 'TO'
Expand Down Expand Up @@ -1831,9 +1838,6 @@ abbreviated_revoke_stmt ::=
role_options ::=
( role_option ) ( ( role_option ) )*

set_rest ::=
generic_set

backup_options ::=
'ENCRYPTION_PASSPHRASE' '=' string_or_placeholder
| 'REVISION_HISTORY'
Expand Down Expand Up @@ -2117,6 +2121,9 @@ offset_clause ::=
'OFFSET' a_expr
| 'OFFSET' select_fetch_first_value row_or_rows

generic_set ::=
var_name to_or_eq var_list

extra_var_value ::=
'ON'
| cockroachdb_extra_reserved_keyword
Expand Down Expand Up @@ -2330,9 +2337,6 @@ role_option ::=
| password_clause
| valid_until_clause

generic_set ::=
var_name to_or_eq var_list

d_expr ::=
'ICONST'
| 'FCONST'
Expand Down Expand Up @@ -2542,6 +2546,9 @@ all_or_distinct ::=
for_locking_item ::=
for_locking_strength opt_locked_rels opt_nowait_or_skip

var_list ::=
( var_value ) ( ( ',' var_value ) )*

opt_ordinality ::=
'WITH' 'ORDINALITY'
|
Expand Down Expand Up @@ -2682,9 +2689,6 @@ valid_until_clause ::=
'VALID' 'UNTIL' string_or_placeholder
| 'VALID' 'UNTIL' 'NULL'

var_list ::=
( var_value ) ( ( ',' var_value ) )*

typed_literal ::=
func_name_no_crdb_extra 'SCONST'
| const_typename 'SCONST'
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func (s *Server) SetupConn(
// Set the SessionData from args.SessionDefaults. This also validates the
// respective values.
sdMutIterator := s.makeSessionDataMutatorIterator(sds, args.SessionDefaults)
if err := sdMutIterator.forEachMutatorError(func(m *sessionDataMutator) error {
if err := sdMutIterator.applyOnEachMutatorError(func(m *sessionDataMutator) error {
return resetSessionVars(ctx, m)
}); err != nil {
log.Errorf(ctx, "error setting up client session: %s", err)
Expand Down Expand Up @@ -679,6 +679,7 @@ func (s *Server) newSessionData(args SessionArgs) *sessiondata.SessionData {
},
LocalOnlySessionData: sessiondatapb.LocalOnlySessionData{
ResultsBufferSize: args.ConnResultsBufferSize,
IsSuperuser: args.IsSuperuser,
},
}
s.populateMinimalSessionData(sd)
Expand Down Expand Up @@ -867,7 +868,7 @@ func (s *Server) newConnExecutorWithTxn(
if txn.Type() == kv.LeafTxn {
// If the txn is a leaf txn it is not allowed to perform mutations. For
// sanity, set read only on the session.
ex.dataMutatorIterator.forEachMutator(func(m *sessionDataMutator) {
ex.dataMutatorIterator.applyForEachMutator(func(m *sessionDataMutator) {
m.SetReadOnly(true)
})
}
Expand Down
52 changes: 49 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func (ex *connExecutor) execStmtInOpenState(

case *tree.RollbackTransaction:
// RollbackTransaction is executed fully here; there's no plan for it.
ev, payload := ex.rollbackSQLTransaction(ctx)
ev, payload := ex.rollbackSQLTransaction(ctx, s)
return ev, payload, nil

case *tree.Savepoint:
Expand Down Expand Up @@ -818,9 +818,46 @@ func (ex *connExecutor) commitSQLTransaction(
return ex.makeErrEvent(err, ast)
}
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndTransactionCommit, timeutil.Now())
if err := ex.reportParamStatusUpdateChanges(func() error {
ex.sessionDataStack.PopAll()
return nil
}); err != nil {
return ex.makeErrEvent(err, ast)
}
return eventTxnFinishCommitted{}, nil
}

// reportParamStatusUpdateChanges reports param status update changes after the
// given fn has been executed.
func (ex *connExecutor) reportParamStatusUpdateChanges(fn func() error) error {
before := ex.sessionDataStack.Top()
if err := fn(); err != nil {
return err
}
if ex.dataMutatorIterator.paramStatusUpdater == nil {
return nil
}
after := ex.sessionDataStack.Top()
for _, param := range bufferableParamStatusUpdates {
_, v, err := getSessionVar(param.lowerName, false /* missingOk */)
if err != nil {
return err
}
if v.GetFromSessionData == nil {
return errors.AssertionFailedf("GetFromSessionData for %s must be set", param.name)
}
beforeVal := v.GetFromSessionData(before)
afterVal := v.GetFromSessionData(after)
if beforeVal != afterVal {
ex.dataMutatorIterator.paramStatusUpdater.BufferParamStatusUpdate(
param.name,
afterVal,
)
}
}
return nil
}

func (ex *connExecutor) commitSQLTransactionInternal(
ctx context.Context, ast tree.Statement,
) error {
Expand Down Expand Up @@ -875,10 +912,18 @@ func (ex *connExecutor) createJobs(ctx context.Context) error {

// rollbackSQLTransaction executes a ROLLBACK statement: the KV transaction is
// rolled-back and an event is produced.
func (ex *connExecutor) rollbackSQLTransaction(ctx context.Context) (fsm.Event, fsm.EventPayload) {
func (ex *connExecutor) rollbackSQLTransaction(
ctx context.Context, stmt tree.Statement,
) (fsm.Event, fsm.EventPayload) {
if err := ex.state.mu.txn.Rollback(ctx); err != nil {
log.Warningf(ctx, "txn rollback failed: %s", err)
}
if err := ex.reportParamStatusUpdateChanges(func() error {
ex.sessionDataStack.PopAll()
return nil
}); err != nil {
return ex.makeErrEvent(err, stmt)
}
// We're done with this txn.
return eventTxnFinishAborted{}, nil
}
Expand Down Expand Up @@ -1378,6 +1423,7 @@ func (ex *connExecutor) execStmtInNoTxnState(
if err != nil {
return ex.makeErrEvent(err, s)
}
ex.sessionDataStack.PushTopClone()
return eventTxnStart{ImplicitTxn: fsm.False},
makeEventTxnStartPayload(
ex.txnPriorityWithSessionDefault(s.Modes.UserPriority),
Expand Down Expand Up @@ -1437,7 +1483,7 @@ func (ex *connExecutor) execStmtInAbortedState(
// Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too.
res.ResetStmtType((*tree.RollbackTransaction)(nil))
}
return ex.rollbackSQLTransaction(ctx)
return ex.rollbackSQLTransaction(ctx, s)

case *tree.RollbackToSavepoint:
return ex.execRollbackToSavepointInAbortedState(ctx, s)
Expand Down
45 changes: 42 additions & 3 deletions pkg/sql/conn_executor_savepoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (ex *connExecutor) execSavepointInOpenState(
numDDL: ex.extraTxnState.numDDL,
}
savepoints.push(sp)
ex.sessionDataStack.PushTopClone()

return nil, nil, nil
}
Expand All @@ -124,10 +125,24 @@ func (ex *connExecutor) execRelease(
return ev, payload
}

// When doing RELEASE SAVEPOINT, all LOCAL session parameters are preserved.
currSessionData := ex.sessionDataStack.Top()

// Discard our savepoint and all further ones. Depending on what happens with
// the release below, we might add this savepoint back.
env.popToIdx(idx - 1)

// Pop all the savepoint SessionData objects, and then an extra element.
// We will restore the currSessionData on to the stack, as releasing still
// preserves the current SessionData in the transaction.
// We do not have to report param status updates as the SessionData
// remains the same after this transformation!
numPoppedElems := (len(ex.extraTxnState.savepoints) - idx) + 1
if err := ex.sessionDataStack.PopN(numPoppedElems); err != nil {
return ex.makeErrEvent(err, s)
}
ex.sessionDataStack.Push(currSessionData)

if entry.commitOnRelease {
res.ResetStmtType((*tree.CommitTransaction)(nil))
err := ex.commitSQLTransactionInternal(ctx, s)
Expand All @@ -140,6 +155,7 @@ func (ex *connExecutor) execRelease(
// Add the savepoint back. We want to allow a ROLLBACK TO SAVEPOINT
// cockroach_restart (that's the whole point of commitOnRelease).
env.push(*entry)
ex.sessionDataStack.PushTopClone()

rc, canAutoRetry := ex.getRewindTxnCapability()
ev := eventRetriableErr{
Expand All @@ -153,7 +169,7 @@ func (ex *connExecutor) execRelease(
// Non-retriable error. The transaction might have committed (i.e. the
// error might be ambiguous). We can't allow a ROLLBACK TO SAVEPOINT to
// recover the transaction, so we're not adding the savepoint back.
ex.rollbackSQLTransaction(ctx)
ex.rollbackSQLTransaction(ctx, s)
ev := eventNonRetriableErr{IsCommit: fsm.FromBool(false)}
payload := eventNonRetriableErrPayload{err: err}
return ev, payload
Expand Down Expand Up @@ -188,7 +204,9 @@ func (ex *connExecutor) execRollbackToSavepointInOpenState(
return ev, payload
}

ex.extraTxnState.savepoints.popToIdx(idx)
if err := ex.popSavepointsToIdx(s, idx); err != nil {
return ex.makeErrEvent(err, s)
}

if entry.kvToken.Initial() {
return eventTxnRestart{}, nil
Expand Down Expand Up @@ -262,7 +280,9 @@ func (ex *connExecutor) execRollbackToSavepointInAbortedState(
return ev, payload
}

ex.extraTxnState.savepoints.popToIdx(idx)
if err := ex.popSavepointsToIdx(s, idx); err != nil {
return ex.makeErrEvent(err, s)
}

if err := ex.state.mu.txn.RollbackToSavepoint(ctx, entry.kvToken); err != nil {
return ex.makeErrEvent(err, s)
Expand All @@ -274,6 +294,25 @@ func (ex *connExecutor) execRollbackToSavepointInAbortedState(
return eventSavepointRollback{}, nil
}

// popSavepointsToIdx pops savepoints and SessionData elements related to
// the savepoint up to the given idx.
func (ex *connExecutor) popSavepointsToIdx(stmt tree.Statement, idx int) error {
if err := ex.reportParamStatusUpdateChanges(func() error {
numPoppedElems := len(ex.extraTxnState.savepoints) - idx
ex.extraTxnState.savepoints.popToIdx(idx)
if err := ex.sessionDataStack.PopN(numPoppedElems); err != nil {
return err
}
// We need to restore the top of the session data stack, which was the
// SessionData just before the the savepoint was created.
ex.sessionDataStack.PushTopClone()
return nil
}); err != nil {
return err
}
return nil
}

// isCommitOnReleaseSavepoint returns true if the savepoint name implies special
// release semantics: releasing it commits the underlying KV txn.
func (ex *connExecutor) isCommitOnReleaseSavepoint(savepoint tree.Name) bool {
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,14 +905,15 @@ func TestTrimSuspendedPortals(t *testing.T) {

// setup the portal
require.NoError(t, p.SendOneLine(`Query {"String": "BEGIN"}`))
require.NoError(t, p.SendOneLine(fmt.Sprintf(`Parse {"Query": "%s"}`, selectStmt)))
require.NoError(t, p.SendOneLine(fmt.Sprintf(`Bind {"DestinationPortal": "%s"}`, portalName)))

// wait for ready
until := pgtest.ParseMessages("ReadyForQuery")
_, err = p.Until(false /* keepErrMsg */, until...)
require.NoError(t, err)

require.NoError(t, p.SendOneLine(fmt.Sprintf(`Parse {"Query": "%s"}`, selectStmt)))
require.NoError(t, p.SendOneLine(fmt.Sprintf(`Bind {"DestinationPortal": "%s"}`, portalName)))

// Execute the portal 10 times
for i := 1; i <= 10; i++ {

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (p *planner) Discard(ctx context.Context, s *tree.Discard) (planNode, error
}

// RESET ALL
if err := p.sessionDataMutatorIterator.forEachMutatorError(
if err := p.sessionDataMutatorIterator.applyOnEachMutatorError(
func(m *sessionDataMutator) error {
return resetSessionVars(ctx, m)
},
Expand Down
Loading

0 comments on commit cae3152

Please sign in to comment.