Skip to content

Commit

Permalink
sql: deflake TestTrackOnlyUserOpenTransactionsAndActiveStatements
Browse files Browse the repository at this point in the history
This changes the test to block in AfterExecute rather than OnTxnFinish,
which should make the active statements assertion less flaky.

It also fixes a bug where the SELECT FOR UPDATE was not in a txn.

Release note: None
  • Loading branch information
rafiss committed Mar 15, 2024
1 parent 2242972 commit edaa2ef
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func TestWaitWithRetryableError(t *testing.T) {
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
DisableAutoCommitDuringExec: true,
AfterExecute: func(ctx context.Context, stmt string, err error) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
if targetJobID.Load() > 0 &&
strings.Contains(stmt, "SELECT count(*) FROM system.jobs") &&
strings.Contains(stmt, fmt.Sprintf("%d", targetJobID.Load())) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3201,7 +3201,7 @@ func TestLeaseBulkInsertWithImplicitTxn(t *testing.T) {
beforeExecute.Unlock()
}
},
AfterExecute: func(ctx context.Context, stmt string, err error) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
beforeExecute.Lock()
if stmt == beforeExecuteResumeStmt {
beforeExecute.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1994,7 +1994,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}

if ex.server.cfg.TestingKnobs.AfterExecute != nil {
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err())
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), ex.executorType == executorTypeInternal, res.Err())
}

if limitsErr := ex.handleTxnRowsWrittenReadLimits(ctx); limitsErr != nil && res.Err() == nil {
Expand Down
17 changes: 7 additions & 10 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/pgtest"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/duration"
Expand Down Expand Up @@ -1657,15 +1655,14 @@ func TestInjectRetryOnCommitErrors(t *testing.T) {
func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStress(t, "slow test")

ctx := context.Background()
var shouldBlock syncutil.AtomicBool
blockingInternalTxns := make(chan struct{})
g := ctxgroup.WithContext(ctx)
params := base.TestServerArgs{}
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
OnRecordTxnFinish: func(isInternal bool, _ *sessionphase.Times, _ string) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
if isInternal && shouldBlock.Get() {
<-blockingInternalTxns
}
Expand Down Expand Up @@ -1698,7 +1695,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
prevInternalActiveStatements := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value()

// Begin a user-initiated transaction.
testDB.Exec(t, "BEGIN")
testTx := testDB.Begin(t)

// Check that the number of open transactions has incremented, but not the
// internal metric.
Expand All @@ -1708,7 +1705,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
// Create a state of contention. Use a cancellable context so that the
// other queries that get blocked on this one don't deadlock if the test
// aborts.
_, err := sqlDB.ExecContext(ctx, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE")
_, err := testTx.ExecContext(ctx, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE")
require.NoError(t, err)

// Execute internal statement (this case is identical to opening an internal
Expand Down Expand Up @@ -1749,16 +1746,16 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
if v := sqlServer.InternalMetrics.EngineMetrics.SQLTxnsOpen.Value(); v <= prevInternalTxnsOpen {
return errors.Newf("Wrong InternalSQLTxnsOpen value. Expected: greater than %d. Actual: %d", prevInternalTxnsOpen, v)
}
if v := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value(); v != prevInternalActiveStatements+1 {
return errors.Newf("Wrong InternalSQLActiveStatements value. Expected: %d. Actual: %d", prevInternalActiveStatements+1, v)
if v := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value(); v <= prevInternalActiveStatements {
return errors.Newf("Wrong InternalSQLActiveStatements value. Expected: greater than %d. Actual: %d", prevInternalActiveStatements, v)
}
return nil
})

require.Equal(t, int64(1), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value())
require.Equal(t, int64(0), sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value())
require.Less(t, prevInternalTxnsOpen, sqlServer.InternalMetrics.EngineMetrics.SQLTxnsOpen.Value())
require.Equal(t, prevInternalActiveStatements+1, sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value())
require.Less(t, prevInternalActiveStatements, sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value())

// Create active user-initiated statement.
g.GoCtx(func(ctx context.Context) error {
Expand Down Expand Up @@ -1796,7 +1793,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {

// Commit the initial user-initiated transaction. The internal and user
// select queries are no longer in contention.
testDB.Exec(t, "COMMIT")
require.NoError(t, testTx.Commit())
}()

// Check that both the internal & user statements are no longer active.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,7 +1563,7 @@ type ExecutorTestingKnobs struct {

// AfterExecute is like StatementFilter, but it runs in the same goroutine of the
// statement.
AfterExecute func(ctx context.Context, stmt string, err error)
AfterExecute func(ctx context.Context, stmt string, isInternal bool, err error)

// AfterExecCmd is called after successful execution of any command.
AfterExecCmd func(ctx context.Context, cmd Command, buf *StmtBuf)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/sslocal/sql_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ func TestTransactionServiceLatencyOnExtendedProtocol(t *testing.T) {

var params base.TestServerArgs
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
AfterExecute: func(ctx context.Context, stmt string, err error) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
if currentTestCaseIdx < len(testData) && testData[currentTestCaseIdx].query == stmt {
finishedExecute.Set(true)
}
Expand Down

0 comments on commit edaa2ef

Please sign in to comment.