From fb298af80481b9069c24efa3e01799026feee3cd Mon Sep 17 00:00:00 2001
From: Tor Colvin
Date: Fri, 6 Dec 2024 14:30:54 -0500
Subject: [PATCH 1/5] CBG-4265 avoid panic in rosmar xdcr tests

- return an error if xdcr is already running when Start is called, or
  already stopped when Stop is called
- allow rosmar xdcr to be restarted via Start/Stop/Start by resetting
  terminator
- don't return empty topology in Topologies which causes test panic
- enable rosmar multi actor conflict tests
- remove a test that is a duplicate of existing test
---
 topologytest/hlv_test.go        | 66 ++-------------------------------
 topologytest/topologies_test.go | 44 +++++++++++-----------
 xdcr/cbs_xdcr.go                |  5 ++-
 xdcr/replication.go             |  3 ++
 xdcr/rosmar_xdcr.go             |  8 +++-
 xdcr/xdcr_test.go               | 14 +++++--
 6 files changed, 51 insertions(+), 89 deletions(-)

diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go
index 9eb75f9279..a23812e6d7 100644
--- a/topologytest/hlv_test.go
+++ b/topologytest/hlv_test.go
@@ -155,17 +155,10 @@ func TestHLVCreateDocumentMultiActor(t *testing.T) {
 // - Wait for docs last write to be replicated to all other peers
 func TestHLVCreateDocumentMultiActorConflict(t *testing.T) {
 	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
-	if base.UnitTestUrlIsWalrus() {
-		t.Skip("Panics against rosmar, CBG-4378")
-	} else {
+	if !base.UnitTestUrlIsWalrus() {
 		t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
 	}
 	for _, tc := range getMultiActorTestCases() {
-		if strings.Contains(tc.description(), "CBL") {
-			// Test case flakes given the WaitForDocVersion function only waits for a docID on the cbl peer. We need to be
-			// able to wait for a specific version to arrive over pull replication
-			t.Skip("We need to be able to wait for a specific version to arrive over pull replication, CBG-4257")
-		}
 		t.Run(tc.description(), func(t *testing.T) {
 			peers, replications := setupTests(t, tc.topology)
 
@@ -261,9 +254,7 @@ func TestHLVUpdateDocumentSingleActor(t *testing.T) {
 // - Start replications and wait for last update to be replicated to all peers
 func TestHLVUpdateDocumentMultiActorConflict(t *testing.T) {
 	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
-	if base.UnitTestUrlIsWalrus() {
-		t.Skip("Panics against rosmar, CBG-4378")
-	} else {
+	if !base.UnitTestUrlIsWalrus() {
 		t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
 	}
 	for _, tc := range getMultiActorTestCases() {
@@ -365,9 +356,7 @@ func TestHLVDeleteDocumentMultiActor(t *testing.T) {
 // - Start replications and assert doc is deleted on all peers
 func TestHLVDeleteDocumentMultiActorConflict(t *testing.T) {
 	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
-	if base.UnitTestUrlIsWalrus() {
-		t.Skip("Panics against rosmar, CBG-4378")
-	} else {
+	if !base.UnitTestUrlIsWalrus() {
 		t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
 	}
 	for _, tc := range getMultiActorTestCases() {
@@ -395,51 +384,6 @@ func TestHLVDeleteDocumentMultiActorConflict(t *testing.T) {
 	}
 }
 
-// TestHLVUpdateDeleteDocumentMultiActorConflict:
-// - Create conflicting docs on each peer
-// - Start replications
-// - Wait for last write to be replicated to all peers
-// - Stop replications
-// - Update docs on all peers, then delete the doc on one peer
-// - Start replications and assert doc is deleted on all peers (given the delete was the last write)
-func TestHLVUpdateDeleteDocumentMultiActorConflict(t *testing.T) {
-	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
-	if base.UnitTestUrlIsWalrus() {
-		t.Skip("Panics against rosmar, CBG-4378")
-	} else {
-		t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
-	}
-	for _, tc := range getMultiActorTestCases() {
-		if strings.Contains(tc.description(), "CBL") {
-			// Test case flakes given the WaitForDocVersion function only waits for a docID on the cbl peer. We need to be
-			// able to wait for a specific version to arrive over pull replication
-			t.Skip("We need to be able to wait for a specific version to arrive over pull replication + unexpected body in proposeChanges: [304] issue, CBG-4257")
-		}
-		t.Run(tc.description(), func(t *testing.T) {
-			peerList := tc.PeerNames()
-			peers, replications := setupTests(t, tc.topology)
-			stopPeerReplications(replications)
-
-			docID := getDocID(t)
-			docVersion := createConflictingDocs(t, tc, peers, docID)
-
-			startPeerReplications(replications)
-			waitForVersionAndBody(t, tc, peers, docID, docVersion)
-
-			stopPeerReplications(replications)
-
-			_ = updateConflictingDocs(t, tc, peers, docID)
-
-			lastPeer := peerList[len(peerList)-1]
-			deleteVersion := peers[lastPeer].DeleteDocument(tc.collectionName(), docID)
-			t.Logf("deleteVersion: %+v", deleteVersion)
-
-			startPeerReplications(replications)
-			waitForDeletion(t, tc, peers, docID, lastPeer)
-		})
-	}
-}
-
 // TestHLVDeleteUpdateDocumentMultiActorConflict:
 // - Create conflicting docs on each peer
 // - Start replications
@@ -579,9 +523,7 @@ func TestHLVResurrectDocumentMultiActor(t *testing.T) {
 // - Start replications and wait for last resurrection operation to be replicated to all peers
 func TestHLVResurrectDocumentMultiActorConflict(t *testing.T) {
 	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
-	if base.UnitTestUrlIsWalrus() {
-		t.Skip("Panics against rosmar, CBG-4378")
-	} else {
+	if !base.UnitTestUrlIsWalrus() {
 		t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
 	}
 	for _, tc := range getMultiActorTestCases() {
diff --git a/topologytest/topologies_test.go b/topologytest/topologies_test.go
index 408b9dcb5f..bb8d13e34d 100644
--- a/topologytest/topologies_test.go
+++ b/topologytest/topologies_test.go
@@ -208,28 +208,28 @@ var Topologies = []Topology{
 		},
 	},
 	// topology 1.4 not present, no P2P supported yet
-	{
-		/*
-			Test topology 1.5
+	/*
+		{
+		Test topology 1.5
 
-			+ - - - - - - +      +- - - - - - -+
-			' cluster A   '      ' cluster B   '
-			' +---------+ '      ' +---------+ '
-			' | cbs1    | ' <--> ' | cbs2    | '
-			' +---------+ '      ' +---------+ '
-			' +---------+ '      ' +---------+ '
-			' | sg1     | '      ' | sg2     | '
-			' +---------+ '      ' +---------+ '
-			+ - - - - - - +      +- - - - - - -+
-			       ^                    ^
-			       |                    |
-			       |                    |
-			       |                    |
-			       |      +------+      |
-			       +--->  | cbl1 | <---+
-			              +------+
-		*/
-		/* This test doesn't work yet, CouchbaseLiteMockPeer doesn't support writing data to multiple Sync Gateway peers yet
+		+ - - - - - - +      +- - - - - - -+
+		' cluster A   '      ' cluster B   '
+		' +---------+ '      ' +---------+ '
+		' | cbs1    | ' <--> ' | cbs2    | '
+		' +---------+ '      ' +---------+ '
+		' +---------+ '      ' +---------+ '
+		' | sg1     | '      ' | sg2     | '
+		' +---------+ '      ' +---------+ '
+		+ - - - - - - +      +- - - - - - -+
+		       ^                    ^
+		       |                    |
+		       |                    |
+		       |                    |
+		       |      +------+      |
+		       +--->  | cbl1 | <---+
+		              +------+
+	*/
+	/* This test doesn't work yet, CouchbaseLiteMockPeer doesn't support writing data to multiple Sync Gateway peers yet
 		description: "Sync Gateway -> Couchbase Server -> Couchbase Server",
 		peers: map[string]PeerOptions{
 			"cbs1": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID1},
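
[Editor's note, not part of the patch series: the hunk above moves the opening /* so that the entire topology literal is commented out, instead of leaving live braces around a commented-out body. Live braces with no fields leave a zero-value Topology element in the slice, which is the "empty topology" panic the commit message refers to. A minimal, self-contained illustration of that failure class, using simplified, hypothetical stand-in types rather than the actual topologytest definitions:]

    package main

    import "fmt"

    // Topology is a simplified, hypothetical stand-in for the topologytest type.
    type Topology struct {
        description string
        peers       []string
    }

    func main() {
        topologies := []Topology{
            {description: "CBS <-> CBS", peers: []string{"cbs1", "cbs2"}},
            {}, // what live braces around a commented-out body leave behind
        }
        for _, tc := range topologies {
            // Test code that assumes every topology has at least one peer
            // panics when it reaches the zero-value element.
            fmt.Println(tc.description, tc.peers[0]) // index out of range here
        }
    }
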
@@ -283,8 +283,8 @@ var Topologies = []Topology{
 				},
 			},
 		},
-		*/
 	},
+	*/
 }
 
 // simpleTopologies represents simplified topologies to make testing the integration test code easier.
diff --git a/xdcr/cbs_xdcr.go b/xdcr/cbs_xdcr.go
index 82ed6470db..e09c85cd9a 100644
--- a/xdcr/cbs_xdcr.go
+++ b/xdcr/cbs_xdcr.go
@@ -116,6 +116,9 @@ func newCouchbaseServerManager(ctx context.Context, fromBucket *base.GocbV2Bucke
 
 // Start starts the XDCR replication.
 func (x *couchbaseServerManager) Start(ctx context.Context) error {
+	if x.replicationID != "" {
+		return ErrReplicationAlreadyRunning
+	}
 	method := http.MethodPost
 	body := url.Values{}
 	body.Add("name", fmt.Sprintf("%s_%s", x.fromBucket.GetName(), x.toBucket.GetName()))
@@ -156,7 +159,7 @@
 func (x *couchbaseServerManager) Stop(ctx context.Context) error {
 	// replication is not started
 	if x.replicationID == "" {
-		return nil
+		return ErrReplicationNotRunning
 	}
 	method := http.MethodDelete
 	url := "/controller/cancelXDCR/" + url.PathEscape(x.replicationID)
diff --git a/xdcr/replication.go b/xdcr/replication.go
index d655647a8b..3f1c4bcc21 100644
--- a/xdcr/replication.go
+++ b/xdcr/replication.go
@@ -18,6 +18,9 @@ import (
 	"github.com/couchbaselabs/rosmar"
 )
 
+var ErrReplicationNotRunning = fmt.Errorf("Replication is not running")
+var ErrReplicationAlreadyRunning = fmt.Errorf("Replication is already running")
+
 // Manager represents a bucket to bucket replication.
 type Manager interface {
 	// Start starts the replication.
diff --git a/xdcr/rosmar_xdcr.go b/xdcr/rosmar_xdcr.go
index 8b63be2b26..addc35b9d7 100644
--- a/xdcr/rosmar_xdcr.go
+++ b/xdcr/rosmar_xdcr.go
@@ -75,7 +75,6 @@ func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket,
 		replicationID:       fmt.Sprintf("%s-%s", fromBucket.GetName(), toBucket.GetName()),
 		toBucketCollections: make(map[uint32]*rosmar.Collection),
 		fromBucketKeyspaces: make(map[uint32]string),
-		terminator:          make(chan bool),
 		filterFunc:          mobileXDCRFilter,
 	}, nil
 
@@ -209,6 +208,10 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
 
 // Start starts the replication for all existing replications. Errors if there aren't corresponding named collections on each bucket.
 func (r *rosmarManager) Start(ctx context.Context) error {
+	if r.terminator != nil {
+		return ErrReplicationAlreadyRunning
+	}
+	r.terminator = make(chan bool)
 	// set up replication to target all existing collections, and map to other collections
 	scopes := make(map[string][]string)
 	fromDataStores, err := r.fromBucket.ListDataStores()
@@ -259,6 +262,9 @@
 
 // Stop terminates the replication.
 func (r *rosmarManager) Stop(_ context.Context) error {
+	if r.terminator == nil {
+		return ErrReplicationNotRunning
+	}
 	close(r.terminator)
 	r.terminator = nil
 	return nil
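
[Editor's note, not part of the patch series: the rosmar changes above make the terminator channel double as the running/stopped marker. Start errors if it is non-nil and otherwise creates it; Stop closes it and resets it to nil so a later Start can run. A minimal sketch of the pattern under those assumptions; the names below are illustrative, not the actual rosmarManager fields:]

    package main

    import (
        "errors"
        "fmt"
    )

    var (
        errAlreadyRunning = errors.New("replication is already running")
        errNotRunning     = errors.New("replication is not running")
    )

    // manager mirrors the shape of the patched rosmarManager: a nil
    // terminator means stopped, a non-nil terminator means running.
    type manager struct {
        terminator chan bool
    }

    func (m *manager) Start() error {
        if m.terminator != nil {
            return errAlreadyRunning
        }
        m.terminator = make(chan bool) // created per Start, not in the constructor
        // ... start feed goroutines that exit when terminator is closed ...
        return nil
    }

    func (m *manager) Stop() error {
        if m.terminator == nil {
            return errNotRunning
        }
        close(m.terminator) // signal the feed goroutines to shut down
        m.terminator = nil  // reset so a subsequent Start succeeds
        return nil
    }

    func main() {
        m := &manager{}
        fmt.Println(m.Start()) // <nil>
        fmt.Println(m.Start()) // replication is already running
        fmt.Println(m.Stop())  // <nil>
        fmt.Println(m.Start()) // <nil>: Start/Stop/Start now works
    }
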
diff --git a/xdcr/xdcr_test.go b/xdcr/xdcr_test.go
index 852deb7d80..7ed4c2176e 100644
--- a/xdcr/xdcr_test.go
+++ b/xdcr/xdcr_test.go
@@ -45,11 +45,16 @@ func TestMobileXDCRNoSyncDataCopied(t *testing.T) {
 	}
 	xdcr, err := NewXDCR(ctx, fromBucket, toBucket, opts)
 	require.NoError(t, err)
-	err = xdcr.Start(ctx)
-	require.NoError(t, err)
+	require.NoError(t, xdcr.Start(ctx))
+
 	defer func() {
-		assert.NoError(t, xdcr.Stop(ctx))
+		// stop XDCR, will already be stopped if test doesn't fail early
+		err := xdcr.Stop(ctx)
+		if err != nil {
+			assert.Equal(t, ErrReplicationNotRunning, err)
+		}
 	}()
+	require.ErrorIs(t, xdcr.Start(ctx), ErrReplicationAlreadyRunning)
 	const (
 		syncDoc       = "_sync:doc1doc2"
 		attachmentDoc = "_sync:att2:foo"
@@ -120,6 +125,9 @@ func TestMobileXDCRNoSyncDataCopied(t *testing.T) {
 		assert.Equal(c, totalDocsFiltered+1, stats.MobileDocsFiltered)
 		assert.Equal(c, totalDocsWritten+2, stats.DocsWritten)
 	}, time.Second*5, time.Millisecond*100)
+
+	require.NoError(t, xdcr.Stop(ctx))
+	require.ErrorIs(t, xdcr.Stop(ctx), ErrReplicationNotRunning)
 }
 
 // getTwoBucketDataStores creates two data stores in separate buckets to run xdcr within. Returns a named collection or a default collection based on the global test configuration.
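
[Editor's note, not part of the patch series: the test changes above assert the new sentinel errors with require.ErrorIs, which checks errors.Is rather than plain equality, so the assertions keep holding even if a sentinel is wrapped later. A small self-contained sketch; stop below is a hypothetical stand-in, not the xdcr Manager API:]

    package main

    import (
        "errors"
        "fmt"
    )

    // Mirrors the sentinel declared in xdcr/replication.go by the patch.
    var ErrReplicationNotRunning = fmt.Errorf("Replication is not running")

    // stop is a hypothetical stand-in for Manager.Stop.
    func stop(running bool) error {
        if !running {
            // Wrapping with %w still satisfies errors.Is against the sentinel.
            return fmt.Errorf("stop failed: %w", ErrReplicationNotRunning)
        }
        return nil
    }

    func main() {
        err := stop(false)
        fmt.Println(errors.Is(err, ErrReplicationNotRunning)) // true: what require.ErrorIs asserts
    }
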
From de21d568f8ac3bc8c894bc3cf0f87cc046ff6ae8 Mon Sep 17 00:00:00 2001
From: Tor Colvin
Date: Fri, 6 Dec 2024 14:53:27 -0500
Subject: [PATCH 2/5] lock setting up collections in the case that dcp feed is
 running when stopping and starting very quickly

---
 xdcr/rosmar_xdcr.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/xdcr/rosmar_xdcr.go b/xdcr/rosmar_xdcr.go
index addc35b9d7..1cbb46d58c 100644
--- a/xdcr/rosmar_xdcr.go
+++ b/xdcr/rosmar_xdcr.go
@@ -14,6 +14,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"sync/atomic"
 
 	"golang.org/x/exp/maps"
@@ -47,6 +48,7 @@ func (r replicatedDocLocation) String() string {
 type rosmarManager struct {
 	filterFunc          xdcrFilterFunc
 	terminator          chan bool
+	collectionsLock     sync.RWMutex
 	fromBucketKeyspaces map[uint32]string
 	toBucketCollections map[uint32]*rosmar.Collection
 	fromBucket          *rosmar.Bucket
@@ -84,6 +86,8 @@
 func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEvent) bool {
 	docID := string(event.Key)
 	base.TracefCtx(ctx, base.KeyVV, "Got event %s, opcode: %s", docID, event.Opcode)
+	r.collectionsLock.RLock()
+	defer r.collectionsLock.RUnlock()
 	col, ok := r.toBucketCollections[event.CollectionID]
 	if !ok {
 		base.ErrorfCtx(ctx, "This violates the assumption that all collections are mapped to a target collection. This should not happen. Found event=%+v", event)
 		return true
 	}
@@ -211,6 +215,8 @@ func (r *rosmarManager) Start(ctx context.Context) error {
 	if r.terminator != nil {
 		return ErrReplicationAlreadyRunning
 	}
+	r.collectionsLock.Lock()
+	defer r.collectionsLock.Unlock()
 	r.terminator = make(chan bool)
 	// set up replication to target all existing collections, and map to other collections
 	scopes := make(map[string][]string)

From 9bf780e9f880bbde1f2f5779b8fd1f1bab4083f7 Mon Sep 17 00:00:00 2001
From: Tor Colvin
Date: Fri, 6 Dec 2024 15:43:04 -0500
Subject: [PATCH 3/5] switch to require and CollectT

---
 xdcr/xdcr_test.go | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/xdcr/xdcr_test.go b/xdcr/xdcr_test.go
index 7ed4c2176e..1b7930916e 100644
--- a/xdcr/xdcr_test.go
+++ b/xdcr/xdcr_test.go
@@ -120,7 +120,9 @@ func TestMobileXDCRNoSyncDataCopied(t *testing.T) {
 	// stats are not updated in real time, so we need to wait a bit
 	require.EventuallyWithT(t, func(c *assert.CollectT) {
 		stats, err := xdcr.Stats(ctx)
-		assert.NoError(t, err)
+		if !assert.NoError(c, err) {
+			return
+		}
 		assert.Equal(c, totalDocsFiltered+1, stats.MobileDocsFiltered)
 		assert.Equal(c, totalDocsWritten+2, stats.DocsWritten)
 	}, time.Second*5, time.Millisecond*100)
@@ -346,7 +348,7 @@ func TestVVObeyMou(t *testing.T) {
 	require.Equal(t, expectedVV, vv)
 
 	stats, err := xdcr.Stats(ctx)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	require.Equal(t, Stats{
 		DocsWritten:   1,
 		DocsProcessed: 1,
@@ -372,7 +374,7 @@ func TestVVObeyMou(t *testing.T) {
 	requireWaitForXDCRDocsProcessed(t, xdcr, 2)
 
 	stats, err = xdcr.Stats(ctx)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	require.Equal(t, Stats{
 		TargetNewerDocs: 1,
 		DocsWritten:     1,
@@ -431,7 +433,7 @@ func TestVVMouImport(t *testing.T) {
 	require.Equal(t, expectedVV, vv)
 
 	stats, err := xdcr.Stats(ctx)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	require.Equal(t, Stats{
 		DocsWritten:   1,
 		DocsProcessed: 1,
@@ -457,7 +459,7 @@ func TestVVMouImport(t *testing.T) {
 	requireWaitForXDCRDocsProcessed(t, xdcr, 2)
 
 	stats, err = xdcr.Stats(ctx)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	require.Equal(t, Stats{
 		TargetNewerDocs: 1,
 		DocsWritten:     1,
@@ -475,7 +477,7 @@ func TestVVMouImport(t *testing.T) {
 	requireWaitForXDCRDocsProcessed(t, xdcr, 3)
 
 	stats, err = xdcr.Stats(ctx)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	require.Equal(t, Stats{
 		TargetNewerDocs: 1,
 		DocsWritten:     2,
@@ -730,7 +732,9 @@ func requireWaitForXDCRDocsProcessed(t *testing.T, xdcr Manager, expectedDocsPro
 	ctx := base.TestCtx(t)
 	require.EventuallyWithT(t, func(c *assert.CollectT) {
 		stats, err := xdcr.Stats(ctx)
-		assert.NoError(t, err)
+		if !assert.NoError(c, err) {
+			return
+		}
 		assert.Equal(c, expectedDocsProcessed, stats.DocsProcessed)
 	}, time.Second*5, time.Millisecond*100)
 }

From 8b9b8a2046fbecb2bc5912106a3037bd918403d9 Mon Sep 17 00:00:00 2001
From: Tor Colvin
Date: Fri, 6 Dec 2024 15:51:54 -0500
Subject: [PATCH 4/5] improve debug message

---
 xdcr/xdcr_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/xdcr/xdcr_test.go b/xdcr/xdcr_test.go
index 1b7930916e..26da2cce71 100644
--- a/xdcr/xdcr_test.go
+++ b/xdcr/xdcr_test.go
@@ -735,7 +735,7 @@ func requireWaitForXDCRDocsProcessed(t *testing.T, xdcr Manager, expectedDocsPro
 		if !assert.NoError(c, err) {
 			return
 		}
-		assert.Equal(c, expectedDocsProcessed, stats.DocsProcessed)
+		assert.Equal(c, expectedDocsProcessed, stats.DocsProcessed, "all stats=%+v", stats)
 	}, time.Second*5, time.Millisecond*100)
 }
 
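
[Editor's note, not part of the patch series: patches 3 and 4 tighten the polling assertions. Inside require.EventuallyWithT, failures must be recorded on the per-attempt *assert.CollectT rather than on t, and the early return prevents dereferencing stats when Stats returned an error; the extra "all stats=%+v" argument makes a timeout failure print the full struct. A runnable sketch of the same shape; fetchStats is a hypothetical stand-in for xdcr.Stats:]

    package sketch

    import (
        "testing"
        "time"

        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
    )

    type Stats struct{ DocsProcessed int64 }

    // fetchStats is a hypothetical stand-in for xdcr.Stats.
    func fetchStats() (*Stats, error) { return &Stats{DocsProcessed: 2}, nil }

    func TestEventuallySketch(t *testing.T) {
        require.EventuallyWithT(t, func(c *assert.CollectT) {
            stats, err := fetchStats()
            // Record failures on c, the per-attempt collector; failing t
            // directly would end the test instead of letting it retry.
            if !assert.NoError(c, err) {
                return // avoid dereferencing stats when err != nil
            }
            assert.Equal(c, int64(2), stats.DocsProcessed, "all stats=%+v", stats)
        }, time.Second*5, time.Millisecond*100)
    }
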
From cd31bd135c20729885a9a218f317a1279fa66bbb Mon Sep 17 00:00:00 2001
From: Tor Colvin
Date: Mon, 9 Dec 2024 12:44:25 -0500
Subject: [PATCH 5/5] skip test with cbs

---
 xdcr/xdcr_test.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/xdcr/xdcr_test.go b/xdcr/xdcr_test.go
index f198116a8c..b47d4ab7cf 100644
--- a/xdcr/xdcr_test.go
+++ b/xdcr/xdcr_test.go
@@ -633,6 +633,9 @@ func TestReplicateXattrs(t *testing.T) {
 // TestVVMultiActor verifies that updates by multiple actors (updates to different clusters/buckets) are properly
 // reflected in the HLV (cv and pv).
 func TestVVMultiActor(t *testing.T) {
+	if !base.UnitTestUrlIsWalrus() {
+		t.Skip("This test can fail with CBS due to CBS-4334 since a document without xattrs will be written to the target bucket, even if it is otherwise up to date")
+	}
 	fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
 	ctx := base.TestCtx(t)
 	fromBucketSourceID, err := GetSourceID(ctx, fromBucket)