Skip to content

Commit

Permalink
sql: make SQL statements operate on a read snapshot
Browse files Browse the repository at this point in the history
Previously, all individual KV reads performed by a SQL statement were
able to observe the most recent KV writes that it performed itself.

This is in violation of PostgreSQL's dialect semantics, which mandate
that statements can only observe data as per a read snapshot taken at
the instant a statement begins execution.

Moreover, this invalid behavior causes a real observable bug: a
statement that reads and writes to the same table may never complete,
as the read part may become able to consume the rows that it itself
writes. Or worse, it could cause logical operations to be performed
multiple times: https://en.wikipedia.org/wiki/Halloween_Problem

This patch fixes it (partially) by exploiting the new KV `Step()` API
which decouples the read and write sequence numbers.

The fix is not complete however; additional sequence points must also
be introduced prior to FK existence checks and cascading actions. See
[cockroachdb#42864](cockroachdb#42864) and
[cockroachdb#33475](cockroachdb#33475) for
details.

For now, this patch excludes any mutation that 1) involves a foreign
key and 2) does not uyse the new CBO-driven FK logic, from the
new (fixed) semantics. When a mutation involves a FK without CBO
involvement, the previous (broken) semantics still apply.

Release note (bug fix): SQL mutation statements that target tables
with no foreign key relationships now correctly read data as per the
state of the database when the statement started execution. This is
required for compatibility with PostgreSQL and to ensure deterministic
behavior when certain operations are parallelized. Prior to this fix,
a statement [could incorrectly operate multiple
times](https://en.wikipedia.org/wiki/Halloween_Problem) on data that
itself was writing, and potentially never terminate. This fix is
limited to tables without FK relationships, and for certain operations
on tables with FK relationships; in other cases, the fix is not
active and the bug is still present. A full fix will be provided
in a later release.
  • Loading branch information
knz committed Jan 20, 2020
1 parent 69e0b0d commit 0a658c1
Show file tree
Hide file tree
Showing 65 changed files with 682 additions and 172 deletions.
15 changes: 9 additions & 6 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) {
}

func makeInputConverter(
spec *execinfrapb.ReadImportDataSpec, evalCtx *tree.EvalContext, kvCh chan row.KVBatch,
ctx context.Context,
spec *execinfrapb.ReadImportDataSpec,
evalCtx *tree.EvalContext,
kvCh chan row.KVBatch,
) (inputConverter, error) {

var singleTable *sqlbase.TableDescriptor
Expand Down Expand Up @@ -139,15 +142,15 @@ func makeInputConverter(
kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism),
singleTable, singleTableTargetCols, evalCtx), nil
case roachpb.IOFileFormat_MysqlOutfile:
return newMysqloutfileReader(kvCh, spec.Format.MysqlOut, singleTable, evalCtx)
return newMysqloutfileReader(ctx, kvCh, spec.Format.MysqlOut, singleTable, evalCtx)
case roachpb.IOFileFormat_Mysqldump:
return newMysqldumpReader(kvCh, spec.Tables, evalCtx)
return newMysqldumpReader(ctx, kvCh, spec.Tables, evalCtx)
case roachpb.IOFileFormat_PgCopy:
return newPgCopyReader(kvCh, spec.Format.PgCopy, singleTable, evalCtx)
return newPgCopyReader(ctx, kvCh, spec.Format.PgCopy, singleTable, evalCtx)
case roachpb.IOFileFormat_PgDump:
return newPgDumpReader(kvCh, spec.Format.PgDump, spec.Tables, evalCtx)
return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.Tables, evalCtx)
case roachpb.IOFileFormat_Avro:
return newAvroInputReader(kvCh, singleTable, spec.Format.Avro, evalCtx)
return newAvroInputReader(ctx, kvCh, singleTable, spec.Format.Avro, evalCtx)
default:
return nil, errors.Errorf("Requested IMPORT format (%d) not supported by this node", spec.Format.Format)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestConverterFlushesBatches(t *testing.T) {
}

