Skip to content

Commit

Permalink
Merge pull request #120566 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.2-120505

release-23.2: sql: deflake TestTrackOnlyUserOpenTransactionsAndActiveStatements
  • Loading branch information
rafiss committed Mar 15, 2024
2 parents 51dee74 + edaa2ef commit e31b35b
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func TestWaitWithRetryableError(t *testing.T) {
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
DisableAutoCommitDuringExec: true,
AfterExecute: func(ctx context.Context, stmt string, err error) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
if targetJobID.Load() > 0 &&
strings.Contains(stmt, "SELECT count(*) FROM system.jobs") &&
strings.Contains(stmt, fmt.Sprintf("%d", targetJobID.Load())) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3201,7 +3201,7 @@ func TestLeaseBulkInsertWithImplicitTxn(t *testing.T) {
beforeExecute.Unlock()
}
},
AfterExecute: func(ctx context.Context, stmt string, err error) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
beforeExecute.Lock()
if stmt == beforeExecuteResumeStmt {
beforeExecute.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1994,7 +1994,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}

if ex.server.cfg.TestingKnobs.AfterExecute != nil {
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err())
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), ex.executorType == executorTypeInternal, res.Err())
}

if limitsErr := ex.handleTxnRowsWrittenReadLimits(ctx); limitsErr != nil && res.Err() == nil {
Expand Down
17 changes: 7 additions & 10 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/pgtest"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/duration"
Expand Down Expand Up @@ -1657,15 +1655,14 @@ func TestInjectRetryOnCommitErrors(t *testing.T) {
func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStress(t, "slow test")

ctx := context.Background()
var shouldBlock syncutil.AtomicBool
blockingInternalTxns := make(chan struct{})
g := ctxgroup.WithContext(ctx)
params := base.TestServerArgs{}
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
OnRecordTxnFinish: func(isInternal bool, _ *sessionphase.Times, _ string) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
if isInternal && shouldBlock.Get() {
<-blockingInternalTxns
}
Expand Down Expand Up @@ -1698,7 +1695,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
prevInternalActiveStatements := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value()

// Begin a user-initiated transaction.
testDB.Exec(t, "BEGIN")
testTx := testDB.Begin(t)

// Check that the number of open transactions has incremented, but not the
// internal metric.
Expand All @@ -1708,7 +1705,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
// Create a state of contention. Use a cancellable context so that the
// other queries that get blocked on this one don't deadlock if the test
// aborts.
_, err := sqlDB.ExecContext(ctx, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE")
_, err := testTx.ExecContext(ctx, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE")
require.NoError(t, err)

// Execute internal statement (this case is identical to opening an internal
Expand Down Expand Up @@ -1749,16 +1746,16 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
if v := sqlServer.InternalMetrics.EngineMetrics.SQLTxnsOpen.Value(); v <= prevInternalTxnsOpen {
return errors.Newf("Wrong InternalSQLTxnsOpen value. Expected: greater than %d. Actual: %d", prevInternalTxnsOpen, v)
}
if v := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value(); v != prevInternalActiveStatements+1 {
return errors.Newf("Wrong InternalSQLActiveStatements value. Expected: %d. Actual: %d", prevInternalActiveStatements+1, v)
if v := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value(); v <= prevInternalActiveStatements {
return errors.Newf("Wrong InternalSQLActiveStatements value. Expected: greater than %d. Actual: %d", prevInternalActiveStatements, v)
}
return nil
})

require.Equal(t, int64(1), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value())
require.Equal(t, int64(0), sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value())
require.Less(t, prevInternalTxnsOpen, sqlServer.InternalMetrics.EngineMetrics.SQLTxnsOpen.Value())
require.Equal(t, prevInternalActiveStatements+1, sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value())
require.Less(t, prevInternalActiveStatements, sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value())

// Create active user-initiated statement.
g.GoCtx(func(ctx context.Context) error {
Expand Down Expand Up @@ -1796,7 +1793,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {

// Commit the initial user-initiated transaction. The internal and user
// select queries are no longer in contention.
testDB.Exec(t, "COMMIT")
require.NoError(t, testTx.Commit())
}()

// Check that both the internal & user statements are no longer active.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,7 +1563,7 @@ type ExecutorTestingKnobs struct {

// AfterExecute is like StatementFilter, but it runs in the same goroutine of the
// statement.
AfterExecute func(ctx context.Context, stmt string, err error)
AfterExecute func(ctx context.Context, stmt string, isInternal bool, err error)

// AfterExecCmd is called after successful execution of any command.
AfterExecCmd func(ctx context.Context, cmd Command, buf *StmtBuf)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/sslocal/sql_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ func TestTransactionServiceLatencyOnExtendedProtocol(t *testing.T) {

var params base.TestServerArgs
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
AfterExecute: func(ctx context.Context, stmt string, err error) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
if currentTestCaseIdx < len(testData) && testData[currentTestCaseIdx].query == stmt {
finishedExecute.Set(true)
}
Expand Down

0 comments on commit e31b35b

Please sign in to comment.