Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: deflake TestTrackOnlyUserOpenTransactionsAndActiveStatements #120505

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func TestWaitWithRetryableError(t *testing.T) {
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
DisableAutoCommitDuringExec: true,
AfterExecute: func(ctx context.Context, stmt string, err error) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
if targetJobID.Load() > 0 &&
strings.Contains(stmt, "SELECT count(*) FROM system.jobs") &&
strings.Contains(stmt, fmt.Sprintf("%d", targetJobID.Load())) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3447,7 +3447,7 @@ func TestLeaseBulkInsertWithImplicitTxn(t *testing.T) {
beforeExecute.Unlock()
}
},
AfterExecute: func(ctx context.Context, stmt string, err error) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
beforeExecute.Lock()
if stmt == beforeExecuteResumeStmt {
beforeExecute.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2049,7 +2049,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}

if ex.server.cfg.TestingKnobs.AfterExecute != nil {
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err())
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), ex.executorType == executorTypeInternal, res.Err())
}

if limitsErr := ex.handleTxnRowsWrittenReadLimits(ctx); limitsErr != nil && res.Err() == nil {
Expand Down
16 changes: 7 additions & 9 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -1660,15 +1659,14 @@ func TestInjectRetryOnCommitErrors(t *testing.T) {
func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStress(t, "slow test")

ctx := context.Background()
var shouldBlock syncutil.AtomicBool
blockingInternalTxns := make(chan struct{})
g := ctxgroup.WithContext(ctx)
params := base.TestServerArgs{}
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
OnRecordTxnFinish: func(isInternal bool, _ *sessionphase.Times, _ string) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
if isInternal && shouldBlock.Get() {
<-blockingInternalTxns
}
Expand Down Expand Up @@ -1701,7 +1699,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
prevInternalActiveStatements := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value()

// Begin a user-initiated transaction.
testDB.Exec(t, "BEGIN")
testTx := testDB.Begin(t)

// Check that the number of open transactions has incremented, but not the
// internal metric.
Expand All @@ -1711,7 +1709,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
// Create a state of contention. Use a cancellable context so that the
// other queries that get blocked on this one don't deadlock if the test
// aborts.
_, err := sqlDB.ExecContext(ctx, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE")
_, err := testTx.ExecContext(ctx, "SELECT * FROM t.foo WHERE i = 1 FOR UPDATE")
require.NoError(t, err)

// Execute internal statement (this case is identical to opening an internal
Expand Down Expand Up @@ -1752,16 +1750,16 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
if v := sqlServer.InternalMetrics.EngineMetrics.SQLTxnsOpen.Value(); v <= prevInternalTxnsOpen {
return errors.Newf("Wrong InternalSQLTxnsOpen value. Expected: greater than %d. Actual: %d", prevInternalTxnsOpen, v)
}
if v := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value(); v != prevInternalActiveStatements+1 {
return errors.Newf("Wrong InternalSQLActiveStatements value. Expected: %d. Actual: %d", prevInternalActiveStatements+1, v)
if v := sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value(); v <= prevInternalActiveStatements {
return errors.Newf("Wrong InternalSQLActiveStatements value. Expected: greater than %d. Actual: %d", prevInternalActiveStatements, v)
}
return nil
})

require.Equal(t, int64(1), sqlServer.Metrics.EngineMetrics.SQLTxnsOpen.Value())
require.Equal(t, int64(0), sqlServer.Metrics.EngineMetrics.SQLActiveStatements.Value())
require.Less(t, prevInternalTxnsOpen, sqlServer.InternalMetrics.EngineMetrics.SQLTxnsOpen.Value())
require.Equal(t, prevInternalActiveStatements+1, sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value())
require.Less(t, prevInternalActiveStatements, sqlServer.InternalMetrics.EngineMetrics.SQLActiveStatements.Value())

// Create active user-initiated statement.
g.GoCtx(func(ctx context.Context) error {
Expand Down Expand Up @@ -1799,7 +1797,7 @@ func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {

// Commit the initial user-initiated transaction. The internal and user
// select queries are no longer in contention.
testDB.Exec(t, "COMMIT")
require.NoError(t, testTx.Commit())
}()

// Check that both the internal & user statements are no longer active.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1521,7 +1521,7 @@ type ExecutorTestingKnobs struct {

// AfterExecute is like StatementFilter, but it runs in the same goroutine of the
// statement.
AfterExecute func(ctx context.Context, stmt string, err error)
AfterExecute func(ctx context.Context, stmt string, isInternal bool, err error)

// AfterExecCmd is called after successful execution of any command.
AfterExecCmd func(ctx context.Context, cmd Command, buf *StmtBuf)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/sslocal/sql_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func TestTransactionServiceLatencyOnExtendedProtocol(t *testing.T) {

var params base.TestServerArgs
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
AfterExecute: func(ctx context.Context, stmt string, err error) {
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
if currentTestCaseIdx < len(testData) && testData[currentTestCaseIdx].query == stmt {
finishedExecute.Set(true)
}
Expand Down
Loading