Skip to content

Commit

Permalink
CBG-1971: TestReplicationConcurrentPush flaky test fix (#6173)
Browse files Browse the repository at this point in the history
* CBG-1971: Unit test flaking due to active replicatior not initializing (race condition). This commit makes that less likely to be hit.

* updates to stop race fail

* remove print lines

* some comment updates

* updates after comments to address race more

* remove debug logging and correct comments
  • Loading branch information
gregns1 authored Apr 20, 2023
1 parent 5f4a620 commit 4b8dfb4
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
10 changes: 10 additions & 0 deletions db/sg_replicate_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,10 @@ func (m *sgReplicateManager) SubscribeCfgChanges(ctx context.Context) error {
base.DebugfCtx(m.loggingCtx, base.KeyCluster, "Error subscribing to %s key changes: %v", cfgKeySGRCluster, err)
return err
}
err = m.RefreshReplicationCfg(ctx)
if err != nil {
base.WarnfCtx(m.loggingCtx, "Error while updating refreshing replications before subscribing to cfg: %v", err)
}
m.closeWg.Add(1)
go func() {
defer base.FatalPanicHandler()
Expand Down Expand Up @@ -1194,6 +1198,12 @@ func (c *SGRCluster) GetReplicationIDsForNode(nodeUUID string) (replicationIDs [
return replicationIDs
}

func (m *sgReplicateManager) GetNumberActiveReplicators() int {
m.activeReplicatorsLock.Lock()
defer m.activeReplicatorsLock.Unlock()
return len(m.activeReplicators)
}

// RebalanceReplications distributes the set of defined replications across the set of available nodes
func (c *SGRCluster) RebalanceReplications() {

Expand Down
4 changes: 2 additions & 2 deletions rest/replicatortest/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,12 +956,12 @@ func TestReplicationConcurrentPush(t *testing.T) {

activeRT, remoteRT, remoteURLString, teardown := rest.SetupSGRPeers(t)
defer teardown()
// Create push replications, verify running
// Create push replications, verify running, also verify active replicators are created
activeRT.CreateReplication("rep_ABC", remoteURLString, db.ActiveReplicatorTypePush, []string{"ABC"}, true, db.ConflictResolverDefault)
activeRT.CreateReplication("rep_DEF", remoteURLString, db.ActiveReplicatorTypePush, []string{"DEF"}, true, db.ConflictResolverDefault)
activeRT.WaitForAssignedReplications(2)
activeRT.WaitForReplicationStatus("rep_ABC", db.ReplicationStateRunning)
activeRT.WaitForReplicationStatus("rep_DEF", db.ReplicationStateRunning)
activeRT.WaitForActiveReplicatorInitialization(2)

// Create docs on active
docAllChannels1 := t.Name() + "All1"
Expand Down
8 changes: 8 additions & 0 deletions rest/utilities_testing_resttester.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ func (rt *RestTester) WaitForCheckpointLastSequence(expectedName string) (string
return lastSeq, rt.WaitForCondition(successFunc)
}

func (rt *RestTester) WaitForActiveReplicatorInitialization(count int) {
successFunc := func() bool {
ar := rt.GetDatabase().SGReplicateMgr.GetNumberActiveReplicators()
return ar == count
}
require.NoError(rt.TB, rt.WaitForCondition(successFunc), "mismatch on number of active replicators")
}

// createReplication creates a replication via the REST API with the specified ID, remoteURL, direction and channel filter
func (rt *RestTester) CreateReplication(replicationID string, remoteURLString string, direction db.ActiveReplicatorDirection, channels []string, continuous bool, conflictResolver db.ConflictResolverType) {
rt.CreateReplicationForDB("{{.db}}", replicationID, remoteURLString, direction, channels, continuous, conflictResolver)
Expand Down

0 comments on commit 4b8dfb4

Please sign in to comment.