From edaa2ef5274a5f3f4f9909e331cd5c5bd150704a Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Thu, 14 Mar 2024 16:09:16 +0000 Subject: [PATCH] sql: deflake TestTrackOnlyUserOpenTransactionsAndActiveStatements This changes the test to block in AfterExecute rather than OnTxnFinish, which should make the active statements assertion less flaky. It also fixes a bug where the SELECT FOR UPDATE was not in a txn. Release note: None --- pkg/jobs/registry_external_test.go | 2 +- pkg/sql/catalog/lease/lease_test.go | 2 +- pkg/sql/conn_executor_exec.go | 2 +- pkg/sql/conn_executor_test.go | 17 +++++++---------- pkg/sql/exec_util.go | 2 +- pkg/sql/sqlstats/sslocal/sql_stats_test.go | 2 +- 6 files changed, 12 insertions(+), 15 deletions(-) diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go index 1290ea3ef564..83e2bb618046 100644 --- a/pkg/jobs/registry_external_test.go +++ b/pkg/jobs/registry_external_test.go @@ -747,7 +747,7 @@ func TestWaitWithRetryableError(t *testing.T) { Knobs: base.TestingKnobs{ SQLExecutor: &sql.ExecutorTestingKnobs{ DisableAutoCommitDuringExec: true, - AfterExecute: func(ctx context.Context, stmt string, err error) { + AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) { if targetJobID.Load() > 0 && strings.Contains(stmt, "SELECT count(*) FROM system.jobs") && strings.Contains(stmt, fmt.Sprintf("%d", targetJobID.Load())) { diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 475bcefa41a5..d58c73a8ef1e 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -3201,7 +3201,7 @@ func TestLeaseBulkInsertWithImplicitTxn(t *testing.T) { beforeExecute.Unlock() } }, - AfterExecute: func(ctx context.Context, stmt string, err error) { + AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) { beforeExecute.Lock() if stmt == beforeExecuteResumeStmt { beforeExecute.Unlock() diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 8190de32b352..42f483c20ece 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1994,7 +1994,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( } if ex.server.cfg.TestingKnobs.AfterExecute != nil { - ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err()) + ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), ex.executorType == executorTypeInternal, res.Err()) } if limitsErr := ex.handleTxnRowsWrittenReadLimits(ctx); limitsErr != nil && res.Err() == nil { diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index aa9e144cb0cb..3eaa522aee31 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -41,13 +41,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/pgtest" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/duration" @@ -1657,7 +1655,6 @@ func TestInjectRetryOnCommitErrors(t *testing.T) { func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderStress(t, "slow test") ctx := context.Background() var shouldBlock syncutil.AtomicBool @@ -1665,7 +1662,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { g := ctxgroup.WithContext(ctx) params := base.TestServerArgs{} params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ - OnRecordTxnFinish: func(isInternal bool, _ *sessionphase.Times, _ string) { + AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) { if isInternal && shouldBlock.Get() { <-blockingInternalTxns } @@ -1698,7 +1695,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { prevInternalActiveStatements := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value() // Begin a user-initiated transaction. - testDB.Exec(t, "BEGIN") + testTx := testDB.Begin(t) // Check that the number of open transactions has incremented, but not the // internal metric. @@ -1708,7 +1705,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { // Create a state of contention. Use a cancellable context so that the // other queries that get blocked on this one don't deadlock if the test // aborts. - _, err := sqlDB.ExecContext(ctx, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE") + _, err := testTx.ExecContext(ctx, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE") require.NoError(t, err) // Execute internal statement (this case is identical to opening an internal @@ -1749,8 +1746,8 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { if v := sqlServer.InternalMetrics.EngineMetrics.SQLTxnsOpen.Value(); v <= prevInternalTxnsOpen { return errors.Newf("Wrong InternalSQLTxnsOpen value. Expected: greater than %d. Actual: %d", prevInternalTxnsOpen, v) } - if v := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value(); v != prevInternalActiveStatements+1 { - return errors.Newf("Wrong InternalSQLActiveStatements value. Expected: %d. Actual: %d", prevInternalActiveStatements+1, v) + if v := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value(); v <= prevInternalActiveStatements { + return errors.Newf("Wrong InternalSQLActiveStatements value. Expected: greater than %d. Actual: %d", prevInternalActiveStatements, v) } return nil }) @@ -1758,7 +1755,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { require.Equal(t, int64(1), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value()) require.Equal(t, int64(0), sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value()) require.Less(t, prevInternalTxnsOpen, sqlServer.InternalMetrics.EngineMetrics.SQLTxnsOpen.Value()) - require.Equal(t, prevInternalActiveStatements+1, sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value()) + require.Less(t, prevInternalActiveStatements, sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value()) // Create active user-initiated statement. g.GoCtx(func(ctx context.Context) error { @@ -1796,7 +1793,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { // Commit the initial user-initiated transaction. The internal and user // select queries are no longer in contention. - testDB.Exec(t, "COMMIT") + require.NoError(t, testTx.Commit()) }() // Check that both the internal & user statements are no longer active. diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index a80ab7d0321e..3b088aa7d3a2 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1563,7 +1563,7 @@ type ExecutorTestingKnobs struct { // AfterExecute is like StatementFilter, but it runs in the same goroutine of the // statement. - AfterExecute func(ctx context.Context, stmt string, err error) + AfterExecute func(ctx context.Context, stmt string, isInternal bool, err error) // AfterExecCmd is called after successful execution of any command. AfterExecCmd func(ctx context.Context, cmd Command, buf *StmtBuf) diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_test.go b/pkg/sql/sqlstats/sslocal/sql_stats_test.go index 9c8b1871f3ae..9bcd1faba48f 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats_test.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats_test.go @@ -732,7 +732,7 @@ func TestTransactionServiceLatencyOnExtendedProtocol(t *testing.T) { var params base.TestServerArgs params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ - AfterExecute: func(ctx context.Context, stmt string, err error) { + AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) { if currentTestCaseIdx < len(testData) && testData[currentTestCaseIdx].query == stmt { finishedExecute.Set(true) }