diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index f2c06644e64e..1a5ed255a4fc 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -3447,7 +3447,7 @@ func TestLeaseBulkInsertWithImplicitTxn(t *testing.T) { beforeExecute.Unlock() } }, - AfterExecute: func(ctx context.Context, stmt string, err error) { + AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) { beforeExecute.Lock() if stmt == beforeExecuteResumeStmt { beforeExecute.Unlock() diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 3b399b592bff..6e8de4322c77 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -2049,7 +2049,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( } if ex.server.cfg.TestingKnobs.AfterExecute != nil { - ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err()) + ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), ex.executorType == executorTypeInternal, res.Err()) } if limitsErr := ex.handleTxnRowsWrittenReadLimits(ctx); limitsErr != nil && res.Err() == nil { diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index c777b69e494b..d434a0ca90bc 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -42,7 +42,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -1660,7 +1659,6 @@ func TestInjectRetryOnCommitErrors(t *testing.T) { func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderStress(t, "slow test") ctx := context.Background() var shouldBlock syncutil.AtomicBool @@ -1668,7 +1666,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { g := ctxgroup.WithContext(ctx) params := base.TestServerArgs{} params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ - OnRecordTxnFinish: func(isInternal bool, _ *sessionphase.Times, _ string) { + AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) { if isInternal && shouldBlock.Get() { <-blockingInternalTxns } @@ -1701,7 +1699,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { prevInternalActiveStatements := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value() // Begin a user-initiated transaction. - testDB.Exec(t, "BEGIN") + testTx := testDB.Begin(t) // Check that the number of open transactions has incremented, but not the // internal metric. @@ -1711,7 +1709,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { // Create a state of contention. Use a cancellable context so that the // other queries that get blocked on this one don't deadlock if the test // aborts. - _, err := sqlDB.ExecContext(ctx, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE") + _, err := testTx.ExecContext(ctx, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE") require.NoError(t, err) // Execute internal statement (this case is identical to opening an internal @@ -1752,8 +1750,8 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { if v := sqlServer.InternalMetrics.EngineMetrics.SQLTxnsOpen.Value(); v <= prevInternalTxnsOpen { return errors.Newf("Wrong InternalSQLTxnsOpen value. Expected: greater than %d. Actual: %d", prevInternalTxnsOpen, v) } - if v := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value(); v != prevInternalActiveStatements+1 { - return errors.Newf("Wrong InternalSQLActiveStatements value. Expected: %d. Actual: %d", prevInternalActiveStatements+1, v) + if v := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value(); v <= prevInternalActiveStatements { + return errors.Newf("Wrong InternalSQLActiveStatements value. Expected: greater than %d. Actual: %d", prevInternalActiveStatements, v) } return nil }) @@ -1761,7 +1759,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { require.Equal(t, int64(1), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value()) require.Equal(t, int64(0), sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value()) require.Less(t, prevInternalTxnsOpen, sqlServer.InternalMetrics.EngineMetrics.SQLTxnsOpen.Value()) - require.Equal(t, prevInternalActiveStatements+1, sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value()) + require.Less(t, prevInternalActiveStatements, sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value()) // Create active user-initiated statement. g.GoCtx(func(ctx context.Context) error { @@ -1799,7 +1797,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) { // Commit the initial user-initiated transaction. The internal and user // select queries are no longer in contention. - testDB.Exec(t, "COMMIT") + require.NoError(t, testTx.Commit()) }() // Check that both the internal & user statements are no longer active. diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 8aa88f3ea039..fe563119b27e 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1521,7 +1521,7 @@ type ExecutorTestingKnobs struct { // AfterExecute is like StatementFilter, but it runs in the same goroutine of the // statement. - AfterExecute func(ctx context.Context, stmt string, err error) + AfterExecute func(ctx context.Context, stmt string, isInternal bool, err error) // AfterExecCmd is called after successful execution of any command. AfterExecCmd func(ctx context.Context, cmd Command, buf *StmtBuf) diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_test.go b/pkg/sql/sqlstats/sslocal/sql_stats_test.go index 81ecdbe0dcb5..7d5ddf775009 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats_test.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats_test.go @@ -736,7 +736,7 @@ func TestTransactionServiceLatencyOnExtendedProtocol(t *testing.T) { var params base.TestServerArgs params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ - AfterExecute: func(ctx context.Context, stmt string, err error) { + AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) { if currentTestCaseIdx < len(testData) && testData[currentTestCaseIdx].query == stmt { finishedExecute.Set(true) }