From 0a658c19cd164e7c021eaff7f73db173f0650e8c Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Fri, 29 Nov 2019 09:17:08 +0100
Subject: [PATCH] sql: make SQL statements operate on a read snapshot

Previously, every individual KV read performed by a SQL statement was able to observe the most recent KV writes that the statement had performed itself. This violates PostgreSQL's dialect semantics, which mandate that a statement can only observe data as per a read snapshot taken at the instant the statement begins execution.

Moreover, this invalid behavior causes a real, observable bug: a statement that reads from and writes to the same table may never complete, because the read part can consume the rows that the statement itself writes. Worse, it can cause logical operations to be performed multiple times: https://en.wikipedia.org/wiki/Halloween_Problem

This patch fixes it (partially) by exploiting the new KV `Step()` API, which decouples the read and write sequence numbers. The fix is not complete, however; additional sequence points must also be introduced prior to FK existence checks and cascading actions. See [#42864](https://github.com/cockroachdb/cockroach/pull/42864) and [#33475](https://github.com/cockroachdb/cockroach/issues/33475) for details.

For now, this patch excludes from the new (fixed) semantics any mutation that 1) involves a foreign key and 2) does not use the new CBO-driven FK logic. When a mutation involves a FK without CBO involvement, the previous (broken) semantics still apply.

Release note (bug fix): SQL mutation statements that target tables with no foreign key relationships now correctly read data as per the state of the database when the statement started execution. This is required for compatibility with PostgreSQL and to ensure deterministic behavior when certain operations are parallelized. Prior to this fix, a statement [could incorrectly operate multiple times](https://en.wikipedia.org/wiki/Halloween_Problem) on data that it was itself writing, and potentially never terminate. This fix is limited to tables without FK relationships and to certain operations on tables with FK relationships; in other cases, the fix is not active and the bug is still present. A full fix will be provided in a later release.
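For reference, here is a minimal sketch of how the stepping API introduced by this patch is meant to be driven. Only the `client.Txn` methods (`NewTxnWithSteppingEnabled`, `Step`, `ConfigureStepping`) and the `SteppingEnabled`/`SteppingDisabled` modes come from this patch; the wrapper function, its parameters, and the placement of the calls are illustrative assumptions, not code from the change itself:

```go
package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/internal/client"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// runWithSnapshotSemantics is a hypothetical wrapper showing the intended
// call pattern: one sequencing point per statement, with an opt-out for
// code that must read its own writes (e.g. DDL touching descriptors).
func runWithSnapshotSemantics(ctx context.Context, db *client.DB, gateway roachpb.NodeID) error {
	// SQL now creates its transactions with stepping enabled up front.
	txn := client.NewTxnWithSteppingEnabled(ctx, db, gateway)

	// Before each statement, take a sequencing point: reads issued by the
	// statement observe all writes performed so far, but not the writes
	// the statement itself is about to perform.
	if err := txn.Step(ctx); err != nil {
		return err
	}
	// ... execute the statement's reads and writes against txn ...

	// Code paths that must observe their own writes temporarily switch
	// the mode back, and restore it when done.
	prev := txn.ConfigureStepping(ctx, client.SteppingDisabled)
	// ... descriptor reads/writes that expect read-own-writes ...
	_ = txn.ConfigureStepping(ctx, prev)

	return txn.CommitOrCleanup(ctx)
}
```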
--- pkg/ccl/importccl/import_processor.go | 15 +- pkg/ccl/importccl/import_processor_test.go | 2 +- pkg/ccl/importccl/load.go | 2 +- pkg/ccl/importccl/read_import_avro.go | 3 +- pkg/ccl/importccl/read_import_avro_test.go | 2 +- pkg/ccl/importccl/read_import_base.go | 2 +- pkg/ccl/importccl/read_import_csv.go | 2 +- pkg/ccl/importccl/read_import_mysql.go | 3 +- pkg/ccl/importccl/read_import_mysql_test.go | 2 +- pkg/ccl/importccl/read_import_mysqlout.go | 3 +- pkg/ccl/importccl/read_import_pgcopy.go | 3 +- pkg/ccl/importccl/read_import_pgdump.go | 3 +- pkg/ccl/importccl/read_import_workload.go | 2 +- .../client/mock_transactional_sender.go | 19 ++- pkg/internal/client/sender.go | 41 +++-- pkg/internal/client/txn.go | 42 ++++- pkg/kv/txn_coord_sender.go | 33 +++- pkg/kv/txn_interceptor_seq_num_allocator.go | 42 +++-- .../txn_interceptor_seq_num_allocator_test.go | 8 +- pkg/sql/alter_index.go | 5 + pkg/sql/alter_sequence.go | 5 + pkg/sql/alter_table.go | 5 + pkg/sql/backfill.go | 4 +- pkg/sql/backfill/backfill.go | 1 + pkg/sql/conn_executor_exec.go | 59 ++++++- pkg/sql/conn_executor_test.go | 72 +++++++- pkg/sql/copy.go | 2 +- pkg/sql/create_index.go | 5 + pkg/sql/create_sequence.go | 5 + pkg/sql/create_table.go | 161 ++++++++++-------- pkg/sql/create_view.go | 5 + pkg/sql/distsql_running.go | 7 + pkg/sql/drop_index.go | 5 + pkg/sql/drop_sequence.go | 5 + pkg/sql/drop_table.go | 5 + pkg/sql/drop_view.go | 5 + pkg/sql/grant_revoke.go | 5 + pkg/sql/insert.go | 9 +- pkg/sql/insert_fast_path.go | 10 +- pkg/sql/logictest/logic.go | 2 +- .../testdata/logic_test/statement_source | 4 +- pkg/sql/opt_exec_factory.go | 19 ++- pkg/sql/plan.go | 41 ++++- pkg/sql/rename_column.go | 5 + pkg/sql/rename_database.go | 5 + pkg/sql/rename_index.go | 5 + pkg/sql/rename_table.go | 5 + pkg/sql/row/cascader.go | 54 +++++- pkg/sql/row/deleter.go | 8 +- pkg/sql/row/fetcher.go | 4 +- pkg/sql/row/fk_existence_delete.go | 21 +++ pkg/sql/row/fk_existence_insert.go | 21 +++ pkg/sql/row/fk_existence_update.go | 5 +- pkg/sql/row/inserter.go | 3 +- pkg/sql/row/kv_batch_fetcher.go | 5 +- pkg/sql/row/row_converter.go | 2 + pkg/sql/row/updater.go | 12 +- pkg/sql/rowexec/bulk_row_writer.go | 3 +- pkg/sql/scan_test.go | 4 +- pkg/sql/set_zone_config.go | 5 + pkg/sql/stats/create_stats_job_test.go | 2 +- pkg/sql/tablewriter_upsert_opt.go | 5 +- pkg/sql/truncate.go | 1 + pkg/sql/txn_state.go | 2 +- pkg/sql/upsert.go | 2 +- 65 files changed, 682 insertions(+), 172 deletions(-) diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go index 50220c9e3c40..e279784cb0e5 100644 --- a/pkg/ccl/importccl/import_processor.go +++ b/pkg/ccl/importccl/import_processor.go @@ -104,7 +104,10 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) { } func makeInputConverter( - spec *execinfrapb.ReadImportDataSpec, evalCtx *tree.EvalContext, kvCh chan row.KVBatch, + ctx context.Context, + spec *execinfrapb.ReadImportDataSpec, + evalCtx *tree.EvalContext, + kvCh chan row.KVBatch, ) (inputConverter, error) { var singleTable *sqlbase.TableDescriptor @@ -139,15 +142,15 @@ func makeInputConverter( kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx), nil case roachpb.IOFileFormat_MysqlOutfile: - return newMysqloutfileReader(kvCh, spec.Format.MysqlOut, singleTable, evalCtx) + return newMysqloutfileReader(ctx, kvCh, spec.Format.MysqlOut, singleTable, evalCtx) case roachpb.IOFileFormat_Mysqldump: - return newMysqldumpReader(kvCh, spec.Tables, evalCtx) + 
return newMysqldumpReader(ctx, kvCh, spec.Tables, evalCtx) case roachpb.IOFileFormat_PgCopy: - return newPgCopyReader(kvCh, spec.Format.PgCopy, singleTable, evalCtx) + return newPgCopyReader(ctx, kvCh, spec.Format.PgCopy, singleTable, evalCtx) case roachpb.IOFileFormat_PgDump: - return newPgDumpReader(kvCh, spec.Format.PgDump, spec.Tables, evalCtx) + return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.Tables, evalCtx) case roachpb.IOFileFormat_Avro: - return newAvroInputReader(kvCh, singleTable, spec.Format.Avro, evalCtx) + return newAvroInputReader(ctx, kvCh, singleTable, spec.Format.Avro, evalCtx) default: return nil, errors.Errorf("Requested IMPORT format (%d) not supported by this node", spec.Format.Format) } diff --git a/pkg/ccl/importccl/import_processor_test.go b/pkg/ccl/importccl/import_processor_test.go index 42107e6d9536..92ecff71d407 100644 --- a/pkg/ccl/importccl/import_processor_test.go +++ b/pkg/ccl/importccl/import_processor_test.go @@ -110,7 +110,7 @@ func TestConverterFlushesBatches(t *testing.T) { } kvCh := make(chan row.KVBatch, batchSize) - conv, err := makeInputConverter(converterSpec, &evalCtx, kvCh) + conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh) if err != nil { t.Fatalf("makeInputConverter() error = %v", err) } diff --git a/pkg/ccl/importccl/load.go b/pkg/ccl/importccl/load.go index e1c76671b4ee..ae884f010b9a 100644 --- a/pkg/ccl/importccl/load.go +++ b/pkg/ccl/importccl/load.go @@ -186,7 +186,7 @@ func Load( } ri, err = row.MakeInserter( - nil, tableDesc, tableDesc.Columns, row.SkipFKs, nil /* fkTables */, &sqlbase.DatumAlloc{}, + ctx, nil, tableDesc, tableDesc.Columns, row.SkipFKs, nil /* fkTables */, &sqlbase.DatumAlloc{}, ) if err != nil { return backupccl.BackupDescriptor{}, errors.Wrap(err, "make row inserter") diff --git a/pkg/ccl/importccl/read_import_avro.go b/pkg/ccl/importccl/read_import_avro.go index 3e731441ac46..23e913aa41e6 100644 --- a/pkg/ccl/importccl/read_import_avro.go +++ b/pkg/ccl/importccl/read_import_avro.go @@ -428,12 +428,13 @@ type avroInputReader struct { var _ inputConverter = &avroInputReader{} func newAvroInputReader( + ctx context.Context, kvCh chan row.KVBatch, tableDesc *sqlbase.TableDescriptor, avro roachpb.AvroOptions, evalCtx *tree.EvalContext, ) (*avroInputReader, error) { - conv, err := row.NewDatumRowConverter(tableDesc, nil /* targetColNames */, evalCtx, kvCh) + conv, err := row.NewDatumRowConverter(ctx, tableDesc, nil /* targetColNames */, evalCtx, kvCh) if err != nil { return nil, err diff --git a/pkg/ccl/importccl/read_import_avro_test.go b/pkg/ccl/importccl/read_import_avro_test.go index e9a753905d08..845da48db678 100644 --- a/pkg/ccl/importccl/read_import_avro_test.go +++ b/pkg/ccl/importccl/read_import_avro_test.go @@ -202,7 +202,7 @@ func (th *testHelper) newRecordStream( // we're using nil kv channel for this test). defer row.TestingSetDatumRowConverterBatchSize(numRecords + 1)() evalCtx := tree.MakeTestingEvalContext(nil) - conv, err := row.NewDatumRowConverter(th.schemaTable, nil, &evalCtx, nil) + conv, err := row.NewDatumRowConverter(context.Background(), th.schemaTable, nil, &evalCtx, nil) require.NoError(t, err) opts := roachpb.AvroOptions{ diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go index 81f16aa85792..16d400ef1bb4 100644 --- a/pkg/ccl/importccl/read_import_base.go +++ b/pkg/ccl/importccl/read_import_base.go @@ -41,7 +41,7 @@ func runImport( ) (*roachpb.BulkOpSummary, error) { // Used to send ingested import rows to the KV layer. 
kvCh := make(chan row.KVBatch, 10) - conv, err := makeInputConverter(spec, flowCtx.NewEvalCtx(), kvCh) + conv, err := makeInputConverter(ctx, spec, flowCtx.NewEvalCtx(), kvCh) if err != nil { return nil, err } diff --git a/pkg/ccl/importccl/read_import_csv.go b/pkg/ccl/importccl/read_import_csv.go index 3fde34839a10..f076db680426 100644 --- a/pkg/ccl/importccl/read_import_csv.go +++ b/pkg/ccl/importccl/read_import_csv.go @@ -213,7 +213,7 @@ func (c *csvInputReader) convertRecordWorker(ctx context.Context, workerID int) // Create a new evalCtx per converter so each go routine gets its own // collationenv, which can't be accessed in parallel. evalCtx := c.evalCtx.Copy() - conv, err := row.NewDatumRowConverter(c.tableDesc, c.targetCols, evalCtx, c.kvCh) + conv, err := row.NewDatumRowConverter(ctx, c.tableDesc, c.targetCols, evalCtx, c.kvCh) if err != nil { return err } diff --git a/pkg/ccl/importccl/read_import_mysql.go b/pkg/ccl/importccl/read_import_mysql.go index d2a449e40bcc..8d24a6e3e9f8 100644 --- a/pkg/ccl/importccl/read_import_mysql.go +++ b/pkg/ccl/importccl/read_import_mysql.go @@ -53,6 +53,7 @@ type mysqldumpReader struct { var _ inputConverter = &mysqldumpReader{} func newMysqldumpReader( + ctx context.Context, kvCh chan row.KVBatch, tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable, evalCtx *tree.EvalContext, @@ -65,7 +66,7 @@ func newMysqldumpReader( converters[name] = nil continue } - conv, err := row.NewDatumRowConverter(table.Desc, nil /* targetColNames */, evalCtx, kvCh) + conv, err := row.NewDatumRowConverter(ctx, table.Desc, nil /* targetColNames */, evalCtx, kvCh) if err != nil { return nil, err } diff --git a/pkg/ccl/importccl/read_import_mysql_test.go b/pkg/ccl/importccl/read_import_mysql_test.go index f78edcba7d25..63ae1c076031 100644 --- a/pkg/ccl/importccl/read_import_mysql_test.go +++ b/pkg/ccl/importccl/read_import_mysql_test.go @@ -42,7 +42,7 @@ func TestMysqldumpDataReader(t *testing.T) { tables := map[string]*execinfrapb.ReadImportDataSpec_ImportTable{"simple": {Desc: table}} kvCh := make(chan row.KVBatch, 10) - converter, err := newMysqldumpReader(kvCh, tables, testEvalCtx) + converter, err := newMysqldumpReader(ctx, kvCh, tables, testEvalCtx) if err != nil { t.Fatal(err) diff --git a/pkg/ccl/importccl/read_import_mysqlout.go b/pkg/ccl/importccl/read_import_mysqlout.go index 8507a1b89c5f..b46f8b342b71 100644 --- a/pkg/ccl/importccl/read_import_mysqlout.go +++ b/pkg/ccl/importccl/read_import_mysqlout.go @@ -32,12 +32,13 @@ type mysqloutfileReader struct { var _ inputConverter = &mysqloutfileReader{} func newMysqloutfileReader( + ctx context.Context, kvCh chan row.KVBatch, opts roachpb.MySQLOutfileOptions, tableDesc *sqlbase.TableDescriptor, evalCtx *tree.EvalContext, ) (*mysqloutfileReader, error) { - conv, err := row.NewDatumRowConverter(tableDesc, nil /* targetColNames */, evalCtx, kvCh) + conv, err := row.NewDatumRowConverter(ctx, tableDesc, nil /* targetColNames */, evalCtx, kvCh) if err != nil { return nil, err } diff --git a/pkg/ccl/importccl/read_import_pgcopy.go b/pkg/ccl/importccl/read_import_pgcopy.go index 68ebcb1f7557..81bb5b221e1b 100644 --- a/pkg/ccl/importccl/read_import_pgcopy.go +++ b/pkg/ccl/importccl/read_import_pgcopy.go @@ -39,12 +39,13 @@ type pgCopyReader struct { var _ inputConverter = &pgCopyReader{} func newPgCopyReader( + ctx context.Context, kvCh chan row.KVBatch, opts roachpb.PgCopyOptions, tableDesc *sqlbase.TableDescriptor, evalCtx *tree.EvalContext, ) (*pgCopyReader, error) { - conv, err := 
row.NewDatumRowConverter(tableDesc, nil /* targetColNames */, evalCtx, kvCh) + conv, err := row.NewDatumRowConverter(ctx, tableDesc, nil /* targetColNames */, evalCtx, kvCh) if err != nil { return nil, err } diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go index a2a9ad0b16d3..6c40a1360edf 100644 --- a/pkg/ccl/importccl/read_import_pgdump.go +++ b/pkg/ccl/importccl/read_import_pgdump.go @@ -409,6 +409,7 @@ var _ inputConverter = &pgDumpReader{} // newPgDumpReader creates a new inputConverter for pg_dump files. func newPgDumpReader( + ctx context.Context, kvCh chan row.KVBatch, opts roachpb.PgDumpOptions, descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable, @@ -417,7 +418,7 @@ func newPgDumpReader( converters := make(map[string]*row.DatumRowConverter, len(descs)) for name, table := range descs { if table.Desc.IsTable() { - conv, err := row.NewDatumRowConverter(table.Desc, nil /* targetColNames */, evalCtx, kvCh) + conv, err := row.NewDatumRowConverter(ctx, table.Desc, nil /* targetColNames */, evalCtx, kvCh) if err != nil { return nil, err } diff --git a/pkg/ccl/importccl/read_import_workload.go b/pkg/ccl/importccl/read_import_workload.go index 603d6838bc88..2b4d5d5befda 100644 --- a/pkg/ccl/importccl/read_import_workload.go +++ b/pkg/ccl/importccl/read_import_workload.go @@ -202,7 +202,7 @@ func NewWorkloadKVConverter( // // This worker needs its own EvalContext and DatumAlloc. func (w *WorkloadKVConverter) Worker(ctx context.Context, evalCtx *tree.EvalContext) error { - conv, err := row.NewDatumRowConverter(w.tableDesc, nil /* targetColNames */, evalCtx, w.kvCh) + conv, err := row.NewDatumRowConverter(ctx, w.tableDesc, nil /* targetColNames */, evalCtx, w.kvCh) if err != nil { return err } diff --git a/pkg/internal/client/mock_transactional_sender.go b/pkg/internal/client/mock_transactional_sender.go index 60658038e314..8a0416f578f9 100644 --- a/pkg/internal/client/mock_transactional_sender.go +++ b/pkg/internal/client/mock_transactional_sender.go @@ -164,10 +164,23 @@ func (m *MockTransactionalSender) PrepareRetryableError(ctx context.Context, msg } // Step is part of the TxnSender interface. -func (m *MockTransactionalSender) Step() error { panic("unimplemented") } +func (m *MockTransactionalSender) Step(_ context.Context) error { + // At least one test (e.g sql/TestPortalsDestroyedOnTxnFinish) requires + // the ability to run simple statements that do not access storage, + // and that requires a non-panicky Step(). + return nil +} -// DisableStepping is part of the TxnSender interface. -func (m *MockTransactionalSender) DisableStepping() error { panic("unimplemented") } +// ConfigureStepping is part of the TxnSender interface. +func (m *MockTransactionalSender) ConfigureStepping(context.Context, SteppingMode) SteppingMode { + // See Step() above. + return SteppingDisabled +} + +// GetSteppingMode is part of the TxnSender interface. +func (m *MockTransactionalSender) GetSteppingMode(context.Context) SteppingMode { + return SteppingDisabled +} // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { diff --git a/pkg/internal/client/sender.go b/pkg/internal/client/sender.go index f7201650fc54..47f6ca66d969 100644 --- a/pkg/internal/client/sender.go +++ b/pkg/internal/client/sender.go @@ -239,24 +239,41 @@ type TxnSender interface { // operations observe the data at the time the snapshot was // established and ignore writes performed since. 
// - // Before the first step is taken, the transaction operates as if - // there was a step after every write: each read to a key is able to - // see the latest write before it. This makes the step behavior - // opt-in and backward-compatible with existing code which does not - // need it. + // Step() can only be called after stepping mode has been enabled + // using ConfigureStepping(SteppingEnabled). + // // The method is idempotent. - Step() error + Step(context.Context) error - // DisableStepping disables the sequencing point behavior and - // ensures that every read can read the latest write. The effect - // remains disabled until the next call to Step(). The method is - // idempotent. + // ConfigureStepping sets the sequencing point behavior. // // Note that a Sender is initially in the non-stepping mode, - // i.e. uses reads-own-writes by default. - DisableStepping() error + // i.e. uses reads-own-writes by default. This makes the step + // behavior opt-in and backward-compatible with existing code which + // does not need it. + // + // Calling ConfigureStepping(SteppingEnabled) when the stepping mode + // is currently disabled implies calling Step(), for convenience. + ConfigureStepping(ctx context.Context, mode SteppingMode) (prevMode SteppingMode) + + // GetSteppingMode accompanies ConfigureStepping. It is provided + // for use in tests and assertion checks. + GetSteppingMode(ctx context.Context) (curMode SteppingMode) } +// SteppingMode is the argument type to ConfigureStepping. +type SteppingMode bool + +const ( + // SteppingDisabled is the default mode, where each read can + // observe the latest write. + SteppingDisabled SteppingMode = false + + // SteppingEnabled can be set to indicate that read operations + // operate on a snapshot taken at the latest Step() invocation. + SteppingEnabled SteppingMode = true +) + // TxnStatusOpt represents options for TxnSender.GetMeta(). type TxnStatusOpt int diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index fa344fecd5dc..513759af2c80 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -76,6 +76,7 @@ type Txn struct { } // NewTxn returns a new RootTxn. +// Note: for SQL usage, prefer NewTxnWithSteppingEnabled() below. // // If the transaction is used to send any operations, CommitOrCleanup() or // CleanupOnError() should eventually be called to commit/rollback the @@ -109,6 +110,13 @@ func NewTxn(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn { return NewTxnFromProto(ctx, db, gatewayNodeID, now, RootTxn, &kvTxn) } +// NewTxnWithSteppingEnabled is like NewTxn but suitable for use by SQL. +func NewTxnWithSteppingEnabled(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn { + txn := NewTxn(ctx, db, gatewayNodeID) + _ = txn.ConfigureStepping(ctx, SteppingEnabled) + return txn +} + // NewTxnFromProto is like NewTxn but assumes the Transaction object is already initialized. // Do not use this directly; use NewTxn() instead. // This function exists for testing only. @@ -766,7 +774,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) } } - cause := errors.Cause(err) + cause := errors.UnwrapAll(err) var retryable bool switch t := cause.(type) { @@ -1035,8 +1043,11 @@ func (txn *Txn) replaceRootSenderIfTxnAbortedLocked( // transaction, even once the proto is reset. txn.recordPreviousTxnIDLocked(txn.mu.ID) txn.mu.ID = newTxn.ID - // Create a new txn sender. + // Create a new txn sender. We need to preserve the stepping mode, + // if any. 
+ // prevSteppingMode := txn.mu.sender.GetSteppingMode(ctx) txn.mu.sender = txn.db.factory.RootTransactionalSender(newTxn, txn.mu.userPriority) + // txn.mu.sender.ConfigureStepping(ctx, prevSteppingMode) } func (txn *Txn) recordPreviousTxnIDLocked(prevTxnID uuid.UUID) { @@ -1179,3 +1190,30 @@ func (txn *Txn) Active() bool { defer txn.mu.Unlock() return txn.mu.sender.Active() } + +// Step performs a sequencing step. Step-wise execution must be +// already enabled. +// +// In step-wise execution, reads operate at a snapshot established at +// the last step, instead of the latest write if not yet enabled. +func (txn *Txn) Step(ctx context.Context) error { + if txn.typ != RootTxn { + return errors.WithContextTags( + errors.AssertionFailedf("txn.Step() only allowed in RootTxn"), ctx) + } + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.Step(ctx) +} + +// ConfigureStepping configures step-wise execution in the +// transaction. +func (txn *Txn) ConfigureStepping(ctx context.Context, mode SteppingMode) (prevMode SteppingMode) { + if txn.typ != RootTxn { + panic(errors.WithContextTags( + errors.AssertionFailedf("txn.ConfigureStepping() only allowed in RootTxn"), ctx)) + } + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.ConfigureStepping(ctx, mode) +} diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index b0de358397ce..39d98a0f5fc1 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -1077,22 +1077,39 @@ func (tc *TxnCoordSender) PrepareRetryableError(ctx context.Context, msg string) } // Step is part of the TxnSender interface. -func (tc *TxnCoordSender) Step() error { +func (tc *TxnCoordSender) Step(ctx context.Context) error { + // log.Infof(ctx, "STEP %+v", errors.New("WOOF")) if tc.typ != client.RootTxn { - return errors.AssertionFailedf("cannot call Step() in leaf txn") + return errors.WithContextTags( + errors.AssertionFailedf("cannot call Step() in leaf txn"), ctx) } tc.mu.Lock() defer tc.mu.Unlock() - return tc.interceptorAlloc.txnSeqNumAllocator.stepLocked() + return tc.interceptorAlloc.txnSeqNumAllocator.stepLocked(ctx) } -// DisableStepping is part of the TxnSender interface. -func (tc *TxnCoordSender) DisableStepping() error { +// ConfigureStepping is part of the TxnSender interface. +func (tc *TxnCoordSender) ConfigureStepping( + ctx context.Context, mode client.SteppingMode, +) (prevMode client.SteppingMode) { if tc.typ != client.RootTxn { - return errors.AssertionFailedf("cannot call DisableStepping() in leaf txn") + panic(errors.WithContextTags( + errors.AssertionFailedf("cannot call ConfigureStepping() in leaf txn"), ctx)) } tc.mu.Lock() defer tc.mu.Unlock() - tc.interceptorAlloc.txnSeqNumAllocator.disableSteppingLocked() - return nil + prevMode = tc.interceptorAlloc.txnSeqNumAllocator.configureSteppingLocked(mode) + // if mode != prevMode { + // log.Infof(ctx, "CONFIGURE STEPPING: %v -> %v // %+v", prevMode, mode, errors.New("WOOF")) + // } + return prevMode +} + +// GetSteppingMode is part of the TxnSender interface. 
+func (tc *TxnCoordSender) GetSteppingMode(ctx context.Context) (curMode client.SteppingMode) { + curMode = client.SteppingDisabled + if tc.interceptorAlloc.txnSeqNumAllocator.steppingModeEnabled { + curMode = client.SteppingEnabled + } + return curMode } diff --git a/pkg/kv/txn_interceptor_seq_num_allocator.go b/pkg/kv/txn_interceptor_seq_num_allocator.go index 3dfa1f184c47..c21621229c6a 100644 --- a/pkg/kv/txn_interceptor_seq_num_allocator.go +++ b/pkg/kv/txn_interceptor_seq_num_allocator.go @@ -13,6 +13,7 @@ package kv import ( "context" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/errors" @@ -128,29 +129,50 @@ func (s *txnSeqNumAllocator) importLeafFinalState(tfs *roachpb.LeafTxnFinalState // stepLocked bumps the read seqnum to the current write seqnum. // Used by the TxnCoordSender's Step() method. -func (s *txnSeqNumAllocator) stepLocked() error { - if s.steppingModeEnabled && s.readSeq > s.writeSeq { +func (s *txnSeqNumAllocator) stepLocked(ctx context.Context) error { + if !s.steppingModeEnabled { + return errors.AssertionFailedf("stepping mode is not enabled") + } + if s.readSeq > s.writeSeq { return errors.AssertionFailedf( "cannot step() after mistaken initialization (%d,%d)", s.writeSeq, s.readSeq) } - s.steppingModeEnabled = true s.readSeq = s.writeSeq return nil } -// disableSteppingLocked cancels the stepping behavior and -// restores read-latest-write behavior. -// Used by the TxnCoordSender's DisableStepping() method. -func (s *txnSeqNumAllocator) disableSteppingLocked() { - s.steppingModeEnabled = false - s.readSeq = 0 +// configureSteppingLocked configures the stepping mode. +// +// When enabling stepping from the non-enabled state, the read seqnum +// is set to the current write seqnum, as if a snapshot was taken at +// the point stepping was enabled. +// +// The read seqnum is otherwise not modified when trying to enable +// stepping when it was previously enabled already. This is the +// behavior needed to provide the documented API semantics of +// sender.ConfigureStepping() (see client/sender.go). +func (s *txnSeqNumAllocator) configureSteppingLocked( + newMode client.SteppingMode, +) (prevMode client.SteppingMode) { + prevEnabled := s.steppingModeEnabled + enabled := newMode == client.SteppingEnabled + s.steppingModeEnabled = enabled + if !prevEnabled && enabled { + s.readSeq = s.writeSeq + } + prevMode = client.SteppingDisabled + if prevEnabled { + prevMode = client.SteppingEnabled + } + return prevMode } // epochBumpedLocked is part of the txnInterceptor interface. func (s *txnSeqNumAllocator) epochBumpedLocked() { + // Note: we do not touch steppingModeEnabled here: if stepping mode + // was enabled on the txn, it remains enabled. s.writeSeq = 0 s.readSeq = 0 - s.steppingModeEnabled = false } // closeLocked is part of the txnInterceptor interface. 
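To make the read/write seqnum split above concrete, the toy sketch below (purely illustrative, not CockroachDB code; all names are invented) shows the invariant that `configureSteppingLocked` and `stepLocked` maintain: writes always receive fresh sequence numbers, while reads stay pinned to the snapshot taken at the last step.

```go
package main

import "fmt"

// toySeqAllocator mimics the split between write and read sequence numbers.
type toySeqAllocator struct {
	steppingEnabled bool
	writeSeq        int
	readSeq         int
}

// write assigns a fresh sequence number to a write. Without stepping,
// the read snapshot tracks the latest write (read-own-writes).
func (a *toySeqAllocator) write() int {
	a.writeSeq++
	if !a.steppingEnabled {
		a.readSeq = a.writeSeq
	}
	return a.writeSeq
}

// step advances the read snapshot to include all writes so far,
// mirroring what stepLocked() does.
func (a *toySeqAllocator) step() {
	a.readSeq = a.writeSeq
}

func main() {
	a := &toySeqAllocator{steppingEnabled: true}
	a.step()  // snapshot established before the statement runs
	a.write() // the statement writes a row
	// The statement's own reads are pinned at readSeq and do not see the write above.
	fmt.Printf("readSeq=%d writeSeq=%d\n", a.readSeq, a.writeSeq) // readSeq=0 writeSeq=1
	a.step() // next statement: its snapshot now includes the write
	fmt.Printf("readSeq=%d writeSeq=%d\n", a.readSeq, a.writeSeq) // readSeq=1 writeSeq=1
}
```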
diff --git a/pkg/kv/txn_interceptor_seq_num_allocator_test.go b/pkg/kv/txn_interceptor_seq_num_allocator_test.go index 562040f1f8b0..bd3a7b245e8c 100644 --- a/pkg/kv/txn_interceptor_seq_num_allocator_test.go +++ b/pkg/kv/txn_interceptor_seq_num_allocator_test.go @@ -117,8 +117,10 @@ func TestSequenceNumberAllocationWithStep(t *testing.T) { txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") + s.configureSteppingLocked(true /* enabled */) + for i := 1; i <= 3; i++ { - if err := s.stepLocked(); err != nil { + if err := s.stepLocked(ctx); err != nil { t.Fatal(err) } if s.writeSeq != s.readSeq { @@ -195,8 +197,8 @@ func TestSequenceNumberAllocationWithStep(t *testing.T) { }) } - // Check that step-wise execution is disabled by DisableStepping(). - s.disableSteppingLocked() + // Check that step-wise execution is disabled by ConfigureStepping(SteppingDisabled). + s.configureSteppingLocked(false /* enabled */) currentStepSeqNum := s.writeSeq var ba roachpb.BatchRequest diff --git a/pkg/sql/alter_index.go b/pkg/sql/alter_index.go index 30afb5d6ae75..c69863a76279 100644 --- a/pkg/sql/alter_index.go +++ b/pkg/sql/alter_index.go @@ -44,6 +44,11 @@ func (p *planner) AlterIndex(ctx context.Context, n *tree.AlterIndex) (planNode, return &alterIndexNode{n: n, tableDesc: tableDesc, indexDesc: indexDesc}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because ALTER INDEX performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *alterIndexNode) ReadingOwnWrites() {} + func (n *alterIndexNode) startExec(params runParams) error { // Commands can either change the descriptor directly (for // alterations that don't require a backfill) or add a mutation to diff --git a/pkg/sql/alter_sequence.go b/pkg/sql/alter_sequence.go index 3e7f47f9a658..34efdb0b8746 100644 --- a/pkg/sql/alter_sequence.go +++ b/pkg/sql/alter_sequence.go @@ -42,6 +42,11 @@ func (p *planner) AlterSequence(ctx context.Context, n *tree.AlterSequence) (pla return &alterSequenceNode{n: n, seqDesc: seqDesc}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because ALTER SEQUENCE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *alterSequenceNode) ReadingOwnWrites() {} + func (n *alterSequenceNode) startExec(params runParams) error { desc := n.seqDesc diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index a98fe06dad2e..c700919a853e 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -92,6 +92,11 @@ func (p *planner) AlterTable(ctx context.Context, n *tree.AlterTable) (planNode, }, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because ALTER TABLE performs multiple KV operations on descriptors +// and expects to see its own writes. 
+func (n *alterTableNode) ReadingOwnWrites() {} + func (n *alterTableNode) startExec(params runParams) error { // Commands can either change the descriptor directly (for // alterations that don't require a backfill) or add a mutation to diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 2f078584a03e..ac7e94423148 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -615,7 +615,7 @@ func (sc *SchemaChanger) truncateIndexes( } rd, err := row.MakeDeleter( - txn, tableDesc, nil, nil, row.SkipFKs, nil /* *tree.EvalContext */, alloc, + ctx, txn, tableDesc, nil, nil, row.SkipFKs, nil /* *tree.EvalContext */, alloc, ) if err != nil { return err @@ -1660,7 +1660,7 @@ func indexTruncateInTxn( var sp roachpb.Span for done := false; !done; done = sp.Key == nil { rd, err := row.MakeDeleter( - txn, tableDesc, nil, nil, row.SkipFKs, nil /* *tree.EvalContext */, alloc, + ctx, txn, tableDesc, nil, nil, row.SkipFKs, nil /* *tree.EvalContext */, alloc, ) if err != nil { return err diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index 0b2009d00fc3..848261924f30 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -168,6 +168,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk( requestedCols = append(requestedCols, tableDesc.Columns...) requestedCols = append(requestedCols, cb.added...) ru, err := row.MakeUpdater( + ctx, txn, tableDesc, fkTables, diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 9ace90d37b46..dcbaa4546885 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -359,8 +359,8 @@ func (ex *connExecutor) execStmtInOpenState( discardRows = s.DiscardRows } - // For regular statements (the ones that get to this point), we don't return - // any event unless an an error happens. + // For regular statements (the ones that get to this point), we + // don't return any event unless an error happens. p := &ex.planner stmtTS := ex.server.cfg.Clock.PhysicalTime() @@ -397,6 +397,58 @@ func (ex *connExecutor) execStmtInOpenState( } } + // The first order of business is to ensure proper sequencing + // semantics. As per PostgreSQL's dialect specs, the "read" part of + // a statement always sees the data as per a snapshot of the database + // taken the instant the statement begins to run. In particular a + // mutation does not see its own writes. If a query contains + // multiple mutations using CTEs (WITH) or a read part following a + // mutation, all still operate on the same read snapshot. + // + // (To communicate data between CTEs and a main query, the result + // set / RETURNING can be used instead. However this is not relevant + // here.) + + // We first ensure stepping mode is enabled. + // + // This ought to be done just once when a txn gets initialized; + // unfortunately, there are too many places where the txn object + // is re-configured, re-set etc without using NewTxnWithSteppingEnabled(). + // + // Manually hunting them down and calling ConfigureStepping() each + // time would be error-prone (and increase the chance that a future + // change would forget to add the call). + // + // TODO(andrei): really the code should be re-architected to ensure + // that all uses of SQL execution initialize the client.Txn using a + // single/common function. That would be where the stepping mode + // gets enabled once for all SQL statements executed "underneath". 
+ prevSteppingMode := ex.state.mu.txn.ConfigureStepping(ctx, client.SteppingEnabled) + defer func() { _ = ex.state.mu.txn.ConfigureStepping(ctx, prevSteppingMode) }() + + // Then we create a sequencing point. + // + // This is not the only place where a sequencing point is + // placed. There are also sequencing points after every stage of + // constraint checks and cascading actions at the _end_ of a + // statement's execution. + // + // TODO(knz): At the time of this writing, CockroachDB performs + // cascading actions and the corresponding FK existence checks + // interleaved with mutations. This is incorrect; the correct + // behavior, as described in issue + // https://github.com/cockroachdb/cockroach/issues/33475, is to + // execute cascading actions no earlier than after all the "main + // effects" of the current statement (including all its CTEs) have + // completed. There should be a sequence point between the end of + // the main execution and the start of the cascading actions, as + // well as in-between every stage of cascading actions. + // This TODO can be removed when the cascading code is reorganized + // accordingly and the missing call to Step() is introduced. + if err := ex.state.mu.txn.Step(ctx); err != nil { + return makeErrEvent(err) + } + if err := p.semaCtx.Placeholders.Assign(pinfo, stmt.NumPlaceholders); err != nil { return makeErrEvent(err) } @@ -423,6 +475,7 @@ func (ex *connExecutor) execStmtInOpenState( } txn := ex.state.mu.txn + if !os.ImplicitTxn.Get() && txn.IsSerializablePushAndRefreshNotPossible() { rc, canAutoRetry := ex.getRewindTxnCapability() if canAutoRetry { @@ -553,7 +606,7 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error // Create a new transaction to retry with a higher timestamp than the // timestamps used in the retry loop above. - ex.state.mu.txn = client.NewTxn(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeID) + ex.state.mu.txn = client.NewTxnWithSteppingEnabled(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeID) if err := ex.state.mu.txn.SetUserPriority(userPriority); err != nil { return err } diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index d21f162ffc60..9764c05ddefe 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" @@ -91,7 +92,7 @@ func TestSessionFinishRollsBackTxn(t *testing.T) { params, _ := tests.CreateTestServerParams() params.Knobs.SQLExecutor = aborter.executorKnobs() s, mainDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(context.TODO()) + defer s.Stopper().Stop(context.Background()) { pgURL, cleanup := sqlutils.PGUrl( t, s.ServingSQLAddr(), "TestSessionFinishRollsBackTxn", url.User(security.RootUser)) @@ -348,6 +349,75 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); } } +func TestHalloweenProblemAvoidance(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Populate a sufficiently large number of rows. We want at least as + // many rows as an insert can batch in its output buffer (to force a + // buffer flush), plus as many rows as a fetcher can fetch at once + // (to force a read buffer update), plus some more. 
+ // + // Instead of customizing the working set size of the test up to the + // default settings for the SQL package, we scale down the config + // of the SQL package to the test. The reason for this is that + // race-enable builds are very slow and the default batch sizes + // would cause the test duration to exceed the timeout. + // + // We are also careful to override these defaults before starting + // the server, so as to not risk updating them concurrently with + // some background SQL activity. + const smallerKvBatchSize = 10 + defer row.TestingSetKVBatchSize(smallerKvBatchSize)() + const smallerInsertBatchSize = 5 + defer sql.TestingSetInsertBatchSize(smallerInsertBatchSize)() + numRows := smallerKvBatchSize + smallerInsertBatchSize + 10 + + params, _ := tests.CreateTestServerParams() + params.Insecure = true + s, db, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(context.TODO()) + + if _, err := db.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (x FLOAT); +`); err != nil { + t.Fatal(err) + } + + if _, err := db.Exec( + `INSERT INTO t.test(x) SELECT generate_series(1, $1)::FLOAT`, + numRows); err != nil { + t.Fatal(err) + } + + // Now slightly modify the values in duplicate rows. + // We choose a float +0.1 to ensure that none of the derived + // values become duplicate of already-present values. + if _, err := db.Exec(` +INSERT INTO t.test(x) + -- the if ensures that no row is processed two times. +SELECT IF(x::INT::FLOAT = x, + x, + crdb_internal.force_error( + 'NOOPE', 'insert saw its own writes: ' || x::STRING || ' (it is halloween today)')::FLOAT) + + 0.1 + FROM t.test +`); err != nil { + t.Fatal(err) + } + + // Finally verify that no rows has been operated on more than once. + row := db.QueryRow(`SELECT count(DISTINCT x) FROM t.test`) + var cnt int + if err := row.Scan(&cnt); err != nil { + t.Fatal(err) + } + + if cnt != 2*numRows { + t.Fatalf("expected %d rows in final table, got %d", 2*numRows, cnt) + } +} + func TestAppNameStatisticsInitialization(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go index e5ac19f140f2..97a1b16937f3 100644 --- a/pkg/sql/copy.go +++ b/pkg/sql/copy.go @@ -292,7 +292,7 @@ func (c *copyMachine) preparePlanner(ctx context.Context) func(context.Context, stmtTs := c.txnOpt.stmtTimestamp autoCommit := false if txn == nil { - txn = client.NewTxn(ctx, c.p.execCfg.DB, c.p.execCfg.NodeID.Get()) + txn = client.NewTxnWithSteppingEnabled(ctx, c.p.execCfg.DB, c.p.execCfg.NodeID.Get()) txnTs = c.p.execCfg.Clock.PhysicalTime() stmtTs = txnTs autoCommit = true diff --git a/pkg/sql/create_index.go b/pkg/sql/create_index.go index 3282f05ba6b0..4e2079b49802 100644 --- a/pkg/sql/create_index.go +++ b/pkg/sql/create_index.go @@ -79,6 +79,11 @@ func MakeIndexDescriptor(n *tree.CreateIndex) (*sqlbase.IndexDescriptor, error) return &indexDesc, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because CREATE INDEX performs multiple KV operations on descriptors +// and expects to see its own writes. 
+func (n *createIndexNode) ReadingOwnWrites() {} + func (n *createIndexNode) startExec(params runParams) error { _, dropped, err := n.tableDesc.FindIndexByName(string(n.n.Name)) if err == nil { diff --git a/pkg/sql/create_sequence.go b/pkg/sql/create_sequence.go index 0c68019a40db..d19212b8abb5 100644 --- a/pkg/sql/create_sequence.go +++ b/pkg/sql/create_sequence.go @@ -44,6 +44,11 @@ func (p *planner) CreateSequence(ctx context.Context, n *tree.CreateSequence) (p }, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because CREATE SEQUENCE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *createSequenceNode) ReadingOwnWrites() {} + func (n *createSequenceNode) startExec(params runParams) error { // TODO(arul): Allow temporary sequences once temp tables work for regular tables. if n.n.Temporary { diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index 3066696b316f..1bf6ecde233c 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -107,6 +107,11 @@ var storageParamExpectedTypes = map[string]storageParamType{ `user_catalog_table`: storageParamUnimplemented, } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because CREATE TABLE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *createTableNode) ReadingOwnWrites() {} + func (n *createTableNode) startExec(params runParams) error { isTemporary := n.n.Temporary @@ -267,96 +272,107 @@ func (n *createTableNode) startExec(params runParams) error { // If we are in an explicit txn or the source has placeholders, we execute the // CTAS query synchronously. if n.n.As() && !params.p.ExtendedEvalContext().TxnImplicit { - // This is a very simplified version of the INSERT logic: no CHECK - // expressions, no FK checks, no arbitrary insertion order, no - // RETURNING, etc. - - // Instantiate a row inserter and table writer. It has a 1-1 - // mapping to the definitions in the descriptor. - ri, err := row.MakeInserter( - params.p.txn, - sqlbase.NewImmutableTableDescriptor(*desc.TableDesc()), - desc.Columns, - row.SkipFKs, - nil, /* fkTables */ - ¶ms.p.alloc) - if err != nil { - return err - } - ti := tableInserterPool.Get().(*tableInserter) - *ti = tableInserter{ri: ri} - tw := tableWriter(ti) - if n.run.autoCommit == autoCommitEnabled { - tw.enableAutoCommit() - } - defer func() { - tw.close(params.ctx) - *ti = tableInserter{} - tableInserterPool.Put(ti) - }() - if err := tw.init(params.p.txn, params.p.EvalContext()); err != nil { - return err - } - - // Prepare the buffer for row values. At this point, one more column has - // been added by ensurePrimaryKey() to the list of columns in sourcePlan, if - // a PRIMARY KEY is not specified by the user. - rowBuffer := make(tree.Datums, len(desc.Columns)) - pkColIdx := len(desc.Columns) - 1 - - // The optimizer includes the rowID expression as part of the input - // expression. But the heuristic planner does not do this, so construct - // a rowID expression to be evaluated separately. - var defTypedExpr tree.TypedExpr - if n.run.synthRowID { - // Prepare the rowID expression. - defExprSQL := *desc.Columns[pkColIdx].DefaultExpr - defExpr, err := parser.ParseExpr(defExprSQL) - if err != nil { - return err - } - defTypedExpr, err = params.p.analyzeExpr( + err = func() error { + // The data fill portion of CREATE AS must operate on a read snapshot, + // so that it doesn't end up observing its own writes. 
+ prevMode := params.p.Txn().ConfigureStepping(params.ctx, client.SteppingEnabled) + defer func() { _ = params.p.Txn().ConfigureStepping(params.ctx, prevMode) }() + + // This is a very simplified version of the INSERT logic: no CHECK + // expressions, no FK checks, no arbitrary insertion order, no + // RETURNING, etc. + + // Instantiate a row inserter and table writer. It has a 1-1 + // mapping to the definitions in the descriptor. + ri, err := row.MakeInserter( params.ctx, - defExpr, - nil, /*sources*/ - tree.IndexedVarHelper{}, - types.Any, - false, /*requireType*/ - "CREATE TABLE AS") + params.p.txn, + sqlbase.NewImmutableTableDescriptor(*desc.TableDesc()), + desc.Columns, + row.SkipFKs, + nil, /* fkTables */ + ¶ms.p.alloc) if err != nil { return err } - } - - for { - if err := params.p.cancelChecker.Check(); err != nil { + ti := tableInserterPool.Get().(*tableInserter) + *ti = tableInserter{ri: ri} + tw := tableWriter(ti) + if n.run.autoCommit == autoCommitEnabled { + tw.enableAutoCommit() + } + defer func() { + tw.close(params.ctx) + *ti = tableInserter{} + tableInserterPool.Put(ti) + }() + if err := tw.init(params.p.txn, params.p.EvalContext()); err != nil { return err } - if next, err := n.sourcePlan.Next(params); !next { + + // Prepare the buffer for row values. At this point, one more column has + // been added by ensurePrimaryKey() to the list of columns in sourcePlan, if + // a PRIMARY KEY is not specified by the user. + rowBuffer := make(tree.Datums, len(desc.Columns)) + pkColIdx := len(desc.Columns) - 1 + + // The optimizer includes the rowID expression as part of the input + // expression. But the heuristic planner does not do this, so construct + // a rowID expression to be evaluated separately. + var defTypedExpr tree.TypedExpr + if n.run.synthRowID { + // Prepare the rowID expression. + defExprSQL := *desc.Columns[pkColIdx].DefaultExpr + defExpr, err := parser.ParseExpr(defExprSQL) if err != nil { return err } - _, err := tw.finalize( - params.ctx, params.extendedEvalCtx.Tracing.KVTracingEnabled()) + defTypedExpr, err = params.p.analyzeExpr( + params.ctx, + defExpr, + nil, /*sources*/ + tree.IndexedVarHelper{}, + types.Any, + false, /*requireType*/ + "CREATE TABLE AS") if err != nil { return err } - break } - // Populate the buffer and generate the PK value. - copy(rowBuffer, n.sourcePlan.Values()) - if n.run.synthRowID { - rowBuffer[pkColIdx], err = defTypedExpr.Eval(params.p.EvalContext()) - if err != nil { + for { + if err := params.p.cancelChecker.Check(); err != nil { return err } - } + if next, err := n.sourcePlan.Next(params); !next { + if err != nil { + return err + } + _, err := tw.finalize( + params.ctx, params.extendedEvalCtx.Tracing.KVTracingEnabled()) + if err != nil { + return err + } + break + } - err := tw.row(params.ctx, rowBuffer, params.extendedEvalCtx.Tracing.KVTracingEnabled()) - if err != nil { - return err + // Populate the buffer and generate the PK value. + copy(rowBuffer, n.sourcePlan.Values()) + if n.run.synthRowID { + rowBuffer[pkColIdx], err = defTypedExpr.Eval(params.p.EvalContext()) + if err != nil { + return err + } + } + + if err := tw.row(params.ctx, rowBuffer, params.extendedEvalCtx.Tracing.KVTracingEnabled()); err != nil { + return err + } } + return nil + }() + if err != nil { + return err } } @@ -1366,6 +1382,7 @@ func MakeTableDesc( return desc, errors.Errorf("unsupported table def: %T", def) } } + // Now that we have all the other columns set up, we can validate // any computed columns. 
for _, def := range n.Defs { diff --git a/pkg/sql/create_view.go b/pkg/sql/create_view.go index 435984c79555..bb424cfbbe0f 100644 --- a/pkg/sql/create_view.go +++ b/pkg/sql/create_view.go @@ -37,6 +37,11 @@ type createViewNode struct { planDeps planDependencies } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because CREATE VIEW performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *createViewNode) ReadingOwnWrites() {} + func (n *createViewNode) startExec(params runParams) error { // TODO(arul): Allow temporary views once temp tables work for regular tables. if n.temporary { diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index ea82b115b397..6b7407426aab 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -988,6 +988,13 @@ func (dsp *DistSQLPlanner) PlanAndRunPostqueries( maybeDistribute bool, ) bool { for _, postqueryPlan := range postqueryPlans { + // We place a sequence point before every postquery, so + // that each subsequent postquery can observe the writes + // by the previous step. + if err := planner.Txn().Step(ctx); err != nil { + recv.SetError(err) + return false + } if err := dsp.planAndRunPostquery( ctx, postqueryPlan, diff --git a/pkg/sql/drop_index.go b/pkg/sql/drop_index.go index d8c62e714a36..a69e22fdb854 100644 --- a/pkg/sql/drop_index.go +++ b/pkg/sql/drop_index.go @@ -59,6 +59,11 @@ func (p *planner) DropIndex(ctx context.Context, n *tree.DropIndex) (planNode, e return &dropIndexNode{n: n, idxNames: idxNames}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because DROP INDEX performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *dropIndexNode) ReadingOwnWrites() {} + func (n *dropIndexNode) startExec(params runParams) error { ctx := params.ctx for _, index := range n.idxNames { diff --git a/pkg/sql/drop_sequence.go b/pkg/sql/drop_sequence.go index bf047c881a78..fd7760bda4e7 100644 --- a/pkg/sql/drop_sequence.go +++ b/pkg/sql/drop_sequence.go @@ -55,6 +55,11 @@ func (p *planner) DropSequence(ctx context.Context, n *tree.DropSequence) (planN }, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because DROP SEQUENCE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *dropSequenceNode) ReadingOwnWrites() {} + func (n *dropSequenceNode) startExec(params runParams) error { ctx := params.ctx for _, toDel := range n.td { diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go index 0ad565c3a336..8c1efbc45128 100644 --- a/pkg/sql/drop_table.go +++ b/pkg/sql/drop_table.go @@ -97,6 +97,11 @@ func (p *planner) DropTable(ctx context.Context, n *tree.DropTable) (planNode, e return &dropTableNode{n: n, td: td}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because DROP TABLE performs multiple KV operations on descriptors +// and expects to see its own writes. 
+func (n *dropTableNode) ReadingOwnWrites() {} + func (n *dropTableNode) startExec(params runParams) error { ctx := params.ctx for _, toDel := range n.td { diff --git a/pkg/sql/drop_view.go b/pkg/sql/drop_view.go index 099ec9fc64f1..85d4af2e028e 100644 --- a/pkg/sql/drop_view.go +++ b/pkg/sql/drop_view.go @@ -69,6 +69,11 @@ func (p *planner) DropView(ctx context.Context, n *tree.DropView) (planNode, err return &dropViewNode{n: n, td: td}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because DROP VIEW performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *dropViewNode) ReadingOwnWrites() {} + func (n *dropViewNode) startExec(params runParams) error { ctx := params.ctx for _, toDel := range n.td { diff --git a/pkg/sql/grant_revoke.go b/pkg/sql/grant_revoke.go index 778fde0d88fa..396de948d427 100644 --- a/pkg/sql/grant_revoke.go +++ b/pkg/sql/grant_revoke.go @@ -63,6 +63,11 @@ type changePrivilegesNode struct { changePrivilege func(*sqlbase.PrivilegeDescriptor, string) } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because GRANT/REVOKE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *changePrivilegesNode) ReadingOwnWrites() {} + func (n *changePrivilegesNode) startExec(params runParams) error { ctx := params.ctx p := params.p diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index 2a4d0b1f0d9b..e42beeba92bf 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -177,7 +177,7 @@ func (r *insertRun) processSourceRow(params runParams, rowVals tree.Datums) erro // the insert operation (including secondary index updates, FK // cascading updates, etc), before the current KV batch is executed // and a new batch is started. -const maxInsertBatchSize = 10000 +var maxInsertBatchSize = 10000 func (n *insertNode) startExec(params runParams) error { if err := params.p.maybeSetSystemConfig(n.run.ti.tableDesc().GetID()); err != nil { @@ -294,3 +294,10 @@ func (n *insertNode) Close(ctx context.Context) { func (n *insertNode) enableAutoCommit() { n.run.ti.enableAutoCommit() } + +// TestingSetInsertBatchSize exports a constant for testing only. +func TestingSetInsertBatchSize(val int) func() { + oldVal := maxInsertBatchSize + maxInsertBatchSize = val + return func() { maxInsertBatchSize = oldVal } +} diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go index d69bd96e30c7..844ec4996106 100644 --- a/pkg/sql/insert_fast_path.go +++ b/pkg/sql/insert_fast_path.go @@ -31,9 +31,13 @@ var insertFastPathNodePool = sync.Pool{ }, } -// Check that exec.InsertFastPathMaxRows does not exceed maxInsertBatchSize -// (this is a compile error if the value is negative). -const _ uint = maxInsertBatchSize - exec.InsertFastPathMaxRows +// Check that exec.InsertFastPathMaxRows does not exceed the default +// maxInsertBatchSize. +func init() { + if maxInsertBatchSize < exec.InsertFastPathMaxRows { + panic("decrease exec.InsertFastPathMaxRows") + } +} // insertFastPathNode is a faster implementation of inserting values in a table // and performing FK checks. 
It is used when all the foreign key checks can be diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index 848b779276a5..b28d57263916 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -1771,7 +1771,7 @@ func (t *logicTest) processSubtest( return errors.Errorf("kv-batch-size needs an integer argument; %s", err) } t.outf("Setting kv batch size %d", batchSize) - defer row.SetKVBatchSize(int64(batchSize))() + defer row.TestingSetKVBatchSize(int64(batchSize))() default: return errors.Errorf("%s:%d: unknown command: %s", diff --git a/pkg/sql/logictest/testdata/logic_test/statement_source b/pkg/sql/logictest/testdata/logic_test/statement_source index 04ac9c3a1dfb..c26c7c210f14 100644 --- a/pkg/sql/logictest/testdata/logic_test/statement_source +++ b/pkg/sql/logictest/testdata/logic_test/statement_source @@ -72,10 +72,10 @@ SELECT * FROM a ORDER BY b # Regression for #30936: ensure that wrapped planNodes with non-needed columns work ok statement ok -CREATE TABLE b (a int, b int) +CREATE TABLE b (a int, b int); query II -SELECT * FROM b WHERE EXISTS (SELECT * FROM [INSERT INTO b VALUES (1,2) RETURNING a,b]); +SELECT * FROM (VALUES (1, 2)) WHERE EXISTS (SELECT * FROM [INSERT INTO b VALUES (1,2) RETURNING a,b]); ---- 1 2 diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 21bbd02808fa..a7d69829610b 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -1200,6 +1200,8 @@ func (ef *execFactory) ConstructInsert( allowAutoCommit bool, skipFKChecks bool, ) (exec.Node, error) { + ctx := ef.planner.extendedEvalCtx.Context + // Derive insert table and column descriptors. rowsNeeded := !returnColOrdSet.Empty() tabDesc := table.(*optTable).desc @@ -1218,7 +1220,7 @@ func (ef *execFactory) ConstructInsert( } // Create the table inserter, which does the bulk of the work. ri, err := row.MakeInserter( - ef.planner.txn, tabDesc, colDescs, checkFKs, fkTables, &ef.planner.alloc, + ctx, ef.planner.txn, tabDesc, colDescs, checkFKs, fkTables, &ef.planner.alloc, ) if err != nil { return nil, err @@ -1270,6 +1272,8 @@ func (ef *execFactory) ConstructInsertFastPath( checkOrdSet exec.CheckOrdinalSet, fkChecks []exec.InsertFastPathFKCheck, ) (exec.Node, error) { + ctx := ef.planner.extendedEvalCtx.Context + // Derive insert table and column descriptors. rowsNeeded := !returnColOrdSet.Empty() tabDesc := table.(*optTable).desc @@ -1277,7 +1281,7 @@ func (ef *execFactory) ConstructInsertFastPath( // Create the table inserter, which does the bulk of the work. ri, err := row.MakeInserter( - ef.planner.txn, tabDesc, colDescs, row.SkipFKs, nil /* fkTables */, &ef.planner.alloc, + ctx, ef.planner.txn, tabDesc, colDescs, row.SkipFKs, nil /* fkTables */, &ef.planner.alloc, ) if err != nil { return nil, err @@ -1345,6 +1349,8 @@ func (ef *execFactory) ConstructUpdate( allowAutoCommit bool, skipFKChecks bool, ) (exec.Node, error) { + ctx := ef.planner.extendedEvalCtx.Context + // Derive table and column descriptors. rowsNeeded := !returnColOrdSet.Empty() tabDesc := table.(*optTable).desc @@ -1376,6 +1382,7 @@ func (ef *execFactory) ConstructUpdate( // CBO will have already determined the set of fetch and update columns, and // passes those sets into the updater (which will basically be a no-op). 
ru, err := row.MakeUpdater( + ctx, ef.planner.txn, tabDesc, fkTables, @@ -1494,6 +1501,8 @@ func (ef *execFactory) ConstructUpsert( checks exec.CheckOrdinalSet, allowAutoCommit bool, ) (exec.Node, error) { + ctx := ef.planner.extendedEvalCtx.Context + // Derive table and column descriptors. rowsNeeded := !returnColOrdSet.Empty() tabDesc := table.(*optTable).desc @@ -1509,7 +1518,7 @@ func (ef *execFactory) ConstructUpsert( // Create the table inserter, which does the bulk of the insert-related work. ri, err := row.MakeInserter( - ef.planner.txn, tabDesc, insertColDescs, row.CheckFKs, fkTables, &ef.planner.alloc, + ctx, ef.planner.txn, tabDesc, insertColDescs, row.CheckFKs, fkTables, &ef.planner.alloc, ) if err != nil { return nil, err @@ -1521,6 +1530,7 @@ func (ef *execFactory) ConstructUpsert( // columns, and passes those sets into the updater (which will basically be a // no-op). ru, err := row.MakeUpdater( + ctx, ef.planner.txn, tabDesc, fkTables, @@ -1603,6 +1613,8 @@ func (ef *execFactory) ConstructDelete( allowAutoCommit bool, skipFKChecks bool, ) (exec.Node, error) { + ctx := ef.planner.extendedEvalCtx.Context + // Derive table and column descriptors. rowsNeeded := !returnColOrdSet.Empty() tabDesc := table.(*optTable).desc @@ -1629,6 +1641,7 @@ func (ef *execFactory) ConstructDelete( // CBO will have already determined the set of fetch columns, and passes // those sets into the deleter (which will basically be a no-op). rd, err := row.MakeDeleter( + ctx, ef.planner.txn, tabDesc, fkTables, diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 6455d659c7cd..09e241df09be 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -13,6 +13,7 @@ package sql import ( "context" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -130,6 +131,25 @@ type planNodeFastPath interface { FastPathResults() (int, bool) } +// planNodeReadingOwnWrites can be implemented by planNodes which do +// not use the standard SQL principle of reading at the snapshot +// established at the start of the transaction. It requests that +// the top-level (shared) `startExec` function disable stepping +// mode for the duration of the node's `startExec()` call. +// +// This done e.g. for most DDL statements that perform multiple KV +// operations on descriptors, expecting to read their own writes. +// +// Note that only `startExec()` runs with the modified stepping mode, +// not the `Next()` methods. This interface (and the idea of +// temporarily disabling stepping mode) is neither sensical nor +// applicable to planNodes whose execution is interleaved with +// that of others. +type planNodeReadingOwnWrites interface { + // ReadingOwnWrites is a marker interface. 
+ ReadingOwnWrites() +} + var _ planNode = &alterIndexNode{} var _ planNode = &alterSequenceNode{} var _ planNode = &alterTableNode{} @@ -207,6 +227,16 @@ var _ planNodeFastPath = &serializeNode{} var _ planNodeFastPath = &setZoneConfigNode{} var _ planNodeFastPath = &controlJobsNode{} +var _ planNodeReadingOwnWrites = &alterIndexNode{} +var _ planNodeReadingOwnWrites = &alterSequenceNode{} +var _ planNodeReadingOwnWrites = &alterTableNode{} +var _ planNodeReadingOwnWrites = &createIndexNode{} +var _ planNodeReadingOwnWrites = &createSequenceNode{} +var _ planNodeReadingOwnWrites = &createTableNode{} +var _ planNodeReadingOwnWrites = &createViewNode{} +var _ planNodeReadingOwnWrites = &changePrivilegesNode{} +var _ planNodeReadingOwnWrites = &setZoneConfigNode{} + // planNodeRequireSpool serves as marker for nodes whose parent must // ensure that the node is fully run to completion (and the results // spooled) during the start phase. This is currently implemented by @@ -318,6 +348,11 @@ func (p *planTop) close(ctx context.Context) { // startExec calls startExec() on each planNode using a depth-first, post-order // traversal. The subqueries, if any, are also started. // +// If the planNode also implements the planNodeReadingOwnWrites interface, +// the txn is temporarily reconfigured to use read-your-own-writes for +// the duration of the call to startExec. This is used e.g. by +// DDL statements. +// // Reminder: walkPlan() ensures that subqueries and sub-plans are // started before startExec() is called. func startExec(params runParams, plan planNode) error { @@ -333,7 +368,11 @@ func startExec(params runParams, plan planNode) error { } return true, nil }, - leaveNode: func(_ string, n planNode) error { + leaveNode: func(_ string, n planNode) (err error) { + if _, ok := n.(planNodeReadingOwnWrites); ok { + prevMode := params.p.Txn().ConfigureStepping(params.ctx, client.SteppingDisabled) + defer func() { _ = params.p.Txn().ConfigureStepping(params.ctx, prevMode) }() + } return n.startExec(params) }, } diff --git a/pkg/sql/rename_column.go b/pkg/sql/rename_column.go index 61d177a2d055..cbeeb842d4d5 100644 --- a/pkg/sql/rename_column.go +++ b/pkg/sql/rename_column.go @@ -50,6 +50,11 @@ func (p *planner) RenameColumn(ctx context.Context, n *tree.RenameColumn) (planN return &renameColumnNode{n: n, tableDesc: tableDesc}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because RENAME COLUMN performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *renameColumnNode) ReadingOwnWrites() {} + func (n *renameColumnNode) startExec(params runParams) error { p := params.p ctx := params.ctx diff --git a/pkg/sql/rename_database.go b/pkg/sql/rename_database.go index d41195a7d4f3..1fb6aa382b30 100644 --- a/pkg/sql/rename_database.go +++ b/pkg/sql/rename_database.go @@ -63,6 +63,11 @@ func (p *planner) RenameDatabase(ctx context.Context, n *tree.RenameDatabase) (p }, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because RENAME DATABASE performs multiple KV operations on descriptors +// and expects to see its own writes.
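For readers tracing the new stepping logic, here is a minimal sketch of how the marker interface added in plan.go above is consumed. It only relies on identifiers visible in this patch (planNodeReadingOwnWrites, ConfigureStepping, client.SteppingDisabled); the node type exampleDDLNode and the wrapper dispatchStartExec are hypothetical names introduced purely for illustration:

    // exampleDDLNode stands in for any DDL-style node (its other planNode
    // methods are omitted). Opting into read-own-writes semantics only
    // requires adding the empty marker method.
    type exampleDDLNode struct{}

    func (n *exampleDDLNode) ReadingOwnWrites() {}

    // dispatchStartExec mirrors the leaveNode closure added to startExec above:
    // stepping is disabled only for the duration of startExec and the previous
    // mode is restored when the call returns.
    func dispatchStartExec(params runParams, n planNode) error {
        if _, ok := n.(planNodeReadingOwnWrites); ok {
            prev := params.p.Txn().ConfigureStepping(params.ctx, client.SteppingDisabled)
            defer func() { _ = params.p.Txn().ConfigureStepping(params.ctx, prev) }()
        }
        return n.startExec(params)
    }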
+func (n *renameDatabaseNode) ReadingOwnWrites() {} + func (n *renameDatabaseNode) startExec(params runParams) error { p := params.p ctx := params.ctx diff --git a/pkg/sql/rename_index.go b/pkg/sql/rename_index.go index ae6b1f389394..afd4f4398ab3 100644 --- a/pkg/sql/rename_index.go +++ b/pkg/sql/rename_index.go @@ -60,6 +60,11 @@ func (p *planner) RenameIndex(ctx context.Context, n *tree.RenameIndex) (planNod return &renameIndexNode{n: n, idx: idx, tableDesc: tableDesc}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because RENAME INDEX performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *renameIndexNode) ReadingOwnWrites() {} + func (n *renameIndexNode) startExec(params runParams) error { p := params.p ctx := params.ctx diff --git a/pkg/sql/rename_table.go b/pkg/sql/rename_table.go index f26b451a1efa..5bd06c7f8530 100644 --- a/pkg/sql/rename_table.go +++ b/pkg/sql/rename_table.go @@ -71,6 +71,11 @@ func (p *planner) RenameTable(ctx context.Context, n *tree.RenameTable) (planNod return &renameTableNode{n: n, oldTn: &oldTn, newTn: &newTn, tableDesc: tableDesc}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because RENAME TABLE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *renameTableNode) ReadingOwnWrites() {} + func (n *renameTableNode) startExec(params runParams) error { p := params.p ctx := params.ctx diff --git a/pkg/sql/row/cascader.go b/pkg/sql/row/cascader.go index 2d011f47ed2d..3bc954ec5364 100644 --- a/pkg/sql/row/cascader.go +++ b/pkg/sql/row/cascader.go @@ -51,6 +51,7 @@ type cascader struct { // makeDeleteCascader only creates a cascader if there is a chance that there is // a possible cascade. It returns a cascader if one is required and nil if not. func makeDeleteCascader( + ctx context.Context, txn *client.Txn, table *sqlbase.ImmutableTableDescriptor, tablesByID FkTableMetadata, @@ -86,6 +87,27 @@ func makeDeleteCascader( if !required { return nil, nil } + + // TODO(knz,radu): FK cascading actions need to see the writes + // performed by the mutation. Moreover, each stage of the cascading + // actions must observe the writes from the previous stages but not + // its own writes. + // + // In order to make this true, we need to split the cascading + // actions into separate sequencing steps, and have the first + // cascading action happen no earlier than the end of the + // "main" part of the statement. Unfortunately, the organization + // of the code does not allow this today. + // + // See: https://github.com/cockroachdb/cockroach/issues/33475 + // + // In order to "make do" and preserve a modicum of FK semantics we + // thus need to disable step-wise execution here. The result is that + // it will also enable any interleaved read part to observe the + // mutation, and thus introduce the risk of a Halloween problem for + // any mutation that uses FK relationships. + _ = txn.ConfigureStepping(ctx, client.SteppingDisabled) + return &cascader{ txn: txn, fkTables: tablesByID, @@ -105,6 +127,7 @@ func makeDeleteCascader( // makeUpdateCascader only creates a cascader if there is a chance that there is // a possible cascade. It returns a cascader if one is required and nil if not.
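The TODO above (repeated verbatim in makeUpdateCascader below) amounts to a one-way switch: as soon as a cascade is possible, the transaction leaves stepping mode for the remainder of the statement. A brief sketch of the two shapes, using only the ConfigureStepping API shown in this patch; the staged variant is the intent described in issue #33475, not an API that exists in this form today:

    // Current workaround, as in makeDeleteCascader/makeUpdateCascader: the
    // previous stepping mode is deliberately discarded, so the FK work (and,
    // unfortunately, any interleaved reads) can see the statement's own
    // writes, reintroducing the Halloween hazard for FK mutations.
    _ = txn.ConfigureStepping(ctx, client.SteppingDisabled)

    // Intended future shape (hypothetical): keep stepping enabled and add an
    // explicit sequencing point before each cascade stage, so that stage N
    // observes the writes of stages < N but never its own.
    //
    //   for each cascade stage {
    //       advance the txn's read sequence number (the KV Step API)
    //       run the stage's reads and writes
    //   }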
func makeUpdateCascader( + ctx context.Context, txn *client.Txn, table *sqlbase.ImmutableTableDescriptor, tablesByID FkTableMetadata, @@ -155,6 +178,27 @@ func makeUpdateCascader( if !required { return nil, nil } + + // TODO(knz,radu): FK cascading actions need to see the writes + // performed by the mutation. Moreover, each stage of the cascading + // actions must observe the writes from the previous stages but not + // its own writes. + // + // In order to make this true, we need to split the cascading + // actions into separate sequencing steps, and have the first + // cascading action happen no earlier than the end of the + // "main" part of the statement. Unfortunately, the organization + // of the code does not allow this today. + // + // See: https://github.com/cockroachdb/cockroach/issues/33475 + // + // In order to "make do" and preserve a modicum of FK semantics we + // thus need to disable step-wise execution here. The result is that + // it will also enable any interleaved read part to observe the + // mutation, and thus introduce the risk of a Halloween problem for + // any mutation that uses FK relationships. + _ = txn.ConfigureStepping(ctx, client.SteppingDisabled) + return &cascader{ txn: txn, fkTables: tablesByID, @@ -391,7 +435,7 @@ func (c *cascader) addIndexPKRowFetcher( // addRowDeleter creates the row deleter and primary index row fetcher. func (c *cascader) addRowDeleter( - table *sqlbase.ImmutableTableDescriptor, + ctx context.Context, table *sqlbase.ImmutableTableDescriptor, ) (Deleter, Fetcher, error) { // Is there a cached row fetcher and deleter? if rowDeleter, exists := c.rowDeleters[table.ID]; exists { @@ -407,6 +451,7 @@ func (c *cascader) addRowDeleter( // Create the row deleter. The row deleter is needed prior to the row fetcher // as it will dictate what columns are required in the row fetcher. rowDeleter, err := makeRowDeleterWithoutCascader( + ctx, c.txn, table, c.fkTables, @@ -449,7 +494,7 @@ func (c *cascader) addRowDeleter( // addRowUpdater creates the row updater and primary index row fetcher. func (c *cascader) addRowUpdater( - table *sqlbase.ImmutableTableDescriptor, + ctx context.Context, table *sqlbase.ImmutableTableDescriptor, ) (Updater, Fetcher, error) { // Is there a cached updater? rowUpdater, existsUpdater := c.rowUpdaters[table.ID] @@ -466,6 +511,7 @@ func (c *cascader) addRowUpdater( // Create the row updater. The row updater requires all the columns in the // table. rowUpdater, err := makeUpdaterWithoutCascader( + ctx, c.txn, table, c.fkTables, @@ -585,7 +631,7 @@ func (c *cascader) deleteRows( } // Create or retrieve the row deleter and primary index row fetcher. - rowDeleter, pkRowFetcher, err := c.addRowDeleter(referencingTable) + rowDeleter, pkRowFetcher, err := c.addRowDeleter(ctx, referencingTable) if err != nil { return nil, nil, 0, err } @@ -683,7 +729,7 @@ func (c *cascader) updateRows( } // Create or retrieve the row updater and row fetcher. - rowUpdater, rowFetcher, err := c.addRowUpdater(referencingTable) + rowUpdater, rowFetcher, err := c.addRowUpdater(ctx, referencingTable) if err != nil { return nil, nil, nil, 0, err } diff --git a/pkg/sql/row/deleter.go b/pkg/sql/row/deleter.go index 5bc83e05fbff..2c6317e23527 100644 --- a/pkg/sql/row/deleter.go +++ b/pkg/sql/row/deleter.go @@ -38,6 +38,7 @@ type Deleter struct { // expectation of which values are passed as values to DeleteRow. Any column // passed in requestedCols will be included in FetchCols.
func MakeDeleter( + ctx context.Context, txn *client.Txn, tableDesc *sqlbase.ImmutableTableDescriptor, fkTables FkTableMetadata, @@ -47,14 +48,14 @@ func MakeDeleter( alloc *sqlbase.DatumAlloc, ) (Deleter, error) { rowDeleter, err := makeRowDeleterWithoutCascader( - txn, tableDesc, fkTables, requestedCols, checkFKs, alloc, + ctx, txn, tableDesc, fkTables, requestedCols, checkFKs, alloc, ) if err != nil { return Deleter{}, err } if checkFKs == CheckFKs { var err error - rowDeleter.cascader, err = makeDeleteCascader(txn, tableDesc, fkTables, evalCtx, alloc) + rowDeleter.cascader, err = makeDeleteCascader(ctx, txn, tableDesc, fkTables, evalCtx, alloc) if err != nil { return Deleter{}, err } @@ -65,6 +66,7 @@ func MakeDeleter( // makeRowDeleterWithoutCascader creates a rowDeleter but does not create an // additional cascader. func makeRowDeleterWithoutCascader( + ctx context.Context, txn *client.Txn, tableDesc *sqlbase.ImmutableTableDescriptor, fkTables FkTableMetadata, @@ -114,7 +116,7 @@ func makeRowDeleterWithoutCascader( } if checkFKs == CheckFKs { var err error - if rd.Fks, err = makeFkExistenceCheckHelperForDelete(txn, tableDesc, fkTables, + if rd.Fks, err = makeFkExistenceCheckHelperForDelete(ctx, txn, tableDesc, fkTables, fetchColIDtoRowIndex, alloc); err != nil { return Deleter{}, err } diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index c6a7fda7d22d..b916f826804d 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -503,7 +503,7 @@ func (rf *Fetcher) StartInconsistentScan( maxTimestampAge, ) } - txn := client.NewTxn(ctx, db, 0 /* gatewayNodeID */) + txn := client.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */) txn.SetFixedTimestamp(ctx, txnTimestamp) if log.V(1) { log.Infof(ctx, "starting inconsistent scan at timestamp %v", txnTimestamp) @@ -518,7 +518,7 @@ func (rf *Fetcher) StartInconsistentScan( // Advance the timestamp by the time that passed. txnTimestamp = txnTimestamp.Add(now.Sub(txnStartTime).Nanoseconds(), 0 /* logical */) txnStartTime = now - txn = client.NewTxn(ctx, db, 0 /* gatewayNodeID */) + txn = client.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */) txn.SetFixedTimestamp(ctx, txnTimestamp) if log.V(1) { diff --git a/pkg/sql/row/fk_existence_delete.go b/pkg/sql/row/fk_existence_delete.go index d88ea885c8ba..64569f79971b 100644 --- a/pkg/sql/row/fk_existence_delete.go +++ b/pkg/sql/row/fk_existence_delete.go @@ -34,6 +34,7 @@ type fkExistenceCheckForDelete struct { // makeFkExistenceCheckHelperForDelete instantiates a delete helper. func makeFkExistenceCheckHelperForDelete( + ctx context.Context, txn *client.Txn, table *sqlbase.ImmutableTableDescriptor, otherTables FkTableMetadata, @@ -96,6 +97,26 @@ func makeFkExistenceCheckHelperForDelete( h.fks[mutatedIdx.ID] = append(h.fks[mutatedIdx.ID], fk) } + if len(h.fks) > 0 { + // TODO(knz,radu): FK existence checks need to see the writes + // performed by the mutation. + // + // In order to make this true, we need to split the existence + // checks into a separate sequencing step, and have the first + // check happen no earlier than the end of the "main" part of + // the statement. Unfortunately, the organization of the code does + // not allow this today. + // + // See: https://github.com/cockroachdb/cockroach/issues/33475 + // + // In order to "make do" and preserve a modicum of FK semantics we + // thus need to disable step-wise execution here.
The result is that + // it will also enable any interleaved read part to observe the + // mutation, and thus introduce the risk of a Halloween problem for + // any mutation that uses FK relationships. + _ = txn.ConfigureStepping(ctx, client.SteppingDisabled) + } + return h, nil } diff --git a/pkg/sql/row/fk_existence_insert.go b/pkg/sql/row/fk_existence_insert.go index 11f975e50fe1..4b0f6a4a1180 100644 --- a/pkg/sql/row/fk_existence_insert.go +++ b/pkg/sql/row/fk_existence_insert.go @@ -43,6 +43,7 @@ type fkExistenceCheckForInsert struct { // makeFkExistenceCheckHelperForInsert instantiates an insert helper. func makeFkExistenceCheckHelperForInsert( + ctx context.Context, txn *client.Txn, table *sqlbase.ImmutableTableDescriptor, otherTables FkTableMetadata, @@ -87,6 +88,26 @@ func makeFkExistenceCheckHelperForInsert( h.fks[mutatedIdx.ID] = append(h.fks[mutatedIdx.ID], fk) } + if len(h.fks) > 0 { + // TODO(knz,radu): FK existence checks need to see the writes + // performed by the mutation. + // + // In order to make this true, we need to split the existence + // checks into a separate sequencing step, and have the first + // check happen no earlier than the end of the "main" part of + // the statement. Unfortunately, the organization of the code does + // not allow this today. + // + // See: https://github.com/cockroachdb/cockroach/issues/33475 + // + // In order to "make do" and preserve a modicum of FK semantics we + // thus need to disable step-wise execution here. The result is that + // it will also enable any interleaved read part to observe the + // mutation, and thus introduce the risk of a Halloween problem for + // any mutation that uses FK relationships. + _ = txn.ConfigureStepping(ctx, client.SteppingDisabled) + } + return h, nil } diff --git a/pkg/sql/row/fk_existence_update.go b/pkg/sql/row/fk_existence_update.go index aef8544121ba..ed7674d06ec7 100644 --- a/pkg/sql/row/fk_existence_update.go +++ b/pkg/sql/row/fk_existence_update.go @@ -62,6 +62,7 @@ type fkExistenceCheckForUpdate struct { // makeFkExistenceCheckHelperForUpdate instantiates an update helper. func makeFkExistenceCheckHelperForUpdate( + ctx context.Context, txn *client.Txn, table *sqlbase.ImmutableTableDescriptor, otherTables FkTableMetadata, @@ -75,13 +76,13 @@ func makeFkExistenceCheckHelperForUpdate( // Instantiate a helper for the referencing tables. var err error - if ret.inbound, err = makeFkExistenceCheckHelperForDelete(txn, table, otherTables, colMap, + if ret.inbound, err = makeFkExistenceCheckHelperForDelete(ctx, txn, table, otherTables, colMap, alloc); err != nil { return ret, err } // Instantiate a helper for the referenced table(s). - ret.outbound, err = makeFkExistenceCheckHelperForInsert(txn, table, otherTables, colMap, alloc) + ret.outbound, err = makeFkExistenceCheckHelperForInsert(ctx, txn, table, otherTables, colMap, alloc) ret.outbound.checker = ret.inbound.checker // We need *some* KV batch checker to perform the checks. It doesn't diff --git a/pkg/sql/row/inserter.go b/pkg/sql/row/inserter.go index 0ca3105648ab..77a5edd62d25 100644 --- a/pkg/sql/row/inserter.go +++ b/pkg/sql/row/inserter.go @@ -40,6 +40,7 @@ type Inserter struct { // // insertCols must contain every column in the primary key.
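All the row-writer constructors in pkg/sql/row now take a Context as their first argument. For reference, this is the shape of a call site under the new signature, copied from the ConstructInsertFastPath hunk earlier in this patch (a sketch of existing usage, not a new API):

    // The exec factory derives ctx once from the planner's extended eval
    // context, then threads it into the row writers.
    ctx := ef.planner.extendedEvalCtx.Context
    ri, err := row.MakeInserter(
        ctx, ef.planner.txn, tabDesc, colDescs, row.SkipFKs, nil /* fkTables */, &ef.planner.alloc,
    )
    if err != nil {
        return nil, err
    }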
func MakeInserter( + ctx context.Context, txn *client.Txn, tableDesc *sqlbase.ImmutableTableDescriptor, insertCols []sqlbase.ColumnDescriptor, @@ -62,7 +63,7 @@ func MakeInserter( if checkFKs == CheckFKs { var err error - if ri.Fks, err = makeFkExistenceCheckHelperForInsert(txn, tableDesc, fkTables, + if ri.Fks, err = makeFkExistenceCheckHelperForInsert(ctx, txn, tableDesc, fkTables, ri.InsertColIDtoRowIndex, alloc); err != nil { return ri, err } diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 5897ad0d9e19..ff75b011506f 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -28,8 +28,9 @@ import ( // TODO(radu): parameters like this should be configurable var kvBatchSize int64 = 10000 -// SetKVBatchSize changes the kvBatchFetcher batch size, and returns a function that restores it. -func SetKVBatchSize(val int64) func() { +// TestingSetKVBatchSize changes the kvBatchFetcher batch size, and returns a function that restores it. +// This is to be used only in tests - we have no test coverage for arbitrary kv batch sizes at this time. +func TestingSetKVBatchSize(val int64) func() { oldVal := kvBatchSize kvBatchSize = val return func() { kvBatchSize = oldVal } diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index 2ea9139aa202..9023499d7378 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -224,6 +224,7 @@ func TestingSetDatumRowConverterBatchSize(newSize int) func() { // NewDatumRowConverter returns an instance of a DatumRowConverter. func NewDatumRowConverter( + ctx context.Context, tableDesc *sqlbase.TableDescriptor, targetColNames tree.NameList, evalCtx *tree.EvalContext, @@ -274,6 +275,7 @@ func NewDatumRowConverter( } ri, err := MakeInserter( + ctx, nil, /* txn */ immutDesc, cols, diff --git a/pkg/sql/row/updater.go b/pkg/sql/row/updater.go index 6beebd95a5de..5c4c6eb1f7b1 100644 --- a/pkg/sql/row/updater.go +++ b/pkg/sql/row/updater.go @@ -72,6 +72,7 @@ const ( // expectation of which values are passed as oldValues to UpdateRow. All the columns // passed in requestedCols will be included in FetchCols at the beginning. func MakeUpdater( + ctx context.Context, txn *client.Txn, tableDesc *sqlbase.ImmutableTableDescriptor, fkTables FkTableMetadata, @@ -83,14 +84,14 @@ func MakeUpdater( alloc *sqlbase.DatumAlloc, ) (Updater, error) { rowUpdater, err := makeUpdaterWithoutCascader( - txn, tableDesc, fkTables, updateCols, requestedCols, updateType, checkFKs, alloc, + ctx, txn, tableDesc, fkTables, updateCols, requestedCols, updateType, checkFKs, alloc, ) if err != nil { return Updater{}, err } if checkFKs == CheckFKs { rowUpdater.cascader, err = makeUpdateCascader( - txn, tableDesc, fkTables, updateCols, evalCtx, alloc, + ctx, txn, tableDesc, fkTables, updateCols, evalCtx, alloc, ) if err != nil { return Updater{}, err @@ -108,6 +109,7 @@ var returnTruePseudoError error = returnTrue{} // makeUpdaterWithoutCascader is the same function as MakeUpdater but does not // create a cascader. func makeUpdaterWithoutCascader( + ctx context.Context, txn *client.Txn, tableDesc *sqlbase.ImmutableTableDescriptor, fkTables FkTableMetadata, @@ -195,14 +197,14 @@ func makeUpdaterWithoutCascader( // them, so request them all. 
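// The updater built in the following block embeds both a row deleter (ru.rd)
// and a row inserter (ru.ri): an UPDATE that changes primary-key columns is
// carried out as a delete of the old index entries followed by an insert of
// the new ones, which is why both helpers now also receive ctx.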
var err error if ru.rd, err = makeRowDeleterWithoutCascader( - txn, tableDesc, fkTables, tableCols, SkipFKs, alloc, + ctx, txn, tableDesc, fkTables, tableCols, SkipFKs, alloc, ); err != nil { return Updater{}, err } ru.FetchCols = ru.rd.FetchCols ru.FetchColIDtoRowIndex = ColIDtoRowIndexFromCols(ru.FetchCols) if ru.ri, err = MakeInserter( - txn, tableDesc, tableCols, SkipFKs, nil /* fkTables */, alloc, + ctx, txn, tableDesc, tableCols, SkipFKs, nil /* fkTables */, alloc, ); err != nil { return Updater{}, err } @@ -277,7 +279,7 @@ func makeUpdaterWithoutCascader( if primaryKeyColChange { updateCols = nil } - if ru.Fks, err = makeFkExistenceCheckHelperForUpdate(txn, tableDesc, fkTables, + if ru.Fks, err = makeFkExistenceCheckHelperForUpdate(ctx, txn, tableDesc, fkTables, updateCols, ru.FetchColIDtoRowIndex, alloc); err != nil { return Updater{}, err } diff --git a/pkg/sql/rowexec/bulk_row_writer.go b/pkg/sql/rowexec/bulk_row_writer.go index 7d82e6d158b7..6fa074978dc8 100644 --- a/pkg/sql/rowexec/bulk_row_writer.go +++ b/pkg/sql/rowexec/bulk_row_writer.go @@ -99,7 +99,8 @@ func (sp *bulkRowWriter) work(ctx context.Context) error { kvCh := make(chan row.KVBatch, 10) var g ctxgroup.Group - conv, err := row.NewDatumRowConverter(&sp.spec.Table, nil /* targetColNames */, sp.EvalCtx, kvCh) + conv, err := row.NewDatumRowConverter(ctx, + &sp.spec.Table, nil /* targetColNames */, sp.EvalCtx, kvCh) if err != nil { return err } diff --git a/pkg/sql/scan_test.go b/pkg/sql/scan_test.go index 261a0002c844..243d50aa207a 100644 --- a/pkg/sql/scan_test.go +++ b/pkg/sql/scan_test.go @@ -125,7 +125,7 @@ func TestScanBatches(t *testing.T) { defer leaktest.AfterTest(t)() // The test will screw around with KVBatchSize; make sure to restore it at the end. - restore := row.SetKVBatchSize(10) + restore := row.TestingSetKVBatchSize(10) defer restore() s, db, _ := serverutils.StartServer( @@ -171,7 +171,7 @@ func TestScanBatches(t *testing.T) { numSpanValues := []int{0, 1, 2, 3} for _, batch := range batchSizes { - row.SetKVBatchSize(int64(batch)) + row.TestingSetKVBatchSize(int64(batch)) for _, numSpans := range numSpanValues { testScanBatchQuery(t, db, numSpans, numAs, numBs, false) testScanBatchQuery(t, db, numSpans, numAs, numBs, true) diff --git a/pkg/sql/set_zone_config.go b/pkg/sql/set_zone_config.go index e5955081111f..46298b97e593 100644 --- a/pkg/sql/set_zone_config.go +++ b/pkg/sql/set_zone_config.go @@ -222,6 +222,11 @@ type setZoneConfigRun struct { numAffected int } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because CONFIGURE ZONE performs multiple KV operations on descriptors +// and expects to see its own writes. 
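The renamed TestingSetKVBatchSize keeps the restore-closure idiom of its predecessor, so test call sites remain one-liners, as in the scan_test.go and create_stats_job_test.go hunks. A minimal usage sketch (the test name and body are illustrative only):

    func TestSmallKVBatches(t *testing.T) {
        // Shrink the KV batch size for this test only; the returned closure
        // restores the previous value when the test finishes.
        defer row.TestingSetKVBatchSize(10)()

        // ... run scans that now span many small KV batches ...
    }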
+func (n *setZoneConfigNode) ReadingOwnWrites() {} + func (n *setZoneConfigNode) startExec(params runParams) error { var yamlConfig string var setters []func(c *zonepb.ZoneConfig) diff --git a/pkg/sql/stats/create_stats_job_test.go b/pkg/sql/stats/create_stats_job_test.go index 329bee195896..ebdc007e31e4 100644 --- a/pkg/sql/stats/create_stats_job_test.go +++ b/pkg/sql/stats/create_stats_job_test.go @@ -467,7 +467,7 @@ func TestCreateStatsProgress(t *testing.T) { }(rowexec.SamplerProgressInterval) rowexec.SamplerProgressInterval = 10 - resetKVBatchSize := row.SetKVBatchSize(10) + resetKVBatchSize := row.TestingSetKVBatchSize(10) defer resetKVBatchSize() var allowRequest chan struct{} diff --git a/pkg/sql/tablewriter_upsert_opt.go b/pkg/sql/tablewriter_upsert_opt.go index 0cb94e157e5c..0ae66089e3a2 100644 --- a/pkg/sql/tablewriter_upsert_opt.go +++ b/pkg/sql/tablewriter_upsert_opt.go @@ -113,7 +113,9 @@ type optTableUpserter struct { } // init is part of the tableWriter interface. -func (tu *optTableUpserter) init(txn *client.Txn, evalCtx *tree.EvalContext) error { +func (tu *optTableUpserter) init( + ctx context.Context, txn *client.Txn, evalCtx *tree.EvalContext, +) error { tu.tableWriterBase.init(txn) tableDesc := tu.tableDesc() @@ -169,6 +171,7 @@ func (tu *optTableUpserter) init(txn *client.Txn, evalCtx *tree.EvalContext) err var err error tu.ru, err = row.MakeUpdater( + ctx, txn, tu.tableDesc(), tu.fkTables, diff --git a/pkg/sql/truncate.go b/pkg/sql/truncate.go index 2ba65ddbfb74..85838207effa 100644 --- a/pkg/sql/truncate.go +++ b/pkg/sql/truncate.go @@ -543,6 +543,7 @@ func truncateTableInChunks( } if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { rd, err := row.MakeDeleter( + ctx, txn, sqlbase.NewImmutableTableDescriptor(*tableDesc), nil, diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 059db0425b4f..bf5c768b84a6 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -203,7 +203,7 @@ func (ts *txnState) resetForNewSQLTxn( ts.mon.Start(ts.Ctx, tranCtx.connMon, mon.BoundAccount{} /* reserved */) ts.mu.Lock() if txn == nil { - ts.mu.txn = client.NewTxn(ts.Ctx, tranCtx.db, tranCtx.nodeID) + ts.mu.txn = client.NewTxnWithSteppingEnabled(ts.Ctx, tranCtx.db, tranCtx.nodeID) ts.mu.txn.SetDebugName(opName) } else { ts.mu.txn = txn diff --git a/pkg/sql/upsert.go b/pkg/sql/upsert.go index f47103c426a8..faa60ed62620 100644 --- a/pkg/sql/upsert.go +++ b/pkg/sql/upsert.go @@ -60,7 +60,7 @@ func (n *upsertNode) startExec(params runParams) error { // cache traceKV during execution, to avoid re-evaluating it for every row. n.run.traceKV = params.p.ExtendedEvalContext().Tracing.KVTracingEnabled() - return n.run.tw.init(params.p.txn, params.EvalContext()) + return n.run.tw.init(params.ctx, params.p.txn, params.EvalContext()) } // Next is required because batchedPlanNode inherits from planNode, but