CBG-4265 avoid panic in rosmar xdcr tests #7231

Merged
merged 6 commits into from Dec 9, 2024
Changes from 5 commits
66 changes: 4 additions & 62 deletions topologytest/hlv_test.go
@@ -155,17 +155,10 @@ func TestHLVCreateDocumentMultiActor(t *testing.T) {
// - Wait for docs last write to be replicated to all other peers
func TestHLVCreateDocumentMultiActorConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
if base.UnitTestUrlIsWalrus() {
t.Skip("Panics against rosmar, CBG-4378")
} else {
if !base.UnitTestUrlIsWalrus() {
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
}
for _, tc := range getMultiActorTestCases() {
if strings.Contains(tc.description(), "CBL") {
// Test case flakes given the WaitForDocVersion function only waits for a docID on the cbl peer. We need to be
// able to wait for a specific version to arrive over pull replication
t.Skip("We need to be able to wait for a specific version to arrive over pull replication, CBG-4257")
}
t.Run(tc.description(), func(t *testing.T) {
peers, replications := setupTests(t, tc.topology)

@@ -261,9 +254,7 @@ func TestHLVUpdateDocumentSingleActor(t *testing.T) {
// - Start replications and wait for last update to be replicated to all peers
func TestHLVUpdateDocumentMultiActorConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
if base.UnitTestUrlIsWalrus() {
t.Skip("Panics against rosmar, CBG-4378")
} else {
if !base.UnitTestUrlIsWalrus() {
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
}
for _, tc := range getMultiActorTestCases() {
@@ -365,9 +356,7 @@ func TestHLVDeleteDocumentMultiActor(t *testing.T) {
// - Start replications and assert doc is deleted on all peers
func TestHLVDeleteDocumentMultiActorConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
if base.UnitTestUrlIsWalrus() {
t.Skip("Panics against rosmar, CBG-4378")
} else {
if !base.UnitTestUrlIsWalrus() {
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
}
for _, tc := range getMultiActorTestCases() {
@@ -395,51 +384,6 @@ func TestHLVDeleteDocumentMultiActorConflict(t *testing.T) {
}
}

// TestHLVUpdateDeleteDocumentMultiActorConflict:
// - Create conflicting docs on each peer
// - Start replications
// - Wait for last write to be replicated to all peers
// - Stop replications
// - Update docs on all peers, then delete the doc on one peer
// - Start replications and assert doc is deleted on all peers (given the delete was the last write)
func TestHLVUpdateDeleteDocumentMultiActorConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
if base.UnitTestUrlIsWalrus() {
t.Skip("Panics against rosmar, CBG-4378")
} else {
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
}
for _, tc := range getMultiActorTestCases() {
if strings.Contains(tc.description(), "CBL") {
// Test case flakes given the WaitForDocVersion function only waits for a docID on the cbl peer. We need to be
// able to wait for a specific version to arrive over pull replication
t.Skip("We need to be able to wait for a specific version to arrive over pull replication + unexpected body in proposeChanges: [304] issue, CBG-4257")
}
t.Run(tc.description(), func(t *testing.T) {
peerList := tc.PeerNames()
peers, replications := setupTests(t, tc.topology)
stopPeerReplications(replications)

docID := getDocID(t)
docVersion := createConflictingDocs(t, tc, peers, docID)

startPeerReplications(replications)
waitForVersionAndBody(t, tc, peers, docID, docVersion)

stopPeerReplications(replications)

_ = updateConflictingDocs(t, tc, peers, docID)

lastPeer := peerList[len(peerList)-1]
deleteVersion := peers[lastPeer].DeleteDocument(tc.collectionName(), docID)
t.Logf("deleteVersion: %+v", deleteVersion)

startPeerReplications(replications)
waitForDeletion(t, tc, peers, docID, lastPeer)
})
}
}

// TestHLVDeleteUpdateDocumentMultiActorConflict:
// - Create conflicting docs on each peer
// - Start replications
@@ -579,9 +523,7 @@ func TestHLVResurrectDocumentMultiActor(t *testing.T) {
// - Start replications and wait for last resurrection operation to be replicated to all peers
func TestHLVResurrectDocumentMultiActorConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyVV)
if base.UnitTestUrlIsWalrus() {
t.Skip("Panics against rosmar, CBG-4378")
} else {
if !base.UnitTestUrlIsWalrus() {
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
}
for _, tc := range getMultiActorTestCases() {
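Note on topologytest/hlv_test.go: the skip guard is inverted. These multi-actor conflict tests previously skipped everywhere (panic against rosmar, CBG-4378; flaky failures against Couchbase Server, CBG-4379); with the panic fixed they now run against rosmar/walrus and skip only against Couchbase Server. A minimal sketch of the shared guard — `skipIfNotWalrus` is a hypothetical helper, not part of this PR, and the import path is assumed:

```go
package topologytest

import (
	"testing"

	"github.com/couchbase/sync_gateway/base" // import path assumed
)

// skipIfNotWalrus shows the guard each multi-actor conflict test now uses:
// run against rosmar/walrus, skip against Couchbase Server until CBG-4379 is fixed.
func skipIfNotWalrus(t *testing.T) {
	t.Helper()
	if !base.UnitTestUrlIsWalrus() {
		t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
	}
}
```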
44 changes: 22 additions & 22 deletions topologytest/topologies_test.go
@@ -208,28 +208,28 @@ var Topologies = []Topology{
},
},
// topology 1.4 not present, no P2P supported yet
{
/*
Test topology 1.5
/*
{
Test topology 1.5

+ - - - - - - + +- - - - - - -+
' cluster A ' ' cluster B '
' +---------+ ' ' +---------+ '
' | cbs1 | ' <--> ' | cbs2 | '
' +---------+ ' ' +---------+ '
' +---------+ ' ' +---------+ '
' | sg1 | ' ' | sg2 | '
' +---------+ ' ' +---------+ '
+ - - - - - - + +- - - - - - -+
^ ^
| |
| |
| |
| +------+ |
+---> | cbl1 | <---+
+------+
*/
/* This test doesn't work yet, CouchbaseLiteMockPeer doesn't support writing data to multiple Sync Gateway peers yet
+ - - - - - - + +- - - - - - -+
' cluster A ' ' cluster B '
' +---------+ ' ' +---------+ '
' | cbs1 | ' <--> ' | cbs2 | '
' +---------+ ' ' +---------+ '
' +---------+ ' ' +---------+ '
' | sg1 | ' ' | sg2 | '
' +---------+ ' ' +---------+ '
+ - - - - - - + +- - - - - - -+
^ ^
| |
| |
| |
| +------+ |
+---> | cbl1 | <---+
+------+
*/
/* This test doesn't work yet, CouchbaseLiteMockPeer doesn't support writing data to multiple Sync Gateway peers yet
description: "Sync Gateway -> Couchbase Server -> Couchbase Server",
peers: map[string]PeerOptions{
"cbs1": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID1},
@@ -283,8 +283,8 @@ var Topologies = []Topology{
},
},
},
*/
},
*/
}

// simpleTopologies represents simplified topologies to make testing the integration test code easier.
5 changes: 4 additions & 1 deletion xdcr/cbs_xdcr.go
@@ -116,6 +116,9 @@ func newCouchbaseServerManager(ctx context.Context, fromBucket *base.GocbV2Bucke

// Start starts the XDCR replication.
func (x *couchbaseServerManager) Start(ctx context.Context) error {
if x.replicationID != "" {
return ErrReplicationAlreadyRunning
}
method := http.MethodPost
body := url.Values{}
body.Add("name", fmt.Sprintf("%s_%s", x.fromBucket.GetName(), x.toBucket.GetName()))
@@ -156,7 +159,7 @@ func (x *couchbaseServerManager) Start(ctx context.Context) error {
func (x *couchbaseServerManager) Stop(ctx context.Context) error {
// replication is not started
if x.replicationID == "" {
return nil
return ErrReplicationNotRunning
}
method := http.MethodDelete
url := "/controller/cancelXDCR/" + url.PathEscape(x.replicationID)
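Note on xdcr/cbs_xdcr.go: `replicationID` now doubles as the running flag, so a second Start and a Stop without a prior Start return the new sentinel errors instead of silently succeeding or re-creating the replication. A minimal sketch of that contract — not the actual `couchbaseServerManager`, whose REST calls are elided here:

```go
package xdcr

import "context"

// manager is a stripped-down stand-in for couchbaseServerManager; the
// replicationID field doubles as the "running" flag.
type manager struct {
	replicationID string
}

func (m *manager) Start(ctx context.Context) error {
	if m.replicationID != "" {
		return ErrReplicationAlreadyRunning
	}
	// ... issue the create-replication request and record the returned ID ...
	m.replicationID = "example-replication-id" // placeholder value
	return nil
}

func (m *manager) Stop(ctx context.Context) error {
	if m.replicationID == "" {
		return ErrReplicationNotRunning
	}
	// ... cancel the replication via /controller/cancelXDCR/<id> ...
	m.replicationID = ""
	return nil
}
```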
3 changes: 3 additions & 0 deletions xdcr/replication.go
@@ -18,6 +18,9 @@ import (
"github.com/couchbaselabs/rosmar"
)

var ErrReplicationNotRunning = fmt.Errorf("Replication is not running")
var ErrReplicationAlreadyRunning = fmt.Errorf("Replication is already running")

// Manager represents a bucket to bucket replication.
type Manager interface {
// Start starts the replication.
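Note on xdcr/replication.go: exporting the two sentinel errors lets callers make shutdown idempotent without swallowing real failures. A hedged caller-side sketch (not part of this PR; import path assumed):

```go
package example

import (
	"context"
	"errors"
	"fmt"

	"github.com/couchbase/sync_gateway/xdcr" // import path assumed
)

// stopIfRunning stops the replication but treats "not running" as success,
// mirroring how the updated tests tolerate the sentinel in deferred cleanup.
func stopIfRunning(ctx context.Context, mgr xdcr.Manager) error {
	if err := mgr.Stop(ctx); err != nil && !errors.Is(err, xdcr.ErrReplicationNotRunning) {
		return fmt.Errorf("stopping XDCR replication: %w", err)
	}
	return nil
}
```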
14 changes: 13 additions & 1 deletion xdcr/rosmar_xdcr.go
@@ -14,6 +14,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"

"golang.org/x/exp/maps"
@@ -47,6 +48,7 @@ func (r replicatedDocLocation) String() string {
type rosmarManager struct {
filterFunc xdcrFilterFunc
terminator chan bool
collectionsLock sync.RWMutex
fromBucketKeyspaces map[uint32]string
toBucketCollections map[uint32]*rosmar.Collection
fromBucket *rosmar.Bucket
@@ -75,7 +77,6 @@ func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket,
replicationID: fmt.Sprintf("%s-%s", fromBucket.GetName(), toBucket.GetName()),
toBucketCollections: make(map[uint32]*rosmar.Collection),
fromBucketKeyspaces: make(map[uint32]string),
terminator: make(chan bool),
filterFunc: mobileXDCRFilter,
}, nil

@@ -85,6 +86,8 @@
func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEvent) bool {
docID := string(event.Key)
base.TracefCtx(ctx, base.KeyVV, "Got event %s, opcode: %s", docID, event.Opcode)
r.collectionsLock.RLock()
defer r.collectionsLock.RUnlock()
col, ok := r.toBucketCollections[event.CollectionID]
if !ok {
base.ErrorfCtx(ctx, "This violates the assumption that all collections are mapped to a target collection. This should not happen. Found event=%+v", event)
@@ -209,6 +212,12 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve

// Start starts the replication for all existing replications. Errors if there aren't corresponding named collections on each bucket.
func (r *rosmarManager) Start(ctx context.Context) error {
if r.terminator != nil {
return ErrReplicationAlreadyRunning
}
r.collectionsLock.Lock()
defer r.collectionsLock.Unlock()
r.terminator = make(chan bool)
// set up replication to target all existing collections, and map to other collections
scopes := make(map[string][]string)
fromDataStores, err := r.fromBucket.ListDataStores()
@@ -259,6 +268,9 @@ func (r *rosmarManager) Start(ctx context.Context) error {

// Stop terminates the replication.
func (r *rosmarManager) Stop(_ context.Context) error {
if r.terminator == nil {
return ErrReplicationNotRunning
}
close(r.terminator)
r.terminator = nil
return nil
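Note on xdcr/rosmar_xdcr.go: the terminator channel is no longer created in the constructor; Start creates it and rebuilds the collection mapping under `collectionsLock`, while `processEvent` takes the read lock, so feed callbacks cannot observe the map mid-rebuild. A double Stop now returns ErrReplicationNotRunning instead of closing an already-nil channel, which appears to be the panic the PR title refers to. A minimal sketch of the pattern — not the actual `rosmarManager`, which also wires up the DCP feed and collection mapping:

```go
package xdcr

import "sync"

// sketchManager illustrates the locking pattern: the terminator channel is the
// "running" flag, Start rebuilds state under the write lock, and feed
// callbacks only take the read lock.
type sketchManager struct {
	lock        sync.RWMutex
	terminator  chan bool
	collections map[uint32]string // collection ID -> target keyspace (example mapping)
}

func (m *sketchManager) Start() error {
	if m.terminator != nil {
		return ErrReplicationAlreadyRunning
	}
	m.lock.Lock()
	defer m.lock.Unlock()
	m.terminator = make(chan bool)
	m.collections = map[uint32]string{0: "_default._default"}
	return nil
}

// lookup stands in for the read path processEvent uses to resolve a target collection.
func (m *sketchManager) lookup(collectionID uint32) (string, bool) {
	m.lock.RLock()
	defer m.lock.RUnlock()
	ks, ok := m.collections[collectionID]
	return ks, ok
}

func (m *sketchManager) Stop() error {
	if m.terminator == nil {
		return ErrReplicationNotRunning
	}
	close(m.terminator)
	m.terminator = nil
	return nil
}
```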
34 changes: 23 additions & 11 deletions xdcr/xdcr_test.go
@@ -45,11 +45,16 @@ func TestMobileXDCRNoSyncDataCopied(t *testing.T) {
}
xdcr, err := NewXDCR(ctx, fromBucket, toBucket, opts)
require.NoError(t, err)
err = xdcr.Start(ctx)
require.NoError(t, err)
require.NoError(t, xdcr.Start(ctx))

defer func() {
assert.NoError(t, xdcr.Stop(ctx))
// stop XDCR, will already be stopped if test doesn't fail early
err := xdcr.Stop(ctx)
if err != nil {
assert.Equal(t, ErrReplicationNotRunning, err)
}
}()
require.ErrorIs(t, xdcr.Start(ctx), ErrReplicationAlreadyRunning)
const (
syncDoc = "_sync:doc1doc2"
attachmentDoc = "_sync:att2:foo"
@@ -115,11 +120,16 @@ func TestMobileXDCRNoSyncDataCopied(t *testing.T) {
// stats are not updated in real time, so we need to wait a bit
require.EventuallyWithT(t, func(c *assert.CollectT) {
stats, err := xdcr.Stats(ctx)
assert.NoError(t, err)
if !assert.NoError(c, err) {
return
}
assert.Equal(c, totalDocsFiltered+1, stats.MobileDocsFiltered)
assert.Equal(c, totalDocsWritten+2, stats.DocsWritten)

}, time.Second*5, time.Millisecond*100)

require.NoError(t, xdcr.Stop(ctx))
require.ErrorIs(t, xdcr.Stop(ctx), ErrReplicationNotRunning)
}

// getTwoBucketDataStores creates two data stores in separate buckets to run xdcr within. Returns a named collection or a default collection based on the global test configuration.
@@ -338,7 +348,7 @@ func TestVVObeyMou(t *testing.T) {
require.Equal(t, expectedVV, vv)

stats, err := xdcr.Stats(ctx)
assert.NoError(t, err)
require.NoError(t, err)
require.Equal(t, Stats{
DocsWritten: 1,
DocsProcessed: 1,
Expand All @@ -364,7 +374,7 @@ func TestVVObeyMou(t *testing.T) {

requireWaitForXDCRDocsProcessed(t, xdcr, 2)
stats, err = xdcr.Stats(ctx)
assert.NoError(t, err)
require.NoError(t, err)
require.Equal(t, Stats{
TargetNewerDocs: 1,
DocsWritten: 1,
@@ -423,7 +433,7 @@ func TestVVMouImport(t *testing.T) {
require.Equal(t, expectedVV, vv)

stats, err := xdcr.Stats(ctx)
assert.NoError(t, err)
require.NoError(t, err)
require.Equal(t, Stats{
DocsWritten: 1,
DocsProcessed: 1,
@@ -449,7 +459,7 @@ func TestVVMouImport(t *testing.T) {

requireWaitForXDCRDocsProcessed(t, xdcr, 2)
stats, err = xdcr.Stats(ctx)
assert.NoError(t, err)
require.NoError(t, err)
require.Equal(t, Stats{
TargetNewerDocs: 1,
DocsWritten: 1,
@@ -467,7 +477,7 @@ func TestVVMouImport(t *testing.T) {
requireWaitForXDCRDocsProcessed(t, xdcr, 3)

stats, err = xdcr.Stats(ctx)
assert.NoError(t, err)
require.NoError(t, err)
require.Equal(t, Stats{
TargetNewerDocs: 1,
DocsWritten: 2,
@@ -722,8 +732,10 @@ func requireWaitForXDCRDocsProcessed(t *testing.T, xdcr Manager, expectedDocsPro
ctx := base.TestCtx(t)
require.EventuallyWithT(t, func(c *assert.CollectT) {
stats, err := xdcr.Stats(ctx)
assert.NoError(t, err)
assert.Equal(c, expectedDocsProcessed, stats.DocsProcessed)
if !assert.NoError(c, err) {
return
}
assert.Equal(c, expectedDocsProcessed, stats.DocsProcessed, "all stats=%+v", stats)
}, time.Second*5, time.Millisecond*100)
}

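Note on xdcr/xdcr_test.go: taken together, the test changes pin down the lifecycle contract — a second Start fails, a second Stop fails, and the deferred cleanup tolerates the replication already having been stopped on the happy path. A condensed sketch of the assertions spread through TestMobileXDCRNoSyncDataCopied (setup and document checks elided; `assertLifecycle` is a hypothetical helper):

```go
package xdcr

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func assertLifecycle(t *testing.T, ctx context.Context, xdcr Manager) {
	require.NoError(t, xdcr.Start(ctx))
	defer func() {
		// Cleanup: the replication is already stopped if the test reached the
		// final Stop assertions, so only "not running" is acceptable here.
		if err := xdcr.Stop(ctx); err != nil {
			assert.Equal(t, ErrReplicationNotRunning, err)
		}
	}()

	require.ErrorIs(t, xdcr.Start(ctx), ErrReplicationAlreadyRunning)

	// ... write documents and poll Stats with require.EventuallyWithT ...

	require.NoError(t, xdcr.Stop(ctx))
	require.ErrorIs(t, xdcr.Stop(ctx), ErrReplicationNotRunning)
}
```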