diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 6a3562b4df59..5f300d944506 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -595,6 +595,7 @@ ALL_TESTS = [ "//pkg/testutils/lint/passes/timer:timer_test", "//pkg/testutils/lint/passes/unconvert:unconvert_test", "//pkg/testutils/lint:lint_test", + "//pkg/testutils/listenerutil:listenerutil_test", "//pkg/testutils/release:release_test", "//pkg/testutils/sqlutils:sqlutils_test", "//pkg/testutils/testcluster:testcluster_test", @@ -2152,6 +2153,8 @@ GO_TARGETS = [ "//pkg/testutils/lint/passes/unconvert:unconvert_test", "//pkg/testutils/lint:lint", "//pkg/testutils/lint:lint_test", + "//pkg/testutils/listenerutil:listenerutil", + "//pkg/testutils/listenerutil:listenerutil_test", "//pkg/testutils/localtestcluster:localtestcluster", "//pkg/testutils/metrictestutils:metrictestutils", "//pkg/testutils/pgtest:pgtest", diff --git a/pkg/base/BUILD.bazel b/pkg/base/BUILD.bazel index 61501d598f01..3d31cd9a92e1 100644 --- a/pkg/base/BUILD.bazel +++ b/pkg/base/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/security/username", "//pkg/server/autoconfig/acprovider", "//pkg/settings/cluster", + "//pkg/testutils/listenerutil", "//pkg/util", "//pkg/util/envutil", "//pkg/util/humanizeutil", diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 08c874a8292d..5e45fddda746 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -204,12 +205,14 @@ type TestClusterArgs struct { // and potentially adjusted according to ReplicationMode. ServerArgsPerNode map[int]TestServerArgs - // If reusable listeners is true, then restart should keep listeners untouched - // so that servers are kept on the same ports. It is up to the test to set - // proxy listeners to TestServerArgs.Listener that would survive - // net.Listener.Close() and then allow restarted server to use them again. - // See testutils.ListenerRegistry. - ReusableListeners bool + // If set, listeners will be created from this registry and retained across + // restarts, i.e. servers are kept on the same ports, without the risk of + // another process grabbing a port while a server is down. Alternatively, + // leave this field unset and assign a *ReusableListener directly to + // TestServerArgs.Listener. If a non-reusable listener is set in that field, + // RestartServer will return an error to guide the developer towards a + // non-flaky pattern.
+ ReusableListenerReg *listenerutil.ListenerRegistry } // DefaultTestTenantOptions specifies the conditions under which the default diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index 691137e7a480..7e14780c180f 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -394,6 +394,7 @@ go_test( "//pkg/storage", "//pkg/testutils", "//pkg/testutils/datapathutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go index 812c8954467d..3c2ea66aa50d 100644 --- a/pkg/cli/debug_recover_loss_of_quorum_test.go +++ b/pkg/cli/debug_recover_loss_of_quorum_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -437,7 +438,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { }) defer c.Cleanup() - listenerReg := testutils.NewListenerRegistry() + listenerReg := listenerutil.NewListenerRegistry() defer listenerReg.Close() storeReg := server.NewStickyInMemEnginesRegistry() @@ -461,7 +462,6 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { StickyEngineRegistry: storeReg, }, }, - Listener: listenerReg.GetOrFail(t, i), StoreSpecs: []base.StoreSpec{ { InMemory: true, @@ -470,8 +470,8 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { } } tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{ - ReusableListeners: true, - ServerArgsPerNode: sa, + ReusableListenerReg: listenerReg, + ServerArgsPerNode: sa, }) tc.Start(t) s := sqlutils.MakeSQLRunner(tc.Conns[0]) @@ -554,7 +554,6 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { // NB: If recovery is not performed, server will just hang on startup. // This is caused by liveness range becoming unavailable and preventing any // progress. So it is likely that test will timeout if basic workflow fails. 
- listenerReg.ReopenOrFail(t, 0) require.NoError(t, tc.RestartServer(0), "restart failed") s = sqlutils.MakeSQLRunner(tc.Conns[0]) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 44066d570f75..b2a86281bf7f 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -457,6 +457,7 @@ go_test( "//pkg/testutils/echotest", "//pkg/testutils/gossiputil", "//pkg/testutils/kvclientutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 23b6b49d4b84..7078c8b74217 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" @@ -92,11 +93,14 @@ func TestStoreRecoverFromEngine(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() ctx := context.Background() tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { @@ -198,6 +202,8 @@ func TestStoreRecoverWithErrors(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() numIncrements := 0 keyA := roachpb.Key("a") @@ -205,7 +211,8 @@ func TestStoreRecoverWithErrors(t *testing.T) { ctx := context.Background() tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ @@ -341,6 +348,8 @@ func TestRestoreReplicas(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 2 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -363,8 +372,9 @@ func TestRestoreReplicas(t *testing.T) { ctx := context.Background() tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) store := tc.GetFirstStoreFromServer(t, 0) @@ -661,6 +671,8 @@ func TestSnapshotAfterTruncation(t *testing.T) { t.Run(name, func(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -683,8 +695,9 @@ func TestSnapshotAfterTruncation(t 
*testing.T) { ctx := context.Background() tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) store := tc.GetFirstStoreFromServer(t, 0) @@ -4192,6 +4205,8 @@ func TestInitRaftGroupOnRequest(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 2 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -4214,8 +4229,9 @@ func TestInitRaftGroupOnRequest(t *testing.T) { ctx := context.Background() tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) @@ -4703,6 +4719,8 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() stopper := stop.NewStopper() ctx := context.Background() @@ -4764,8 +4782,9 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) // Make a key that's in the user data space. 
@@ -5097,6 +5116,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { stickyEngineRegistry server.StickyInMemEnginesRegistry, ) { stickyEngineRegistry = server.NewStickyInMemEnginesRegistry() + lisReg := listenerutil.NewListenerRegistry() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -5128,10 +5148,12 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { tc = testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) + tc.Stopper().AddCloser(stop.CloserFn(lisReg.Close)) db = tc.GetFirstStoreFromServer(t, 1).DB() // Split off a non-system range so we don't have to account for node liveness diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 4737cd86676f..d6ba3ba968c1 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -51,6 +51,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -2158,13 +2159,16 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() var leaseAcquisitionTrap atomic.Value ctx := context.Background() manual := hlc.NewHybridManualClock() tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { diff --git a/pkg/kv/kvserver/client_store_test.go b/pkg/kv/kvserver/client_store_test.go index 738d59ab71ef..20b959d1f135 100644 --- a/pkg/kv/kvserver/client_store_test.go +++ b/pkg/kv/kvserver/client_store_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -120,7 +121,7 @@ func TestStoreLoadReplicaQuiescent(t *testing.T) { testutils.RunTrueAndFalse(t, "kv.expiration_leases_only.enabled", func(t *testing.T, expOnly bool) { storeReg := server.NewStickyInMemEnginesRegistry() defer storeReg.CloseAllStickyInMemEngines() - listenerReg := testutils.NewListenerRegistry() + listenerReg := listenerutil.NewListenerRegistry() defer listenerReg.Close() ctx := context.Background() @@ -128,11 +129,10 @@ func TestStoreLoadReplicaQuiescent(t *testing.T) { kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, expOnly) tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ReusableListeners: true, + ReplicationMode: base.ReplicationManual, + 
ReusableListenerReg: listenerReg, ServerArgs: base.TestServerArgs{ Settings: st, - Listener: listenerReg.GetOrFail(t, 0), RaftConfig: base.RaftConfig{ RaftTickInterval: 100 * time.Millisecond, }, diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index b38a9ef4c9eb..7f30694650fc 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -96,6 +96,7 @@ go_test( "//pkg/storage/enginepb", "//pkg/testutils", "//pkg/testutils/datapathutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/testcluster", @@ -105,6 +106,7 @@ go_test( "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/randutil", + "//pkg/util/stop", "//pkg/util/strutil", "//pkg/util/timeutil", "//pkg/util/uuid", diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go index c050a5e0453f..3326d6f1a500 100644 --- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -30,10 +30,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -196,8 +198,7 @@ func TestGetPlanStagingState(t *testing.T) { ctx := context.Background() - tc, reg, planStores, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, planStores := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -259,8 +260,7 @@ func TestStageRecoveryPlans(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -301,8 +301,7 @@ func TestStageBadVersions(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 1) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 1) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -331,8 +330,7 @@ func TestStageConflictingPlans(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -372,8 +370,7 @@ func TestForcePlanUpdate(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -415,8 +412,7 @@ func TestNodeDecommissioned(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -449,8 +445,7 @@ func TestRejectDecommissionReachableNode(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) 
defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -471,8 +466,7 @@ func TestStageRecoveryPlansToWrongCluster(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -504,8 +498,7 @@ func TestRetrieveRangeStatus(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 5) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 5) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -561,8 +554,7 @@ func TestRetrieveApplyStatus(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 5) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 5) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -629,7 +621,6 @@ func TestRetrieveApplyStatus(t *testing.T) { for _, id := range planDetails.UpdatedNodes { tc.StopServer(int(id.NodeID - 1)) - lReg.ReopenOrFail(t, int(id.NodeID-1)) require.NoError(t, tc.RestartServer(int(id.NodeID-1)), "failed to restart node") } @@ -660,8 +651,7 @@ func TestRejectBadVersionApplication(t *testing.T) { ctx := context.Background() - tc, reg, pss, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, pss := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -681,7 +671,6 @@ func TestRejectBadVersionApplication(t *testing.T) { tc.StopServer(1) require.NoError(t, pss[1].SavePlan(plan), "failed to inject plan into storage") - lReg.ReopenOrFail(t, 1) require.NoError(t, tc.RestartServer(1), "failed to restart server") r, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) @@ -700,21 +689,16 @@ func TestRejectBadVersionApplication(t *testing.T) { func prepTestCluster( t *testing.T, nodes int, -) ( - *testcluster.TestCluster, - server.StickyInMemEnginesRegistry, - map[int]loqrecovery.PlanStore, - testutils.ListenerRegistry, -) { +) (*testcluster.TestCluster, server.StickyInMemEnginesRegistry, map[int]loqrecovery.PlanStore) { skip.UnderStressRace(t, "cluster frequently fails to start under stress race") reg := server.NewStickyInMemEnginesRegistry() - lReg := testutils.NewListenerRegistry() + lReg := listenerutil.NewListenerRegistry() args := base.TestClusterArgs{ - ServerArgsPerNode: make(map[int]base.TestServerArgs), - ReusableListeners: true, + ServerArgsPerNode: make(map[int]base.TestServerArgs), + ReusableListenerReg: lReg, } for i := 0; i < nodes; i++ { args.ServerArgsPerNode[i] = base.TestServerArgs{ @@ -732,12 +716,12 @@ func prepTestCluster( StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), }, }, - Listener: lReg.GetOrFail(t, i), } } tc := testcluster.NewTestCluster(t, nodes, args) tc.Start(t) - return tc, reg, prepInMemPlanStores(t, args.ServerArgsPerNode), lReg + tc.Stopper().AddCloser(stop.CloserFn(lReg.Close)) + return tc, reg, prepInMemPlanStores(t, args.ServerArgsPerNode) } func prepInMemPlanStores( diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index fadcebdf1151..66a6ef5c31c2 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" 
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -152,11 +153,14 @@ func TestNodeLivenessInitialIncrement(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() ctx := context.Background() tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { @@ -869,6 +873,8 @@ func TestNodeLivenessSetDraining(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -891,8 +897,9 @@ func TestNodeLivenessSetDraining(t *testing.T) { ctx := context.Background() tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) @@ -1200,6 +1207,8 @@ func verifyNodeIsDecommissioning(t *testing.T, tc *testcluster.TestCluster, node func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -1222,8 +1231,9 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { ctx := context.Background() tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 99c26cec4aac..9f1a245d7ea9 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -44,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -2091,6 +2092,8 @@ func TestReplicateQueueAcquiresInvalidLeases(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() zcfg := zonepb.DefaultZoneConfig() zcfg.NumReplicas = proto.Int32(1) @@ -2098,7 +2101,8 @@ func TestReplicateQueueAcquiresInvalidLeases(t *testing.T) { base.TestClusterArgs{ // Disable the replication queue initially, to assert on the lease // statuses pre and post enabling the replicate queue. 
- ReplicationMode: base.ReplicationManual, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ Settings: st, DefaultTestTenant: base.TestTenantDisabled, diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 35f1b8f1f1d8..fd0f6f672dee 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -526,6 +526,7 @@ go_test( "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/diagutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/server/server_startup_test.go b/pkg/server/server_startup_test.go index 8f719c1c6bac..363d21c016b6 100644 --- a/pkg/server/server_startup_test.go +++ b/pkg/server/server_startup_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -41,9 +42,12 @@ func TestStartupInjectedFailureSingleNode(t *testing.T) { t.Log("TestStartupInjectedFailure random seed", seed) reg := server.NewStickyInMemEnginesRegistry() defer reg.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() var enableFaults atomic.Bool args := base.TestClusterArgs{ + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { diff --git a/pkg/testutils/BUILD.bazel b/pkg/testutils/BUILD.bazel index 0d561e957c78..9e3caaff30af 100644 --- a/pkg/testutils/BUILD.bazel +++ b/pkg/testutils/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "files.go", "hook.go", "keys.go", - "listener.go", "net.go", "pprof.go", "soon.go", @@ -34,7 +33,6 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", - "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/testutils/listenerutil/BUILD.bazel b/pkg/testutils/listenerutil/BUILD.bazel new file mode 100644 index 000000000000..0632b7aed487 --- /dev/null +++ b/pkg/testutils/listenerutil/BUILD.bazel @@ -0,0 +1,27 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "listenerutil", + srcs = ["listener.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) + +go_test( + name = "listenerutil_test", + srcs = ["listener_test.go"], + args = ["-test.timeout=295s"], + embed = [":listenerutil"], + deps = [ + "//pkg/util/ctxgroup", + "//pkg/util/leaktest", + "//pkg/util/stop", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/testutils/listener.go b/pkg/testutils/listenerutil/listener.go similarity index 66% rename from pkg/testutils/listener.go rename to pkg/testutils/listenerutil/listener.go index 65631427954b..f058a6f68a8c 100644 --- a/pkg/testutils/listener.go +++ b/pkg/testutils/listenerutil/listener.go @@ -8,11 +8,10 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
-package testutils +package listenerutil import ( "net" - "testing" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" @@ -32,27 +31,30 @@ import ( // actual network sockets when closed, but will pause accepting connections. // Test could then specifically resume listeners prior to restarting servers. type ListenerRegistry struct { - listeners map[int]*reusableListener + listeners map[int]*ReusableListener } // NewListenerRegistry creates a registry of reusable listeners to be used with -// test cluster. Once created use ListenerRegistry.GetOrFail to create new +// a test cluster. Once created, use ListenerRegistry.MustGetOrCreate to create new // listeners and inject them into test cluster using Listener field of // base.TestServerArgs. -func NewListenerRegistry() ListenerRegistry { - return ListenerRegistry{listeners: make(map[int]*reusableListener)} +func NewListenerRegistry() *ListenerRegistry { + return &ListenerRegistry{listeners: make(map[int]*ReusableListener)} } -// GetOrFail returns an existing reusable socket listener or creates a new one +// MustGetOrCreate returns an existing reusable socket listener or creates a new one // on a random local port. -func (r *ListenerRegistry) GetOrFail(t *testing.T, idx int) net.Listener { - t.Helper() +func (r *ListenerRegistry) MustGetOrCreate(t require.TestingT, idx int) *ReusableListener { + if h, ok := t.(interface{ Helper() }); ok { + h.Helper() + } if l, ok := r.listeners[idx]; ok { return l } nl, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err, "failed to create network listener") - l := &reusableListener{ + l := &ReusableListener{ + reg: r, id: idx, wrapped: nl, acceptC: make(chan acceptResult), @@ -64,13 +66,15 @@ func (r *ListenerRegistry) GetOrFail(t *testing.T, idx int) net.Listener { return l } -// ReopenOrFail will allow accepting more connections on existing shared -// listener if it was previously closed. If it was not closed, nothing happens. -// If listener wasn't created previously, test failure is raised. -func (r *ListenerRegistry) ReopenOrFail(t *testing.T, idx int) { - l, ok := r.listeners[idx] - require.Truef(t, ok, "socket for id %d is not open", idx) - l.resume() +// MustGet returns the reusable listener previously created for the given +// index, failing the test if it doesn't exist. +func (r *ListenerRegistry) MustGet(t require.TestingT, idx int) *ReusableListener { + if l, ok := r.listeners[idx]; ok { + return l + } + t.Errorf("listener %d not found", idx) + t.FailNow() + return nil // not reached } // Close closes and deletes all previously created shared listeners. @@ -87,8 +89,13 @@ type acceptResult struct { err error } -type reusableListener struct { - id int +// A ReusableListener wraps a net.Listener and gives it the ability to be closed +// and reopened, which is useful for tests that want to restart servers under +// the same address without worrying about losing a race with another process' +// port acquisition.
+type ReusableListener struct { + reg *ListenerRegistry + id int // idx into reg.listeners wrapped net.Listener acceptC chan acceptResult pauseMu struct { @@ -98,7 +105,7 @@ type reusableListener struct { stopC chan interface{} } -func (l *reusableListener) run() { +func (l *ReusableListener) run() { defer func() { close(l.acceptC) }() @@ -121,20 +128,32 @@ } } -func (l *reusableListener) pauseC() <-chan interface{} { +func (l *ReusableListener) pauseC() <-chan interface{} { l.pauseMu.RLock() defer l.pauseMu.RUnlock() return l.pauseMu.pauseC } -func (l *reusableListener) resume() { +// Reopen allows accepting more connections on an existing shared listener if +// it was previously closed. If it was not closed, nothing happens. If the +// listener is no longer in the registry, an error is returned. +func (l *ReusableListener) Reopen() error { + _, ok := l.reg.listeners[l.id] + if !ok { + return errors.Errorf("socket for id %d is not open", l.id) + } + l.resume() + return nil +} + +func (l *ReusableListener) resume() { l.pauseMu.Lock() defer l.pauseMu.Unlock() l.pauseMu.pauseC = make(chan interface{}) } // Accept implements net.Listener interface. -func (l *reusableListener) Accept() (net.Conn, error) { +func (l *ReusableListener) Accept() (net.Conn, error) { select { case c, ok := <-l.acceptC: if !ok { @@ -150,7 +169,7 @@ // doesn't close underlying listener and it is the responsibility of // ListenerRegistry that provided it to close wrapped listener when registry // is closed. -func (l *reusableListener) Close() error { +func (l *ReusableListener) Close() error { l.pauseMu.Lock() defer l.pauseMu.Unlock() select { @@ -163,6 +182,6 @@ } // Addr implements net.Listener interface. -func (l *reusableListener) Addr() net.Addr { +func (l *ReusableListener) Addr() net.Addr { return l.wrapped.Addr() } diff --git a/pkg/testutils/listenerutil/listener_test.go b/pkg/testutils/listenerutil/listener_test.go new file mode 100644 index 000000000000..356ff1291f15 --- /dev/null +++ b/pkg/testutils/listenerutil/listener_test.go @@ -0,0 +1,94 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package listenerutil + +import ( + "context" + "net" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestReusableListener(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + lr := NewListenerRegistry() + defer lr.Close() + ln := lr.MustGetOrCreate(t, 0) + + // connect dials the listener and accepts the resulting connection; each side + // writes a single byte and reads it back. Errors from Accept() are logged and + // ignored; any other error from either goroutine is returned.
+ connect := func() error { + return ctxgroup.GoAndWait(ctx, + func(ctx context.Context) (rerr error) { + defer func() { + rerr = errors.Wrap(rerr, "connecter") + }() + c, err := net.Dial("tcp", ln.Addr().String()) + if err != nil { + return err + } + if _, err := c.Write([]byte{'x'}); err != nil { + return errors.Wrap(err, "write") + } + sl := make([]byte, 1) + if _, err := c.Read(sl); err != nil { + return errors.Wrap(err, "read") + } + if sl[0] != 'x' { + return errors.Errorf("reply-read: didn't expect %v", sl) + } + _ = c.Close() + return nil + }, + func(ctx context.Context) (rerr error) { + defer func() { + rerr = errors.Wrap(rerr, "accepter") + }() + c, err := ln.Accept() + if err != nil { + t.Logf("ignoring error from Accept: %s", err) + return nil + } + sl := make([]byte, 1) + if _, err := c.Read(sl); err != nil { + return errors.Wrap(err, "read") + } + if sl[0] != 'x' { + return errors.Errorf("read: didn't expect %v", sl) + } + if _, err := c.Write([]byte{'x'}); err != nil { + return errors.Wrap(err, "reply-write") + } + _ = c.Close() + return nil + }, + ) + } + + require.NoError(t, connect()) + require.NoError(t, ln.Close()) + err := connect() + require.Error(t, err) + t.Logf("the expected error is: %v", err) + require.NoError(t, lr.MustGet(t, 0).Reopen()) + require.NoError(t, connect()) +} diff --git a/pkg/testutils/testcluster/BUILD.bazel b/pkg/testutils/testcluster/BUILD.bazel index 2c0fdc71e7d4..b3dc3a2caad4 100644 --- a/pkg/testutils/testcluster/BUILD.bazel +++ b/pkg/testutils/testcluster/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/sql/randgen", "//pkg/storage", "//pkg/testutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/util/allstacks", "//pkg/util/hlc", @@ -63,6 +64,7 @@ go_test( "//pkg/server/serverpb", "//pkg/sql/catalog/desctestutils", "//pkg/testutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/util/httputil", diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index dac74a94dee1..8e6d888a1467 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/allstacks" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -61,6 +62,9 @@ import ( type TestCluster struct { Servers []*server.TestServer Conns []*gosql.DB + // ReusableListeners is populated if (and only if) + // TestClusterArgs.ReusableListenerReg is set. + ReusableListeners map[int] /* idx */ *listenerutil.ReusableListener // Connection to the storage cluster. Typically, the first connection in // Conns, but could be different if we're transparently running in a test @@ -281,6 +284,18 @@ func NewTestCluster(t testing.TB, nodes int, clusterArgs base.TestClusterArgs) * serverArgs = tc.clusterArgs.ServerArgs } + // If a reusable listener registry is provided, create reusable listeners + // for every server that doesn't already have a custom listener set. + // (Restarting a server reliably requires a reusable listener.)
+ if reg := clusterArgs.ReusableListenerReg; reg != nil && serverArgs.Listener == nil { + ln := reg.MustGetOrCreate(t, i) + serverArgs.Listener = ln + if tc.ReusableListeners == nil { + tc.ReusableListeners = map[int]*listenerutil.ReusableListener{} + } + tc.ReusableListeners[i] = ln + } + if len(serverArgs.StoreSpecs) == 0 { serverArgs.StoreSpecs = []base.StoreSpec{base.DefaultTestStoreSpec} } @@ -1644,7 +1659,11 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server. } serverArgs := tc.serverArgs[idx] - if !tc.clusterArgs.ReusableListeners { + if ln := tc.ReusableListeners[idx]; ln != nil { + serverArgs.Listener = ln + } + + if serverArgs.Listener == nil { if idx == 0 { // If it's the first server, then we need to restart the RPC listener by hand. // Look at NewTestCluster for more details. @@ -1663,6 +1682,16 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server. } } } + } else if ln, ok := serverArgs.Listener.(*listenerutil.ReusableListener); !ok { + // Restarting a server without a reusable listener can cause flakes since the + // port may be occupied by a different process now. Use a reusable listener + // to avoid that problem. + return errors.Errorf( + "ReusableListenerReg must be set in TestClusterArgs, or a reusable "+ + "Listener must be set in TestServerArgs, to restart server %d", idx, + ) + } else if err := ln.Reopen(); err != nil { + return err + } for i, specs := range serverArgs.StoreSpecs { diff --git a/pkg/testutils/testcluster/testcluster_test.go b/pkg/testutils/testcluster/testcluster_test.go index 44af28ef8634..6c01b37db2c3 100644 --- a/pkg/testutils/testcluster/testcluster_test.go +++ b/pkg/testutils/testcluster/testcluster_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils", "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -275,6 +276,8 @@ func TestRestart(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -297,8 +300,9 @@ func TestRestart(t *testing.T) { ctx := context.Background() tc := StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationAuto, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationAuto, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) require.NoError(t, tc.WaitForFullReplication()) diff --git a/pkg/util/startup/BUILD.bazel b/pkg/util/startup/BUILD.bazel index a95d3018645f..58ad9a5e7407 100644 --- a/pkg/util/startup/BUILD.bazel +++ b/pkg/util/startup/BUILD.bazel @@ -37,7 +37,7 @@ go_test( "//pkg/server", "//pkg/settings/cluster", "//pkg/spanconfig", - "//pkg/testutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/testcluster", diff --git a/pkg/util/startup/startup_test.go b/pkg/util/startup/startup_test.go index 9e4ea02c5980..9a4512665570 100644 --- a/pkg/util/startup/startup_test.go +++ b/pkg/util/startup/startup_test.go @@ -29,7 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" - "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -137,7 +137,7 @@ func runCircuitBreakerTestForKey( ) ctx := context.Background() - lReg := testutils.NewListenerRegistry() + lReg := listenerutil.NewListenerRegistry() defer lReg.Close() reg := server.NewStickyInMemEnginesRegistry() defer reg.CloseAllStickyInMemEngines() @@ -149,8 +149,8 @@ func runCircuitBreakerTestForKey( kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) args := base.TestClusterArgs{ - ServerArgsPerNode: make(map[int]base.TestServerArgs), - ReusableListeners: true, + ServerArgsPerNode: make(map[int]base.TestServerArgs), + ReusableListenerReg: lReg, } var enableFaults atomic.Bool for i := 0; i < nodes; i++ { @@ -170,7 +170,7 @@ func runCircuitBreakerTestForKey( StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), }, }, - Listener: lReg.GetOrFail(t, i), + Listener: lReg.MustGetOrCreate(t, i), } args.ServerArgsPerNode[i] = a } @@ -278,7 +278,7 @@ func runCircuitBreakerTestForKey( // Restart node and check that it succeeds in reestablishing range quorum // necessary for startup actions. - lReg.ReopenOrFail(t, 5) + require.NoError(t, lReg.MustGet(t, 5).Reopen()) err = tc.RestartServer(5) require.NoError(t, err, "restarting server with range(s) %s tripping circuit breaker", rangesList)