CBG-4265 avoid panic in rosmar xdcr tests (#7231)
* CBG-4265 avoid panic in rosmar xdcr tests

- return an error if xdcr is already running when Start is called, or
  already stopped when Stop is called
- allow rosmar xdcr to be restarted via Start/Stop/Start by resetting the
  terminator channel (see the sketch below)
- don't return an empty topology from Topologies, which caused a test panic
- enable rosmar multi actor conflict tests
- remove a test that duplicates an existing test

* lock collection setup in case the DCP feed is still running when the replication is stopped and restarted in quick succession

* switch to require and CollectT

* improve debug message

* skip test when running against Couchbase Server
torcolvin authored Dec 9, 2024
1 parent 2e2afce commit 113a4ef
Showing 6 changed files with 72 additions and 97 deletions.
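The first three bullets of the commit message amount to a small lifecycle guard around the replication's terminator channel: Start and Stop each check the channel before acting, and Start recreates it so the manager can be restarted. The following is a minimal sketch of that pattern under hypothetical names — it is not code from this commit; the real implementations are in xdcr/cbs_xdcr.go and xdcr/rosmar_xdcr.go below.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errAlreadyRunning = errors.New("replication is already running")
	errNotRunning     = errors.New("replication is not running")
)

// replicator is a hypothetical stand-in for the XDCR managers in this commit.
type replicator struct {
	terminator chan bool // nil when stopped, non-nil while running
}

func (r *replicator) Start() error {
	if r.terminator != nil {
		return errAlreadyRunning
	}
	r.terminator = make(chan bool) // recreate on every Start so a restart works
	// ... start feed goroutines that select on r.terminator ...
	return nil
}

func (r *replicator) Stop() error {
	if r.terminator == nil {
		return errNotRunning
	}
	close(r.terminator) // signal feed goroutines to exit
	r.terminator = nil  // allow a later Start to run again
	return nil
}

func main() {
	r := &replicator{}
	fmt.Println(r.Start()) // <nil>
	fmt.Println(r.Start()) // replication is already running
	fmt.Println(r.Stop())  // <nil>
	fmt.Println(r.Stop())  // replication is not running
	fmt.Println(r.Start()) // <nil> — Start/Stop/Start now succeeds instead of panicking
}
```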
66 changes: 4 additions & 62 deletions topologytest/hlv_test.go
@@ -155,17 +155,10 @@ func TestHLVCreateDocumentMultiActor(t *testing.T) {
// - Wait for docs last write to be replicated to all other peers
func TestHLVCreateDocumentMultiActorConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
if base.UnitTestUrlIsWalrus() {
t.Skip("Panics against rosmar, CBG-4378")
} else {
if !base.UnitTestUrlIsWalrus() {
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
}
for _, tc := range getMultiActorTestCases() {
if strings.Contains(tc.description(), "CBL") {
// Test case flakes given the WaitForDocVersion function only waits for a docID on the cbl peer. We need to be
// able to wait for a specific version to arrive over pull replication
t.Skip("We need to be able to wait for a specific version to arrive over pull replication, CBG-4257")
}
t.Run(tc.description(), func(t *testing.T) {
peers, replications := setupTests(t, tc.topology)

@@ -261,9 +254,7 @@ func TestHLVUpdateDocumentSingleActor(t *testing.T) {
// - Start replications and wait for last update to be replicated to all peers
func TestHLVUpdateDocumentMultiActorConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
if base.UnitTestUrlIsWalrus() {
t.Skip("Panics against rosmar, CBG-4378")
} else {
if !base.UnitTestUrlIsWalrus() {
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
}
for _, tc := range getMultiActorTestCases() {
@@ -365,9 +356,7 @@ func TestHLVDeleteDocumentMultiActor(t *testing.T) {
// - Start replications and assert doc is deleted on all peers
func TestHLVDeleteDocumentMultiActorConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
if base.UnitTestUrlIsWalrus() {
t.Skip("Panics against rosmar, CBG-4378")
} else {
if !base.UnitTestUrlIsWalrus() {
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
}
for _, tc := range getMultiActorTestCases() {
@@ -395,51 +384,6 @@ func TestHLVDeleteDocumentMultiActorConflict(t *testing.T) {
}
}

// TestHLVUpdateDeleteDocumentMultiActorConflict:
// - Create conflicting docs on each peer
// - Start replications
// - Wait for last write to be replicated to all peers
// - Stop replications
// - Update docs on all peers, then delete the doc on one peer
// - Start replications and assert doc is deleted on all peers (given the delete was the last write)
func TestHLVUpdateDeleteDocumentMultiActorConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
if base.UnitTestUrlIsWalrus() {
t.Skip("Panics against rosmar, CBG-4378")
} else {
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
}
for _, tc := range getMultiActorTestCases() {
if strings.Contains(tc.description(), "CBL") {
// Test case flakes given the WaitForDocVersion function only waits for a docID on the cbl peer. We need to be
// able to wait for a specific version to arrive over pull replication
t.Skip("We need to be able to wait for a specific version to arrive over pull replication + unexpected body in proposeChanges: [304] issue, CBG-4257")
}
t.Run(tc.description(), func(t *testing.T) {
peerList := tc.PeerNames()
peers, replications := setupTests(t, tc.topology)
stopPeerReplications(replications)

docID := getDocID(t)
docVersion := createConflictingDocs(t, tc, peers, docID)

startPeerReplications(replications)
waitForVersionAndBody(t, tc, peers, docID, docVersion)

stopPeerReplications(replications)

_ = updateConflictingDocs(t, tc, peers, docID)

lastPeer := peerList[len(peerList)-1]
deleteVersion := peers[lastPeer].DeleteDocument(tc.collectionName(), docID)
t.Logf("deleteVersion: %+v", deleteVersion)

startPeerReplications(replications)
waitForDeletion(t, tc, peers, docID, lastPeer)
})
}
}

// TestHLVDeleteUpdateDocumentMultiActorConflict:
// - Create conflicting docs on each peer
// - Start replications
@@ -579,9 +523,7 @@ func TestHLVResurrectDocumentMultiActor(t *testing.T) {
// - Start replications and wait for last resurrection operation to be replicated to all peers
func TestHLVResurrectDocumentMultiActorConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
if base.UnitTestUrlIsWalrus() {
t.Skip("Panics against rosmar, CBG-4378")
} else {
if !base.UnitTestUrlIsWalrus() {
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
}
for _, tc := range getMultiActorTestCases() {
44 changes: 22 additions & 22 deletions topologytest/topologies_test.go
@@ -208,28 +208,28 @@ var Topologies = []Topology{
},
},
// topology 1.4 not present, no P2P supported yet
{
/*
Test topology 1.5
/*
{
Test topology 1.5
+ - - - - - - + +- - - - - - -+
' cluster A ' ' cluster B '
' +---------+ ' ' +---------+ '
' | cbs1 | ' <--> ' | cbs2 | '
' +---------+ ' ' +---------+ '
' +---------+ ' ' +---------+ '
' | sg1 | ' ' | sg2 | '
' +---------+ ' ' +---------+ '
+ - - - - - - + +- - - - - - -+
^ ^
| |
| |
| |
| +------+ |
+---> | cbl1 | <---+
+------+
*/
/* This test doesn't work yet, CouchbaseLiteMockPeer doesn't support writing data to multiple Sync Gateway peers yet
+ - - - - - - + +- - - - - - -+
' cluster A ' ' cluster B '
' +---------+ ' ' +---------+ '
' | cbs1 | ' <--> ' | cbs2 | '
' +---------+ ' ' +---------+ '
' +---------+ ' ' +---------+ '
' | sg1 | ' ' | sg2 | '
' +---------+ ' ' +---------+ '
+ - - - - - - + +- - - - - - -+
^ ^
| |
| |
| |
| +------+ |
+---> | cbl1 | <---+
+------+
*/
/* This test doesn't work yet, CouchbaseLiteMockPeer doesn't support writing data to multiple Sync Gateway peers yet
description: "Sync Gateway -> Couchbase Server -> Couchbase Server",
peers: map[string]PeerOptions{
"cbs1": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID1},
@@ -283,8 +283,8 @@ var Topologies = []Topology{
},
},
},
*/
},
*/
}

// simpleTopologies represents simplified topologies to make testing the integration test code easier.
5 changes: 4 additions & 1 deletion xdcr/cbs_xdcr.go
@@ -116,6 +116,9 @@ func newCouchbaseServerManager(ctx context.Context, fromBucket *base.GocbV2Bucke

// Start starts the XDCR replication.
func (x *couchbaseServerManager) Start(ctx context.Context) error {
if x.replicationID != "" {
return ErrReplicationAlreadyRunning
}
method := http.MethodPost
body := url.Values{}
body.Add("name", fmt.Sprintf("%s_%s", x.fromBucket.GetName(), x.toBucket.GetName()))
@@ -156,7 +159,7 @@ func (x *couchbaseServerManager) Start(ctx context.Context) error {
func (x *couchbaseServerManager) Stop(ctx context.Context) error {
// replication is not started
if x.replicationID == "" {
return nil
return ErrReplicationNotRunning
}
method := http.MethodDelete
url := "/controller/cancelXDCR/" + url.PathEscape(x.replicationID)
3 changes: 3 additions & 0 deletions xdcr/replication.go
@@ -18,6 +18,9 @@ import (
"github.com/couchbaselabs/rosmar"
)

var ErrReplicationNotRunning = fmt.Errorf("Replication is not running")
var ErrReplicationAlreadyRunning = fmt.Errorf("Replication is already running")

// Manager represents a bucket to bucket replication.
type Manager interface {
// Start starts the replication.
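Because these sentinels are exported package-level values, callers can distinguish "already stopped" from a genuine failure with errors.Is rather than string matching — the tests below do exactly that via require.ErrorIs. A minimal caller-side sketch; the import path and the assumption that Manager exposes Stop(ctx), as both concrete managers in this commit do, are inferences from the diff rather than shown in it.

```go
package example

import (
	"context"
	"errors"

	"github.com/couchbase/sync_gateway/xdcr"
)

// stopIfRunning treats a double Stop as a no-op while surfacing real failures.
func stopIfRunning(ctx context.Context, mgr xdcr.Manager) error {
	err := mgr.Stop(ctx)
	if errors.Is(err, xdcr.ErrReplicationNotRunning) {
		return nil // already stopped; nothing to do
	}
	return err
}
```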
14 changes: 13 additions & 1 deletion xdcr/rosmar_xdcr.go
@@ -14,6 +14,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"

"golang.org/x/exp/maps"
@@ -47,6 +48,7 @@ func (r replicatedDocLocation) String() string {
type rosmarManager struct {
filterFunc xdcrFilterFunc
terminator chan bool
collectionsLock sync.RWMutex
fromBucketKeyspaces map[uint32]string
toBucketCollections map[uint32]*rosmar.Collection
fromBucket *rosmar.Bucket
@@ -75,7 +77,6 @@ func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket,
replicationID: fmt.Sprintf("%s-%s", fromBucket.GetName(), toBucket.GetName()),
toBucketCollections: make(map[uint32]*rosmar.Collection),
fromBucketKeyspaces: make(map[uint32]string),
terminator: make(chan bool),
filterFunc: mobileXDCRFilter,
}, nil

@@ -85,6 +86,8 @@
func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEvent) bool {
docID := string(event.Key)
base.TracefCtx(ctx, base.KeyVV, "Got event %s, opcode: %s", docID, event.Opcode)
r.collectionsLock.RLock()
defer r.collectionsLock.RUnlock()
col, ok := r.toBucketCollections[event.CollectionID]
if !ok {
base.ErrorfCtx(ctx, "This violates the assumption that all collections are mapped to a target collection. This should not happen. Found event=%+v", event)
@@ -209,6 +212,12 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve

// Start starts the replication for all existing replications. Errors if there aren't corresponding named collections on each bucket.
func (r *rosmarManager) Start(ctx context.Context) error {
if r.terminator != nil {
return ErrReplicationAlreadyRunning
}
r.collectionsLock.Lock()
defer r.collectionsLock.Unlock()
r.terminator = make(chan bool)
// set up replication to target all existing collections, and map to other collections
scopes := make(map[string][]string)
fromDataStores, err := r.fromBucket.ListDataStores()
@@ -259,6 +268,9 @@ func (r *rosmarManager) Start(ctx context.Context) error {

// Stop terminates the replication.
func (r *rosmarManager) Stop(_ context.Context) error {
if r.terminator == nil {
return ErrReplicationNotRunning
}
close(r.terminator)
r.terminator = nil
return nil
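Taken together, the rosmar changes pair a read lock in the DCP callback with a write lock in Start, so a feed that is still delivering events during a fast Stop/Start cycle never observes a half-rebuilt collection map. A stripped-down sketch of that locking pattern follows, with simplified types rather than the actual rosmarManager fields.

```go
package example

import "sync"

// manager is a simplified stand-in for rosmarManager: a map shared between
// the feed callback and Start, guarded by an RWMutex.
type manager struct {
	collectionsLock sync.RWMutex
	toCollections   map[uint32]string // collection ID -> target keyspace (simplified)
}

// processEvent runs on the feed goroutine for every mutation; it only needs
// read access to the mapping.
func (m *manager) processEvent(collectionID uint32) (string, bool) {
	m.collectionsLock.RLock()
	defer m.collectionsLock.RUnlock()
	target, ok := m.toCollections[collectionID]
	return target, ok
}

// start rebuilds the mapping under the write lock before the feed is
// (re)started, so no callback can observe a partially populated map.
func (m *manager) start(mapping map[uint32]string) {
	m.collectionsLock.Lock()
	defer m.collectionsLock.Unlock()
	m.toCollections = mapping
}
```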
37 changes: 26 additions & 11 deletions xdcr/xdcr_test.go
@@ -45,11 +45,16 @@ func TestMobileXDCRNoSyncDataCopied(t *testing.T) {
}
xdcr, err := NewXDCR(ctx, fromBucket, toBucket, opts)
require.NoError(t, err)
err = xdcr.Start(ctx)
require.NoError(t, err)
require.NoError(t, xdcr.Start(ctx))

defer func() {
assert.NoError(t, xdcr.Stop(ctx))
// stop XDCR, will already be stopped if test doesn't fail early
err := xdcr.Stop(ctx)
if err != nil {
assert.Equal(t, ErrReplicationNotRunning, err)
}
}()
require.ErrorIs(t, xdcr.Start(ctx), ErrReplicationAlreadyRunning)
const (
syncDoc = "_sync:doc1doc2"
attachmentDoc = "_sync:att2:foo"
@@ -115,11 +120,16 @@ func TestMobileXDCRNoSyncDataCopied(t *testing.T) {
// stats are not updated in real time, so we need to wait a bit
require.EventuallyWithT(t, func(c *assert.CollectT) {
stats, err := xdcr.Stats(ctx)
assert.NoError(t, err)
if !assert.NoError(c, err) {
assert.NoError(c, err)
}
assert.Equal(c, totalDocsFiltered+1, stats.MobileDocsFiltered)
assert.Equal(c, totalDocsWritten+2, stats.DocsWritten)

}, time.Second*5, time.Millisecond*100)

require.NoError(t, xdcr.Stop(ctx))
require.ErrorIs(t, xdcr.Stop(ctx), ErrReplicationNotRunning)
}

// getTwoBucketDataStores creates two data stores in separate buckets to run xdcr within. Returns a named collection or a default collection based on the global test configuration.
@@ -338,7 +348,7 @@ func TestVVObeyMou(t *testing.T) {
require.Equal(t, expectedVV, vv)

stats, err := xdcr.Stats(ctx)
assert.NoError(t, err)
require.NoError(t, err)
require.Equal(t, Stats{
DocsWritten: 1,
DocsProcessed: 1,
Expand All @@ -364,7 +374,7 @@ func TestVVObeyMou(t *testing.T) {

requireWaitForXDCRDocsProcessed(t, xdcr, 2)
stats, err = xdcr.Stats(ctx)
assert.NoError(t, err)
require.NoError(t, err)
require.Equal(t, Stats{
TargetNewerDocs: 1,
DocsWritten: 1,
@@ -423,7 +433,7 @@ func TestVVMouImport(t *testing.T) {
require.Equal(t, expectedVV, vv)

stats, err := xdcr.Stats(ctx)
assert.NoError(t, err)
require.NoError(t, err)
require.Equal(t, Stats{
DocsWritten: 1,
DocsProcessed: 1,
@@ -449,7 +459,7 @@

requireWaitForXDCRDocsProcessed(t, xdcr, 2)
stats, err = xdcr.Stats(ctx)
assert.NoError(t, err)
require.NoError(t, err)
require.Equal(t, Stats{
TargetNewerDocs: 1,
DocsWritten: 1,
@@ -467,7 +477,7 @@
requireWaitForXDCRDocsProcessed(t, xdcr, 3)

stats, err = xdcr.Stats(ctx)
assert.NoError(t, err)
require.NoError(t, err)
require.Equal(t, Stats{
TargetNewerDocs: 1,
DocsWritten: 2,
@@ -623,6 +633,9 @@ func TestReplicateXattrs(t *testing.T) {
// TestVVMultiActor verifies that updates by multiple actors (updates to different clusters/buckets) are properly
// reflected in the HLV (cv and pv).
func TestVVMultiActor(t *testing.T) {
if !base.UnitTestUrlIsWalrus() {
t.Skip("This test can fail with CBS due to CBS-4334 since a document without xattrs will be written to the target bucket, even if it is otherwise up to date")
}
fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
ctx := base.TestCtx(t)
fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
@@ -722,8 +735,10 @@ func requireWaitForXDCRDocsProcessed(t *testing.T, xdcr Manager, expectedDocsPro
ctx := base.TestCtx(t)
require.EventuallyWithT(t, func(c *assert.CollectT) {
stats, err := xdcr.Stats(ctx)
assert.NoError(t, err)
assert.Equal(c, expectedDocsProcessed, stats.DocsProcessed)
if !assert.NoError(c, err) {
return
}
assert.Equal(c, expectedDocsProcessed, stats.DocsProcessed, "all stats=%+v", stats)
}, time.Second*5, time.Millisecond*100)
}

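The test changes also standardize on the require/CollectT idiom: inside require.EventuallyWithT, assertions must target the *assert.CollectT argument (c) rather than the outer *testing.T, otherwise a transient failure aborts the test instead of being retried, and returning early after a failed NoError keeps the polling closure from inspecting stats produced by an errored call. A self-contained sketch of that idiom, with a hypothetical fetchStats helper standing in for Manager.Stats:

```go
package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// stats and fetchStats are hypothetical stand-ins for xdcr.Stats / Manager.Stats.
type stats struct{ DocsProcessed int64 }

func fetchStats() (stats, error) { return stats{DocsProcessed: 2}, nil }

// waitForDocsProcessed polls until the expected count is reached or the
// timeout elapses; all assertions go through c so each attempt can fail
// softly and be retried.
func waitForDocsProcessed(t *testing.T, expected int64) {
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		s, err := fetchStats()
		if !assert.NoError(c, err) {
			return // don't read stats from a failed call; let the next tick retry
		}
		assert.Equal(c, expected, s.DocsProcessed)
	}, 5*time.Second, 100*time.Millisecond)
}
```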
