From 4b8dfb4c0406df001c799946ef7730b31a5fd967 Mon Sep 17 00:00:00 2001
From: Gregory Newman-Smith <109068393+gregns1@users.noreply.github.com>
Date: Thu, 20 Apr 2023 18:20:00 +0100
Subject: [PATCH] CBG-1971: TestReplicationConcurrentPush flaky test fix (#6173)

* CBG-1971: Unit test flaking due to active replicator not initializing (race condition). This commit makes that less likely to be hit.

* updates to stop the race failure

* remove print lines

* some comment updates

* updates after comments to further address the race

* remove debug logging and correct comments
---
 db/sg_replicate_cfg.go                 | 10 ++++++++++
 rest/replicatortest/replicator_test.go |  4 ++--
 rest/utilities_testing_resttester.go   |  8 ++++++++
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/db/sg_replicate_cfg.go b/db/sg_replicate_cfg.go
index 4dfa334d23..37164152fb 100644
--- a/db/sg_replicate_cfg.go
+++ b/db/sg_replicate_cfg.go
@@ -833,6 +833,10 @@ func (m *sgReplicateManager) SubscribeCfgChanges(ctx context.Context) error {
 		base.DebugfCtx(m.loggingCtx, base.KeyCluster, "Error subscribing to %s key changes: %v", cfgKeySGRCluster, err)
 		return err
 	}
+	err = m.RefreshReplicationCfg(ctx)
+	if err != nil {
+		base.WarnfCtx(m.loggingCtx, "Error while refreshing replications before subscribing to cfg: %v", err)
+	}
 	m.closeWg.Add(1)
 	go func() {
 		defer base.FatalPanicHandler()
@@ -1194,6 +1198,12 @@ func (c *SGRCluster) GetReplicationIDsForNode(nodeUUID string) (replicationIDs [
 	return replicationIDs
 }
 
+func (m *sgReplicateManager) GetNumberActiveReplicators() int {
+	m.activeReplicatorsLock.Lock()
+	defer m.activeReplicatorsLock.Unlock()
+	return len(m.activeReplicators)
+}
+
 // RebalanceReplications distributes the set of defined replications across the set of available nodes
 func (c *SGRCluster) RebalanceReplications() {
 
diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go
index 5ddcc8d85d..6b05397154 100644
--- a/rest/replicatortest/replicator_test.go
+++ b/rest/replicatortest/replicator_test.go
@@ -956,12 +956,12 @@ func TestReplicationConcurrentPush(t *testing.T) {
 	activeRT, remoteRT, remoteURLString, teardown := rest.SetupSGRPeers(t)
 	defer teardown()
 
-	// Create push replications, verify running
+	// Create push replications, verify running, also verify active replicators are created
 	activeRT.CreateReplication("rep_ABC", remoteURLString, db.ActiveReplicatorTypePush, []string{"ABC"}, true, db.ConflictResolverDefault)
 	activeRT.CreateReplication("rep_DEF", remoteURLString, db.ActiveReplicatorTypePush, []string{"DEF"}, true, db.ConflictResolverDefault)
-	activeRT.WaitForAssignedReplications(2)
 	activeRT.WaitForReplicationStatus("rep_ABC", db.ReplicationStateRunning)
 	activeRT.WaitForReplicationStatus("rep_DEF", db.ReplicationStateRunning)
+	activeRT.WaitForActiveReplicatorInitialization(2)
 
 	// Create docs on active
 	docAllChannels1 := t.Name() + "All1"
diff --git a/rest/utilities_testing_resttester.go b/rest/utilities_testing_resttester.go
index d25f3143db..b5790205a5 100644
--- a/rest/utilities_testing_resttester.go
+++ b/rest/utilities_testing_resttester.go
@@ -126,6 +126,14 @@ func (rt *RestTester) WaitForCheckpointLastSequence(expectedName string) (string
 	return lastSeq, rt.WaitForCondition(successFunc)
 }
 
+func (rt *RestTester) WaitForActiveReplicatorInitialization(count int) {
+	successFunc := func() bool {
+		ar := rt.GetDatabase().SGReplicateMgr.GetNumberActiveReplicators()
+		return ar == count
+	}
+	require.NoError(rt.TB, rt.WaitForCondition(successFunc), "mismatch on number of active replicators")
+}
+
 // createReplication creates a replication via the REST API with the specified ID, remoteURL, direction and channel filter
 func (rt *RestTester) CreateReplication(replicationID string, remoteURLString string, direction db.ActiveReplicatorDirection, channels []string, continuous bool, conflictResolver db.ConflictResolverType) {
 	rt.CreateReplicationForDB("{{.db}}", replicationID, remoteURLString, direction, channels, continuous, conflictResolver)