kvCh := make(chan row.KVBatch, batchSize)
conv, err := makeInputConverter(converterSpec, &evalCtx, kvCh)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh)
if err != nil {
t.Fatalf("makeInputConverter() error = %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func Load(
}

ri, err = row.MakeInserter(
nil, tableDesc, tableDesc.Columns, row.SkipFKs, nil /* fkTables */, &sqlbase.DatumAlloc{},
ctx, nil, tableDesc, tableDesc.Columns, row.SkipFKs, nil /* fkTables */, &sqlbase.DatumAlloc{},
)
if err != nil {
return backupccl.BackupDescriptor{}, errors.Wrap(err, "make row inserter")
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/read_import_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,13 @@ type avroInputReader struct {
var _ inputConverter = &avroInputReader{}

func newAvroInputReader(
ctx context.Context,
kvCh chan row.KVBatch,
tableDesc *sqlbase.TableDescriptor,
avro roachpb.AvroOptions,
evalCtx *tree.EvalContext,
) (*avroInputReader, error) {
conv, err := row.NewDatumRowConverter(tableDesc, nil /* targetColNames */, evalCtx, kvCh)
conv, err := row.NewDatumRowConverter(ctx, tableDesc, nil /* targetColNames */, evalCtx, kvCh)

if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (th *testHelper) newRecordStream(
// we're using nil kv channel for this test).
defer row.TestingSetDatumRowConverterBatchSize(numRecords + 1)()
evalCtx := tree.MakeTestingEvalContext(nil)
conv, err := row.NewDatumRowConverter(th.schemaTable, nil, &evalCtx, nil)
conv, err := row.NewDatumRowConverter(context.Background(), th.schemaTable, nil, &evalCtx, nil)
require.NoError(t, err)

opts := roachpb.AvroOptions{
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func runImport(
) (*roachpb.BulkOpSummary, error) {
// Used to send ingested import rows to the KV layer.
kvCh := make(chan row.KVBatch, 10)
conv, err := makeInputConverter(spec, flowCtx.NewEvalCtx(), kvCh)
conv, err := makeInputConverter(ctx, spec, flowCtx.NewEvalCtx(), kvCh)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (c *csvInputReader) convertRecordWorker(ctx context.Context, workerID int)
// Create a new evalCtx per converter so each go routine gets its own
// collationenv, which can't be accessed in parallel.
evalCtx := c.evalCtx.Copy()
conv, err := row.NewDatumRowConverter(c.tableDesc, c.targetCols, evalCtx, c.kvCh)
conv, err := row.NewDatumRowConverter(ctx, c.tableDesc, c.targetCols, evalCtx, c.kvCh)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/read_import_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type mysqldumpReader struct {
var _ inputConverter = &mysqldumpReader{}

func newMysqldumpReader(
ctx context.Context,
kvCh chan row.KVBatch,
tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
evalCtx *tree.EvalContext,
Expand All @@ -65,7 +66,7 @@ func newMysqldumpReader(
converters[name] = nil
continue
}
conv, err := row.NewDatumRowConverter(table.Desc, nil /* targetColNames */, evalCtx, kvCh)
conv, err := row.NewDatumRowConverter(ctx, table.Desc, nil /* targetColNames */, evalCtx, kvCh)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMysqldumpDataReader(t *testing.T) {
tables := map[string]*execinfrapb.ReadImportDataSpec_ImportTable{"simple": {Desc: table}}

kvCh := make(chan row.KVBatch, 10)
converter, err := newMysqldumpReader(kvCh, tables, testEvalCtx)
converter, err := newMysqldumpReader(ctx, kvCh, tables, testEvalCtx)

if err != nil {
t.Fatal(err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/read_import_mysqlout.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ type mysqloutfileReader struct {
var _ inputConverter = &mysqloutfileReader{}

func newMysqloutfileReader(
ctx context.Context,
kvCh chan row.KVBatch,
opts roachpb.MySQLOutfileOptions,
tableDesc *sqlbase.TableDescriptor,
evalCtx *tree.EvalContext,
) (*mysqloutfileReader, error) {
conv, err := row.NewDatumRowConverter(tableDesc, nil /* targetColNames */, evalCtx, kvCh)
conv, err := row.NewDatumRowConverter(ctx, tableDesc, nil /* targetColNames */, evalCtx, kvCh)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/read_import_pgcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ type pgCopyReader struct {
var _ inputConverter = &pgCopyReader{}

func newPgCopyReader(
ctx context.Context,
kvCh chan row.KVBatch,
opts roachpb.PgCopyOptions,
tableDesc *sqlbase.TableDescriptor,
evalCtx *tree.EvalContext,
) (*pgCopyReader, error) {
conv, err := row.NewDatumRowConverter(tableDesc, nil /* targetColNames */, evalCtx, kvCh)
conv, err := row.NewDatumRowConverter(ctx, tableDesc, nil /* targetColNames */, evalCtx, kvCh)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ var _ inputConverter = &pgDumpReader{}

// newPgDumpReader creates a new inputConverter for pg_dump files.
func newPgDumpReader(
ctx context.Context,
kvCh chan row.KVBatch,
opts roachpb.PgDumpOptions,
descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
Expand All @@ -417,7 +418,7 @@ func newPgDumpReader(
converters := make(map[string]*row.DatumRowConverter, len(descs))
for name, table := range descs {
if table.Desc.IsTable() {
conv, err := row.NewDatumRowConverter(table.Desc, nil /* targetColNames */, evalCtx, kvCh)
conv, err := row.NewDatumRowConverter(ctx, table.Desc, nil /* targetColNames */, evalCtx, kvCh)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func NewWorkloadKVConverter(
//
// This worker needs its own EvalContext and DatumAlloc.
func (w *WorkloadKVConverter) Worker(ctx context.Context, evalCtx *tree.EvalContext) error {
conv, err := row.NewDatumRowConverter(w.tableDesc, nil /* targetColNames */, evalCtx, w.kvCh)
conv, err := row.NewDatumRowConverter(ctx, w.tableDesc, nil /* targetColNames */, evalCtx, w.kvCh)
if err != nil {
return err
}
Expand Down
19 changes: 16 additions & 3 deletions pkg/internal/client/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,23 @@ func (m *MockTransactionalSender) PrepareRetryableError(ctx context.Context, msg
}

// Step is part of the TxnSender interface.
func (m *MockTransactionalSender) Step() error { panic("unimplemented") }
func (m *MockTransactionalSender) Step(_ context.Context) error {
// At least one test (e.g sql/TestPortalsDestroyedOnTxnFinish) requires
// the ability to run simple statements that do not access storage,
// and that requires a non-panicky Step().
return nil
}

// DisableStepping is part of the TxnSender interface.
func (m *MockTransactionalSender) DisableStepping() error { panic("unimplemented") }
// ConfigureStepping is part of the TxnSender interface.
func (m *MockTransactionalSender) ConfigureStepping(context.Context, SteppingMode) SteppingMode {
// See Step() above.
return SteppingDisabled
}

// GetSteppingMode is part of the TxnSender interface.
func (m *MockTransactionalSender) GetSteppingMode(context.Context) SteppingMode {
return SteppingDisabled
}

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
Expand Down
41 changes: 29 additions & 12 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,24 +239,41 @@ type TxnSender interface {
// operations observe the data at the time the snapshot was
// established and ignore writes performed since.
//
// Before the first step is taken, the transaction operates as if
// there was a step after every write: each read to a key is able to
// see the latest write before it. This makes the step behavior
// opt-in and backward-compatible with existing code which does not
// need it.
// Step() can only be called after stepping mode has been enabled
// using ConfigureStepping(SteppingEnabled).
//
// The method is idempotent.
Step() error
Step(context.Context) error

// DisableStepping disables the sequencing point behavior and
// ensures that every read can read the latest write. The effect
// remains disabled until the next call to Step(). The method is
// idempotent.
// ConfigureStepping sets the sequencing point behavior.
//
// Note that a Sender is initially in the non-stepping mode,
// i.e. uses reads-own-writes by default.
DisableStepping() error
// i.e. uses reads-own-writes by default. This makes the step
// behavior opt-in and backward-compatible with existing code which
// does not need it.
//
// Calling ConfigureStepping(SteppingEnabled) when the stepping mode
// is currently disabled implies calling Step(), for convenience.
ConfigureStepping(ctx context.Context, mode SteppingMode) (prevMode SteppingMode)

// GetSteppingMode accompanies ConfigureStepping. It is provided
// for use in tests and assertion checks.
GetSteppingMode(ctx context.Context) (curMode SteppingMode)
}

// SteppingMode is the argument type to ConfigureStepping.
type SteppingMode bool

const (
// SteppingDisabled is the default mode, where each read can
// observe the latest write.
SteppingDisabled SteppingMode = false

// SteppingEnabled can be set to indicate that read operations
// operate on a snapshot taken at the latest Step() invocation.
SteppingEnabled SteppingMode = true
)

// TxnStatusOpt represents options for TxnSender.GetMeta().
type TxnStatusOpt int

Expand Down
42 changes: 40 additions & 2 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Txn struct {
}

// NewTxn returns a new RootTxn.
// Note: for SQL usage, prefer NewTxnWithSteppingEnabled() below.
//
// If the transaction is used to send any operations, CommitOrCleanup() or
// CleanupOnError() should eventually be called to commit/rollback the
Expand Down Expand Up @@ -109,6 +110,13 @@ func NewTxn(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
return NewTxnFromProto(ctx, db, gatewayNodeID, now, RootTxn, &kvTxn)
}

// NewTxnWithSteppingEnabled is like NewTxn but suitable for use by SQL.
func NewTxnWithSteppingEnabled(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
txn := NewTxn(ctx, db, gatewayNodeID)
_ = txn.ConfigureStepping(ctx, SteppingEnabled)
return txn
}

// NewTxnFromProto is like NewTxn but assumes the Transaction object is already initialized.
// Do not use this directly; use NewTxn() instead.
// This function exists for testing only.
Expand Down Expand Up @@ -766,7 +774,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
}
}

cause := errors.Cause(err)
cause := errors.UnwrapAll(err)

var retryable bool
switch t := cause.(type) {
Expand Down Expand Up @@ -1035,8 +1043,11 @@ func (txn *Txn) replaceRootSenderIfTxnAbortedLocked(
// transaction, even once the proto is reset.
txn.recordPreviousTxnIDLocked(txn.mu.ID)
txn.mu.ID = newTxn.ID
// Create a new txn sender.
// Create a new txn sender. We need to preserve the stepping mode,
// if any.
// prevSteppingMode := txn.mu.sender.GetSteppingMode(ctx)
txn.mu.sender = txn.db.factory.RootTransactionalSender(newTxn, txn.mu.userPriority)
// txn.mu.sender.ConfigureStepping(ctx, prevSteppingMode)
}

func (txn *Txn) recordPreviousTxnIDLocked(prevTxnID uuid.UUID) {
Expand Down Expand Up @@ -1179,3 +1190,30 @@ func (txn *Txn) Active() bool {
defer txn.mu.Unlock()
return txn.mu.sender.Active()
}

// Step performs a sequencing step. Step-wise execution must be
// already enabled.
//
// In step-wise execution, reads operate at a snapshot established at
// the last step, instead of the latest write if not yet enabled.
func (txn *Txn) Step(ctx context.Context) error {
if txn.typ != RootTxn {
return errors.WithContextTags(
errors.AssertionFailedf("txn.Step() only allowed in RootTxn"), ctx)
}
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.Step(ctx)
}

// ConfigureStepping configures step-wise execution in the
// transaction.
func (txn *Txn) ConfigureStepping(ctx context.Context, mode SteppingMode) (prevMode SteppingMode) {
if txn.typ != RootTxn {
panic(errors.WithContextTags(
errors.AssertionFailedf("txn.ConfigureStepping() only allowed in RootTxn"), ctx))
}
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.ConfigureStepping(ctx, mode)
}
33 changes: 25 additions & 8 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,22 +1077,39 @@ func (tc *TxnCoordSender) PrepareRetryableError(ctx context.Context, msg string)
}

// Step is part of the TxnSender interface.
func (tc *TxnCoordSender) Step() error {
func (tc *TxnCoordSender) Step(ctx context.Context) error {
// log.Infof(ctx, "STEP %+v", errors.New("WOOF"))
if tc.typ != client.RootTxn {
return errors.AssertionFailedf("cannot call Step() in leaf txn")
return errors.WithContextTags(
errors.AssertionFailedf("cannot call Step() in leaf txn"), ctx)
}
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.interceptorAlloc.txnSeqNumAllocator.stepLocked()
return tc.interceptorAlloc.txnSeqNumAllocator.stepLocked(ctx)
}

// DisableStepping is part of the TxnSender interface.
func (tc *TxnCoordSender) DisableStepping() error {
// ConfigureStepping is part of the TxnSender interface.
func (tc *TxnCoordSender) ConfigureStepping(
ctx context.Context, mode client.SteppingMode,
) (prevMode client.SteppingMode) {
if tc.typ != client.RootTxn {
return errors.AssertionFailedf("cannot call DisableStepping() in leaf txn")
panic(errors.WithContextTags(
errors.AssertionFailedf("cannot call ConfigureStepping() in leaf txn"), ctx))
}
tc.mu.Lock()
defer tc.mu.Unlock()
tc.interceptorAlloc.txnSeqNumAllocator.disableSteppingLocked()
return nil
prevMode = tc.interceptorAlloc.txnSeqNumAllocator.configureSteppingLocked(mode)
// if mode != prevMode {
// log.Infof(ctx, "CONFIGURE STEPPING: %v -> %v // %+v", prevMode, mode, errors.New("WOOF"))
// }
return prevMode
}

// GetSteppingMode is part of the TxnSender interface.
func (tc *TxnCoordSender) GetSteppingMode(ctx context.Context) (curMode client.SteppingMode) {
curMode = client.SteppingDisabled
if tc.interceptorAlloc.txnSeqNumAllocator.steppingModeEnabled {
curMode = client.SteppingEnabled
}
return curMode
}
Loading

0 comments on commit 0a658c1

Please sign in to comment.