diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go
index 9eb75f9279..a23812e6d7 100644
--- a/topologytest/hlv_test.go
+++ b/topologytest/hlv_test.go
@@ -155,17 +155,10 @@ func TestHLVCreateDocumentMultiActor(t *testing.T) {
 // - Wait for docs last write to be replicated to all other peers
 func TestHLVCreateDocumentMultiActorConflict(t *testing.T) {
 	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
-	if base.UnitTestUrlIsWalrus() {
-		t.Skip("Panics against rosmar, CBG-4378")
-	} else {
+	if !base.UnitTestUrlIsWalrus() {
 		t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
 	}
 	for _, tc := range getMultiActorTestCases() {
-		if strings.Contains(tc.description(), "CBL") {
-			// Test case flakes given the WaitForDocVersion function only waits for a docID on the cbl peer. We need to be
-			// able to wait for a specific version to arrive over pull replication
-			t.Skip("We need to be able to wait for a specific version to arrive over pull replication, CBG-4257")
-		}
 		t.Run(tc.description(), func(t *testing.T) {
 			peers, replications := setupTests(t, tc.topology)
 
@@ -261,9 +254,7 @@ func TestHLVUpdateDocumentSingleActor(t *testing.T) {
 // - Start replications and wait for last update to be replicated to all peers
 func TestHLVUpdateDocumentMultiActorConflict(t *testing.T) {
 	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
-	if base.UnitTestUrlIsWalrus() {
-		t.Skip("Panics against rosmar, CBG-4378")
-	} else {
+	if !base.UnitTestUrlIsWalrus() {
 		t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
 	}
 	for _, tc := range getMultiActorTestCases() {
@@ -365,9 +356,7 @@ func TestHLVDeleteDocumentMultiActor(t *testing.T) {
 // - Start replications and assert doc is deleted on all peers
 func TestHLVDeleteDocumentMultiActorConflict(t *testing.T) {
 	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
-	if base.UnitTestUrlIsWalrus() {
-		t.Skip("Panics against rosmar, CBG-4378")
-	} else {
+	if !base.UnitTestUrlIsWalrus() {
 		t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
 	}
 	for _, tc := range getMultiActorTestCases() {
@@ -395,51 +384,6 @@ func TestHLVDeleteDocumentMultiActorConflict(t *testing.T) {
 	}
 }
 
-// TestHLVUpdateDeleteDocumentMultiActorConflict:
-// - Create conflicting docs on each peer
-// - Start replications
-// - Wait for last write to be replicated to all peers
-// - Stop replications
-// - Update docs on all peers, then delete the doc on one peer
-// - Start replications and assert doc is deleted on all peers (given the delete was the last write)
-func TestHLVUpdateDeleteDocumentMultiActorConflict(t *testing.T) {
-	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
-	if base.UnitTestUrlIsWalrus() {
-		t.Skip("Panics against rosmar, CBG-4378")
-	} else {
-		t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
-	}
-	for _, tc := range getMultiActorTestCases() {
-		if strings.Contains(tc.description(), "CBL") {
-			// Test case flakes given the WaitForDocVersion function only waits for a docID on the cbl peer. We need to be
-			// able to wait for a specific version to arrive over pull replication
-			t.Skip("We need to be able to wait for a specific version to arrive over pull replication + unexpected body in proposeChanges: [304] issue, CBG-4257")
-		}
-		t.Run(tc.description(), func(t *testing.T) {
-			peerList := tc.PeerNames()
-			peers, replications := setupTests(t, tc.topology)
-			stopPeerReplications(replications)
-
-			docID := getDocID(t)
-			docVersion := createConflictingDocs(t, tc, peers, docID)
-
-			startPeerReplications(replications)
-			waitForVersionAndBody(t, tc, peers, docID, docVersion)
-
-			stopPeerReplications(replications)
-
-			_ = updateConflictingDocs(t, tc, peers, docID)
-
-			lastPeer := peerList[len(peerList)-1]
-			deleteVersion := peers[lastPeer].DeleteDocument(tc.collectionName(), docID)
-			t.Logf("deleteVersion: %+v", deleteVersion)
-
-			startPeerReplications(replications)
-			waitForDeletion(t, tc, peers, docID, lastPeer)
-		})
-	}
-}
-
 // TestHLVDeleteUpdateDocumentMultiActorConflict:
 // - Create conflicting docs on each peer
 // - Start replications
@@ -579,9 +523,7 @@ func TestHLVResurrectDocumentMultiActor(t *testing.T) {
 // - Start replications and wait for last resurrection operation to be replicated to all peers
 func TestHLVResurrectDocumentMultiActorConflict(t *testing.T) {
 	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
-	if base.UnitTestUrlIsWalrus() {
-		t.Skip("Panics against rosmar, CBG-4378")
-	} else {
+	if !base.UnitTestUrlIsWalrus() {
 		t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
 	}
 	for _, tc := range getMultiActorTestCases() {
diff --git a/topologytest/topologies_test.go b/topologytest/topologies_test.go
index 408b9dcb5f..bb8d13e34d 100644
--- a/topologytest/topologies_test.go
+++ b/topologytest/topologies_test.go
@@ -208,28 +208,28 @@ var Topologies = []Topology{
 		},
 	},
 	// topology 1.4 not present, no P2P supported yet
-	{
-		/*
-			Test topology 1.5
+	/*
+		{
+			Test topology 1.5
 
-			+ - - - - - - +       +- - - - - - -+
-			' cluster A   '       ' cluster B   '
-			' +---------+ '       ' +---------+ '
-			' | cbs1    | ' <-->  ' | cbs2    | '
-			' +---------+ '       ' +---------+ '
-			' +---------+ '       ' +---------+ '
-			' | sg1     | '       ' | sg2     | '
-			' +---------+ '       ' +---------+ '
-			+ - - - - - - +       +- - - - - - -+
-			      ^                      ^
-			      |                      |
-			      |                      |
-			      |                      |
-			      |       +------+       |
-			      +---> | cbl1 | <---+
-			            +------+
-		*/
-		/* This test doesn't work yet, CouchbaseLiteMockPeer doesn't support writing data to multiple Sync Gateway peers yet
+			+ - - - - - - +       +- - - - - - -+
+			' cluster A   '       ' cluster B   '
+			' +---------+ '       ' +---------+ '
+			' | cbs1    | ' <-->  ' | cbs2    | '
+			' +---------+ '       ' +---------+ '
+			' +---------+ '       ' +---------+ '
+			' | sg1     | '       ' | sg2     | '
+			' +---------+ '       ' +---------+ '
+			+ - - - - - - +       +- - - - - - -+
+			      ^                      ^
+			      |                      |
+			      |                      |
+			      |                      |
+			      |       +------+       |
+			      +---> | cbl1 | <---+
+			            +------+
+		*/
+		/* This test doesn't work yet, CouchbaseLiteMockPeer doesn't support writing data to multiple Sync Gateway peers yet
 		description: "Sync Gateway -> Couchbase Server -> Couchbase Server",
 		peers: map[string]PeerOptions{
 			"cbs1": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID1},
@@ -283,8 +283,8 @@ var Topologies = []Topology{
 			},
 		},
 	},
-	*/
 },
+	*/
 }
 
 // simpleTopologies represents simplified topologies to make testing the integration test code easier.
diff --git a/xdcr/cbs_xdcr.go b/xdcr/cbs_xdcr.go
index 82ed6470db..e09c85cd9a 100644
--- a/xdcr/cbs_xdcr.go
+++ b/xdcr/cbs_xdcr.go
@@ -116,6 +116,9 @@ func newCouchbaseServerManager(ctx context.Context, fromBucket *base.GocbV2Buck
 
 // Start starts the XDCR replication.
 func (x *couchbaseServerManager) Start(ctx context.Context) error {
+	if x.replicationID != "" {
+		return ErrReplicationAlreadyRunning
+	}
 	method := http.MethodPost
 	body := url.Values{}
 	body.Add("name", fmt.Sprintf("%s_%s", x.fromBucket.GetName(), x.toBucket.GetName()))
@@ -156,7 +159,7 @@ func (x *couchbaseServerManager) Start(ctx context.Context) error {
 func (x *couchbaseServerManager) Stop(ctx context.Context) error {
 	// replication is not started
 	if x.replicationID == "" {
-		return nil
+		return ErrReplicationNotRunning
 	}
 	method := http.MethodDelete
 	url := "/controller/cancelXDCR/" + url.PathEscape(x.replicationID)
diff --git a/xdcr/replication.go b/xdcr/replication.go
index d655647a8b..3f1c4bcc21 100644
--- a/xdcr/replication.go
+++ b/xdcr/replication.go
@@ -18,6 +18,9 @@ import (
 	"github.com/couchbaselabs/rosmar"
 )
 
+var ErrReplicationNotRunning = fmt.Errorf("Replication is not running")
+var ErrReplicationAlreadyRunning = fmt.Errorf("Replication is already running")
+
 // Manager represents a bucket to bucket replication.
 type Manager interface {
 	// Start starts the replication.
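With these sentinels, Start and Stop now report replication state explicitly instead of silently no-opping. Below is a minimal caller-side sketch of how the errors might be consumed, assuming only the Start/Stop subset of the Manager interface; `ensureStarted` and `ensureStopped` are hypothetical helpers, not part of the xdcr package.

```go
package xdcrsketch

import (
	"context"
	"errors"
	"fmt"
)

// Sentinels mirroring the variables added in xdcr/replication.go.
var (
	ErrReplicationNotRunning     = fmt.Errorf("Replication is not running")
	ErrReplicationAlreadyRunning = fmt.Errorf("Replication is already running")
)

// manager mirrors the Start/Stop subset of the xdcr.Manager interface.
type manager interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

// ensureStarted is a hypothetical helper: "already running" counts as success,
// any other error is surfaced to the caller.
func ensureStarted(ctx context.Context, m manager) error {
	if err := m.Start(ctx); err != nil && !errors.Is(err, ErrReplicationAlreadyRunning) {
		return err
	}
	return nil
}

// ensureStopped is the symmetric helper for Stop.
func ensureStopped(ctx context.Context, m manager) error {
	if err := m.Stop(ctx); err != nil && !errors.Is(err, ErrReplicationNotRunning) {
		return err
	}
	return nil
}
```

Callers that still want idempotent semantics can treat the sentinels as success, while any other failure propagates as before.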
diff --git a/xdcr/rosmar_xdcr.go b/xdcr/rosmar_xdcr.go
index 8b63be2b26..1cbb46d58c 100644
--- a/xdcr/rosmar_xdcr.go
+++ b/xdcr/rosmar_xdcr.go
@@ -14,6 +14,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"sync/atomic"
 
 	"golang.org/x/exp/maps"
@@ -47,6 +48,7 @@ func (r replicatedDocLocation) String() string {
 type rosmarManager struct {
 	filterFunc          xdcrFilterFunc
 	terminator          chan bool
+	collectionsLock     sync.RWMutex
 	fromBucketKeyspaces map[uint32]string
 	toBucketCollections map[uint32]*rosmar.Collection
 	fromBucket          *rosmar.Bucket
@@ -75,7 +77,6 @@ func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket,
 		replicationID:       fmt.Sprintf("%s-%s", fromBucket.GetName(), toBucket.GetName()),
 		toBucketCollections: make(map[uint32]*rosmar.Collection),
 		fromBucketKeyspaces: make(map[uint32]string),
-		terminator:          make(chan bool),
 		filterFunc:          mobileXDCRFilter,
 	}, nil
 }
@@ -85,6 +86,8 @@ func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket,
 func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEvent) bool {
 	docID := string(event.Key)
 	base.TracefCtx(ctx, base.KeyVV, "Got event %s, opcode: %s", docID, event.Opcode)
+	r.collectionsLock.RLock()
+	defer r.collectionsLock.RUnlock()
 	col, ok := r.toBucketCollections[event.CollectionID]
 	if !ok {
 		base.ErrorfCtx(ctx, "This violates the assumption that all collections are mapped to a target collection. This should not happen. Found event=%+v", event)
@@ -209,6 +212,12 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
 
 // Start starts the replication for all existing replications. Errors if there aren't corresponding named collections on each bucket.
 func (r *rosmarManager) Start(ctx context.Context) error {
+	if r.terminator != nil {
+		return ErrReplicationAlreadyRunning
+	}
+	r.collectionsLock.Lock()
+	defer r.collectionsLock.Unlock()
+	r.terminator = make(chan bool)
 	// set up replication to target all existing collections, and map to other collections scopes
 	scopes := make(map[string][]string)
 	fromDataStores, err := r.fromBucket.ListDataStores()
@@ -259,6 +268,9 @@ func (r *rosmarManager) Start(ctx context.Context) error {
 
 // Stop terminates the replication.
 func (r *rosmarManager) Stop(_ context.Context) error {
+	if r.terminator == nil {
+		return ErrReplicationNotRunning
+	}
 	close(r.terminator)
 	r.terminator = nil
 	return nil
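Taken together, the rosmar changes use the terminator channel as the "running" flag and the new RWMutex to guard the collection map that feed callbacks read. A condensed sketch of that lifecycle pattern under those assumptions; the type and field names here are illustrative stand-ins, not the real rosmarManager.

```go
package rosmarsketch

import (
	"errors"
	"sync"
)

var (
	errAlreadyRunning = errors.New("replication is already running")
	errNotRunning     = errors.New("replication is not running")
)

// replicator stands in for rosmarManager: a nil/non-nil terminator channel
// doubles as the running flag, and the RWMutex guards the collection map.
type replicator struct {
	collectionsLock sync.RWMutex
	terminator      chan bool
	collections     map[uint32]string // stand-in for map[uint32]*rosmar.Collection
}

func (r *replicator) Start() error {
	if r.terminator != nil {
		return errAlreadyRunning
	}
	r.collectionsLock.Lock()
	defer r.collectionsLock.Unlock()
	r.terminator = make(chan bool)
	// populate the collection mapping while holding the write lock
	r.collections = map[uint32]string{0: "_default._default"}
	return nil
}

func (r *replicator) Stop() error {
	if r.terminator == nil {
		return errNotRunning
	}
	close(r.terminator) // signals feed goroutines to exit
	r.terminator = nil
	return nil
}

// processEvent mirrors the read-side locking added to the feed callback.
func (r *replicator) processEvent(collectionID uint32) (string, bool) {
	r.collectionsLock.RLock()
	defer r.collectionsLock.RUnlock()
	name, ok := r.collections[collectionID]
	return name, ok
}
```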
diff --git a/xdcr/xdcr_test.go b/xdcr/xdcr_test.go
index 84d9037046..b47d4ab7cf 100644
--- a/xdcr/xdcr_test.go
+++ b/xdcr/xdcr_test.go
@@ -45,11 +45,16 @@ func TestMobileXDCRNoSyncDataCopied(t *testing.T) {
 	}
 	xdcr, err := NewXDCR(ctx, fromBucket, toBucket, opts)
 	require.NoError(t, err)
-	err = xdcr.Start(ctx)
-	require.NoError(t, err)
+	require.NoError(t, xdcr.Start(ctx))
+
 	defer func() {
-		assert.NoError(t, xdcr.Stop(ctx))
+		// stop XDCR, will already be stopped if test doesn't fail early
+		err := xdcr.Stop(ctx)
+		if err != nil {
+			assert.Equal(t, ErrReplicationNotRunning, err)
+		}
 	}()
+	require.ErrorIs(t, xdcr.Start(ctx), ErrReplicationAlreadyRunning)
 	const (
 		syncDoc       = "_sync:doc1doc2"
 		attachmentDoc = "_sync:att2:foo"
@@ -115,11 +120,16 @@ func TestMobileXDCRNoSyncDataCopied(t *testing.T) {
 
 	// stats are not updated in real time, so we need to wait a bit
 	require.EventuallyWithT(t, func(c *assert.CollectT) {
 		stats, err := xdcr.Stats(ctx)
-		assert.NoError(t, err)
+		if !assert.NoError(c, err) {
+			return
+		}
 		assert.Equal(c, totalDocsFiltered+1, stats.MobileDocsFiltered)
 		assert.Equal(c, totalDocsWritten+2, stats.DocsWritten)
 	}, time.Second*5, time.Millisecond*100)
+
+	require.NoError(t, xdcr.Stop(ctx))
+	require.ErrorIs(t, xdcr.Stop(ctx), ErrReplicationNotRunning)
 }
 
 // getTwoBucketDataStores creates two data stores in separate buckets to run xdcr within. Returns a named collection or a default collection based on the global test configuration.
@@ -338,7 +348,7 @@ func TestVVObeyMou(t *testing.T) {
 	require.Equal(t, expectedVV, vv)
 
 	stats, err := xdcr.Stats(ctx)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	require.Equal(t, Stats{
 		DocsWritten:   1,
 		DocsProcessed: 1,
@@ -364,7 +374,7 @@ func TestVVObeyMou(t *testing.T) {
 	requireWaitForXDCRDocsProcessed(t, xdcr, 2)
 
 	stats, err = xdcr.Stats(ctx)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	require.Equal(t, Stats{
 		TargetNewerDocs: 1,
 		DocsWritten:     1,
@@ -423,7 +433,7 @@ func TestVVMouImport(t *testing.T) {
 	require.Equal(t, expectedVV, vv)
 
 	stats, err := xdcr.Stats(ctx)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	require.Equal(t, Stats{
 		DocsWritten:   1,
 		DocsProcessed: 1,
@@ -449,7 +459,7 @@ func TestVVMouImport(t *testing.T) {
 	requireWaitForXDCRDocsProcessed(t, xdcr, 2)
 
 	stats, err = xdcr.Stats(ctx)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	require.Equal(t, Stats{
 		TargetNewerDocs: 1,
 		DocsWritten:     1,
@@ -467,7 +477,7 @@ func TestVVMouImport(t *testing.T) {
 	requireWaitForXDCRDocsProcessed(t, xdcr, 3)
 
 	stats, err = xdcr.Stats(ctx)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	require.Equal(t, Stats{
 		TargetNewerDocs: 1,
 		DocsWritten:     2,
@@ -623,6 +633,9 @@ func TestReplicateXattrs(t *testing.T) {
 // TestVVMultiActor verifies that updates by multiple actors (updates to different clusters/buckets) are properly
 // reflected in the HLV (cv and pv).
 func TestVVMultiActor(t *testing.T) {
+	if !base.UnitTestUrlIsWalrus() {
+		t.Skip("This test can fail with CBS due to CBS-4334 since a document without xattrs will be written to the target bucket, even if it is otherwise up to date")
+	}
 	fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
 	ctx := base.TestCtx(t)
 	fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
@@ -722,8 +735,10 @@ func requireWaitForXDCRDocsProcessed(t *testing.T, xdcr Manager, expectedDocsPro
 	ctx := base.TestCtx(t)
 	require.EventuallyWithT(t, func(c *assert.CollectT) {
 		stats, err := xdcr.Stats(ctx)
-		assert.NoError(t, err)
-		assert.Equal(c, expectedDocsProcessed, stats.DocsProcessed)
+		if !assert.NoError(c, err) {
+			return
+		}
+		assert.Equal(c, expectedDocsProcessed, stats.DocsProcessed, "all stats=%+v", stats)
 	}, time.Second*5, time.Millisecond*100)
 }
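The test changes above all follow one lifecycle pattern: a second Start must fail with ErrReplicationAlreadyRunning, the deferred Stop tolerates ErrReplicationNotRunning, and stats are polled with EventuallyWithT, bailing out of an attempt when the fetch fails. Here is a self-contained sketch of that pattern against a hypothetical fakeManager; the fake, its DocsProcessed method, and the local error variables are stand-ins for the real xdcr.Manager and its sentinels, while the testify calls match those used in the diff.

```go
package xdcrsketch // would live in a *_test.go file

import (
	"context"
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

var (
	errAlreadyRunning = errors.New("Replication is already running")
	errNotRunning     = errors.New("Replication is not running")
)

// fakeManager has just enough behaviour to demonstrate the lifecycle assertions.
type fakeManager struct {
	running       bool
	docsProcessed int64
}

func (m *fakeManager) Start(context.Context) error {
	if m.running {
		return errAlreadyRunning
	}
	m.running = true
	return nil
}

func (m *fakeManager) Stop(context.Context) error {
	if !m.running {
		return errNotRunning
	}
	m.running = false
	return nil
}

func (m *fakeManager) DocsProcessed(context.Context) (int64, error) {
	m.docsProcessed++ // pretend the replication made progress on each poll
	return m.docsProcessed, nil
}

func TestLifecycleSketch(t *testing.T) {
	ctx := context.Background()
	m := &fakeManager{}

	require.NoError(t, m.Start(ctx))
	// A second Start is now an error rather than a silent no-op.
	require.ErrorIs(t, m.Start(ctx), errAlreadyRunning)

	defer func() {
		// Stop may legitimately report "not running" if the test already stopped it.
		if err := m.Stop(ctx); err != nil {
			assert.ErrorIs(t, err, errNotRunning)
		}
	}()

	// Stats-style polling: skip the rest of this attempt if the fetch fails and
	// let EventuallyWithT retry until the expected count shows up.
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		processed, err := m.DocsProcessed(ctx)
		if !assert.NoError(c, err) {
			return
		}
		assert.Equal(c, int64(3), processed)
	}, time.Second, 10*time.Millisecond)

	require.NoError(t, m.Stop(ctx))
	require.ErrorIs(t, m.Stop(ctx), errNotRunning)
}
```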