Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-1971: TestReplicationConcurrentPush flaky test fix #6173

Merged
merged 6 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions db/sg_replicate_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,10 @@ func (m *sgReplicateManager) SubscribeCfgChanges(ctx context.Context) error {
base.DebugfCtx(m.loggingCtx, base.KeyCluster, "Error subscribing to %s key changes: %v", cfgKeySGRCluster, err)
return err
}
err = m.RefreshReplicationCfg(ctx)
if err != nil {
base.WarnfCtx(m.loggingCtx, "Error while updating refreshing replications before subscribing to cfg: %v", err)
}
m.closeWg.Add(1)
go func() {
defer base.FatalPanicHandler()
Expand Down Expand Up @@ -1194,6 +1198,12 @@ func (c *SGRCluster) GetReplicationIDsForNode(nodeUUID string) (replicationIDs [
return replicationIDs
}

func (m *sgReplicateManager) GetNumberActiveReplicators() int {
m.activeReplicatorsLock.Lock()
defer m.activeReplicatorsLock.Unlock()
return len(m.activeReplicators)
}

// RebalanceReplications distributes the set of defined replications across the set of available nodes
func (c *SGRCluster) RebalanceReplications() {

Expand Down
4 changes: 2 additions & 2 deletions rest/replicatortest/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,12 +956,12 @@ func TestReplicationConcurrentPush(t *testing.T) {

activeRT, remoteRT, remoteURLString, teardown := rest.SetupSGRPeers(t)
defer teardown()
// Create push replications, verify running
// Create push replications, verify running, also verify active replicators are created
activeRT.CreateReplication("rep_ABC", remoteURLString, db.ActiveReplicatorTypePush, []string{"ABC"}, true, db.ConflictResolverDefault)
activeRT.CreateReplication("rep_DEF", remoteURLString, db.ActiveReplicatorTypePush, []string{"DEF"}, true, db.ConflictResolverDefault)
activeRT.WaitForAssignedReplications(2)
activeRT.WaitForReplicationStatus("rep_ABC", db.ReplicationStateRunning)
activeRT.WaitForReplicationStatus("rep_DEF", db.ReplicationStateRunning)
activeRT.WaitForActiveReplicatorInitialization(2)

// Create docs on active
docAllChannels1 := t.Name() + "All1"
Expand Down
8 changes: 8 additions & 0 deletions rest/utilities_testing_resttester.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ func (rt *RestTester) WaitForCheckpointLastSequence(expectedName string) (string
return lastSeq, rt.WaitForCondition(successFunc)
}

func (rt *RestTester) WaitForActiveReplicatorInitialization(count int) {
successFunc := func() bool {
ar := rt.GetDatabase().SGReplicateMgr.GetNumberActiveReplicators()
return ar == count
}
require.NoError(rt.TB, rt.WaitForCondition(successFunc), "mismatch on number of active replicators")
}

// createReplication creates a replication via the REST API with the specified ID, remoteURL, direction and channel filter
func (rt *RestTester) CreateReplication(replicationID string, remoteURLString string, direction db.ActiveReplicatorDirection, channels []string, continuous bool, conflictResolver db.ConflictResolverType) {
rt.CreateReplicationForDB("{{.db}}", replicationID, remoteURLString, direction, channels, continuous, conflictResolver)
Expand Down