Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4365 rosmar xdcr, use _mou.cas for conflict resolution #7206

Merged
merged 4 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,8 +2143,8 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc(

// compute mouMatch before the callback modifies doc.MetadataOnlyUpdate
mouMatch := false
if doc.MetadataOnlyUpdate != nil && base.HexCasToUint64(doc.MetadataOnlyUpdate.CAS) == doc.Cas {
mouMatch = base.HexCasToUint64(doc.MetadataOnlyUpdate.CAS) == doc.Cas
if doc.MetadataOnlyUpdate != nil && doc.MetadataOnlyUpdate.CAS() == doc.Cas {
mouMatch = doc.MetadataOnlyUpdate.CAS() == doc.Cas
base.DebugfCtx(ctx, base.KeyVV, "updateDoc(%q): _mou:%+v Metadata-only update match:%t", base.UD(doc.ID), doc.MetadataOnlyUpdate, mouMatch)
} else {
base.DebugfCtx(ctx, base.KeyVV, "updateDoc(%q): has no _mou", base.UD(doc.ID))
Expand Down Expand Up @@ -2382,7 +2382,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do

updatedDoc.IsTombstone = currentRevFromHistory.Deleted
if doc.MetadataOnlyUpdate != nil {
if doc.MetadataOnlyUpdate.CAS != "" {
if doc.MetadataOnlyUpdate.HexCAS != "" {
updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas))
}
} else {
Expand Down Expand Up @@ -2445,8 +2445,8 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
} else if doc != nil {
// Update the in-memory CAS values to match macro-expanded values
doc.Cas = casOut
if doc.MetadataOnlyUpdate != nil && doc.MetadataOnlyUpdate.CAS == expandMacroCASValueString {
doc.MetadataOnlyUpdate.CAS = base.CasToString(casOut)
if doc.MetadataOnlyUpdate != nil && doc.MetadataOnlyUpdate.HexCAS == expandMacroCASValueString {
doc.MetadataOnlyUpdate.HexCAS = base.CasToString(casOut)
}
// update the doc's HLV defined post macro expansion
doc = postWriteUpdateHLV(doc, casOut)
Expand Down
25 changes: 18 additions & 7 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,25 @@ type ChannelSetEntry struct {
Compacted bool `json:"compacted,omitempty"`
}

// MetadataOnlyUpdate represents a cas value of a document modification if it only updated xattrs and not the document body. The previous cas and revSeqNo are stored as the version of the document before any metadata was modified. This is serialized as _mou.
type MetadataOnlyUpdate struct {
CAS string `json:"cas,omitempty"`
PreviousCAS string `json:"pCas,omitempty"`
HexCAS string `json:"cas,omitempty"` // 0x0 hex value from Couchbase Server
PreviousHexCAS string `json:"pCas,omitempty"` // 0x0 hex value from Couchbase Server
PreviousRevSeqNo uint64 `json:"pRev,omitempty"`
}

func (m *MetadataOnlyUpdate) String() string {
return fmt.Sprintf("{CAS:%d PreviousCAS:%d PreviousRevSeqNo:%d}", base.HexCasToUint64(m.CAS), base.HexCasToUint64(m.PreviousCAS), m.PreviousRevSeqNo)
return fmt.Sprintf("{CAS:%d PreviousCAS:%d PreviousRevSeqNo:%d}", m.CAS(), m.PreviousCAS(), m.PreviousRevSeqNo)
}

// CAS returns the CAS value as a uint64
func (m *MetadataOnlyUpdate) CAS() uint64 {
return base.HexCasToUint64(m.HexCAS)
}

// PreviousCAS returns the previous CAS value as a uint64
func (m *MetadataOnlyUpdate) PreviousCAS() uint64 {
return base.HexCasToUint64(m.PreviousHexCAS)
}

// The sync-gateway metadata stored in the "_sync" property of a Couchbase document.
Expand Down Expand Up @@ -1296,15 +1307,15 @@ func (doc *Document) MarshalWithXattrs() (data, syncXattr, vvXattr, mouXattr, gl
func computeMetadataOnlyUpdate(currentCas uint64, revNo uint64, currentMou *MetadataOnlyUpdate) *MetadataOnlyUpdate {
var prevCas string
currentCasString := base.CasToString(currentCas)
if currentMou != nil && currentCasString == currentMou.CAS {
prevCas = currentMou.PreviousCAS
if currentMou != nil && currentCasString == currentMou.HexCAS {
prevCas = currentMou.PreviousHexCAS
} else {
prevCas = currentCasString
}

metadataOnlyUpdate := &MetadataOnlyUpdate{
CAS: expandMacroCASValueString, // when non-empty, this is replaced with cas macro expansion
PreviousCAS: prevCas,
HexCAS: expandMacroCASValueString, // when non-empty, this is replaced with cas macro expansion
PreviousHexCAS: prevCas,
PreviousRevSeqNo: revNo,
}
return metadataOnlyUpdate
Expand Down
32 changes: 16 additions & 16 deletions db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ func TestHLVImport(t *testing.T) {
},
expectedMou: func(output *outputData) *MetadataOnlyUpdate {
return &MetadataOnlyUpdate{
CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)),
HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)),
PreviousRevSeqNo: output.preImportRevSeqNo,
}
},
Expand All @@ -281,8 +281,8 @@ func TestHLVImport(t *testing.T) {
},
expectedMou: func(output *outputData) *MetadataOnlyUpdate {
return &MetadataOnlyUpdate{
CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)),
HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)),
PreviousRevSeqNo: output.preImportRevSeqNo,
}
},
Expand All @@ -302,8 +302,8 @@ func TestHLVImport(t *testing.T) {
},
expectedMou: func(output *outputData) *MetadataOnlyUpdate {
return &MetadataOnlyUpdate{
CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)),
HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)),
PreviousRevSeqNo: output.preImportRevSeqNo,
}
},
Expand All @@ -324,7 +324,7 @@ func TestHLVImport(t *testing.T) {
_, xattrs, _, err := collection.dataStore.GetWithXattrs(ctx, docID, []string{base.VirtualXattrRevSeqNo})
require.NoError(t, err)
mou := &MetadataOnlyUpdate{
PreviousCAS: string(base.Uint64CASToLittleEndianHex(cas)),
PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(cas)),
PreviousRevSeqNo: RetrieveDocRevSeqNo(t, xattrs[base.VirtualXattrRevSeqNo]),
}
opts := &sgbucket.MutateInOptions{
Expand All @@ -337,8 +337,8 @@ func TestHLVImport(t *testing.T) {
},
expectedMou: func(output *outputData) *MetadataOnlyUpdate {
return &MetadataOnlyUpdate{
CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousCAS: output.preImportMou.PreviousCAS,
HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousHexCAS: output.preImportMou.PreviousHexCAS,
PreviousRevSeqNo: output.preImportRevSeqNo,
}
},
Expand All @@ -355,17 +355,17 @@ func TestHLVImport(t *testing.T) {
_, xattrs, _, err := collection.dataStore.GetWithXattrs(ctx, docID, []string{base.VirtualXattrRevSeqNo})
require.NoError(t, err)
mou := &MetadataOnlyUpdate{
CAS: "invalid",
PreviousCAS: string(base.Uint64CASToLittleEndianHex(cas)),
HexCAS: "invalid",
PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(cas)),
PreviousRevSeqNo: RetrieveDocRevSeqNo(t, xattrs[base.VirtualXattrRevSeqNo]),
}
_, err = collection.dataStore.UpdateXattrs(ctx, docID, 0, cas, map[string][]byte{base.MouXattrName: base.MustJSONMarshal(t, mou)}, nil)
require.NoError(t, err)
},
expectedMou: func(output *outputData) *MetadataOnlyUpdate {
return &MetadataOnlyUpdate{
CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)),
HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)),
PreviousRevSeqNo: output.preImportRevSeqNo,
}
},
Expand All @@ -389,7 +389,7 @@ func TestHLVImport(t *testing.T) {
require.NoError(t, err)

mou := &MetadataOnlyUpdate{
PreviousCAS: string(base.Uint64CASToLittleEndianHex(cas)),
PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(cas)),
PreviousRevSeqNo: RetrieveDocRevSeqNo(t, xattrs[base.VirtualXattrRevSeqNo]),
}
opts := &sgbucket.MutateInOptions{
Expand All @@ -402,8 +402,8 @@ func TestHLVImport(t *testing.T) {
},
expectedMou: func(output *outputData) *MetadataOnlyUpdate {
return &MetadataOnlyUpdate{
CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousCAS: output.preImportMou.PreviousCAS,
HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)),
PreviousHexCAS: output.preImportMou.PreviousHexCAS,
PreviousRevSeqNo: output.preImportRevSeqNo,
}
},
Expand Down
16 changes: 8 additions & 8 deletions db/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func TestFeedImport(t *testing.T) {
mouXattr, mouOk := xattrs[base.MouXattrName]
require.True(t, mouOk)
require.NoError(t, base.JSONUnmarshal(mouXattr, &mou))
require.Equal(t, base.CasToString(writeCas), mou.PreviousCAS)
require.Equal(t, base.CasToString(importCas), mou.CAS)
require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS)
require.Equal(t, base.CasToString(importCas), mou.HexCAS)
} else {
// Expect not found fetching mou xattr
require.Error(t, err)
Expand Down Expand Up @@ -105,8 +105,8 @@ func TestOnDemandImportMou(t *testing.T) {

if db.UseMou() {
require.NotNil(t, doc.MetadataOnlyUpdate)
require.Equal(t, base.CasToString(writeCas), doc.MetadataOnlyUpdate.PreviousCAS)
require.Equal(t, base.CasToString(doc.Cas), doc.MetadataOnlyUpdate.CAS)
require.Equal(t, base.CasToString(writeCas), doc.MetadataOnlyUpdate.PreviousHexCAS)
require.Equal(t, base.CasToString(doc.Cas), doc.MetadataOnlyUpdate.HexCAS)
} else {
require.Nil(t, doc.MetadataOnlyUpdate)
}
Expand Down Expand Up @@ -138,8 +138,8 @@ func TestOnDemandImportMou(t *testing.T) {
var mou *MetadataOnlyUpdate
require.True(t, mouOk)
require.NoError(t, base.JSONUnmarshal(mouXattr, &mou))
require.Equal(t, base.CasToString(writeCas), mou.PreviousCAS)
require.Equal(t, base.CasToString(importCas), mou.CAS)
require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS)
require.Equal(t, base.CasToString(importCas), mou.HexCAS)
} else {
// expect not found fetching mou xattr
require.Error(t, err)
Expand Down Expand Up @@ -940,8 +940,8 @@ func TestMetadataOnlyUpdate(t *testing.T) {
previousRev := syncData.CurrentRev

// verify mou contents
require.Equal(t, base.CasToString(writeCas), mou.PreviousCAS)
require.Equal(t, base.CasToString(importCas), mou.CAS)
require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS)
require.Equal(t, base.CasToString(importCas), mou.HexCAS)

// 3. Update the previous SDK write via SGW, ensure mou isn't updated again
updatedBody := Body{"_rev": previousRev, "foo": "baz"}
Expand Down
6 changes: 1 addition & 5 deletions topologytest/hlv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,7 @@ func TestHLVUpdateDocumentSingleActor(t *testing.T) {
if strings.HasPrefix(tc.activePeerID, "cbl") {
t.Skip("Skipping Couchbase Lite test, returns unexpected body in proposeChanges: [304], CBG-4257")
}
if base.UnitTestUrlIsWalrus() {
t.Skip("rosmar consistent failure CBG-4365")
} else {
t.Skip("intermittent failure in Couchbase Server CBG-4329")
}
t.Skip("intermittent failure in Couchbase Server and rosmar CBG-4329")
peers, _ := setupTests(t, tc.topology, tc.activePeerID)

body1 := []byte(fmt.Sprintf(`{"peer": "%s", "topology": "%s", "write": 1}`, tc.activePeerID, tc.description()))
Expand Down
84 changes: 60 additions & 24 deletions xdcr/rosmar_xdcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@ import (
"github.com/couchbaselabs/rosmar"
)

// replicatedDocLocation represents whether a document is from the source or target bucket.
type replicatedDocLocation uint8

const (
sourceDoc replicatedDocLocation = iota
targetDoc
)

func (r replicatedDocLocation) String() string {
switch r {
case sourceDoc:
return "source"
case targetDoc:
return "target"
default:
return "unknown"
}
}

// rosmarManager implements a XDCR bucket to bucket replication within rosmar.
type rosmarManager struct {
filterFunc xdcrFilterFunc
Expand Down Expand Up @@ -65,7 +84,7 @@ func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket,
// processEvent processes a DCP event coming from a toBucket and replicates it to the target datastore.
func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEvent) bool {
docID := string(event.Key)
base.TracefCtx(ctx, base.KeySGTest, "Got event %s, opcode: %s", docID, event.Opcode)
base.TracefCtx(ctx, base.KeyVV, "Got event %s, opcode: %s", docID, event.Opcode)
col, ok := r.toBucketCollections[event.CollectionID]
if !ok {
base.ErrorfCtx(ctx, "This violates the assumption that all collections are mapped to a target collection. This should not happen. Found event=%+v", event)
Expand All @@ -78,7 +97,7 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
case sgbucket.FeedOpDeletion, sgbucket.FeedOpMutation:
// Filter out events if we have a non XDCR filter
if r.filterFunc != nil && !r.filterFunc(&event) {
base.TracefCtx(ctx, base.KeySGTest, "Filtering doc %s", docID)
base.TracefCtx(ctx, base.KeyVV, "Filtering doc %s", docID)
r.mobileDocsFiltered.Add(1)
return true
}
Expand All @@ -98,26 +117,16 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
return false
}

// When doing the evaluation of cas, we want to ignore import mutations, marked with _mou.cas == cas. In that case, we will just use the _vv.cvCAS for conflict resolution. If _mou.cas is present but out of date, continue to use _vv.ver.
sourceCas := event.Cas
if sourceMou != nil && base.HexCasToUint64(sourceMou.CAS) == sourceCas && sourceHLV != nil {
sourceCas = sourceHLV.CurrentVersionCAS
base.InfofCtx(ctx, base.KeySGTest, "XDCR doc:%s source _mou.cas=cas (%d), using _vv.cvCAS (%d) for conflict resolution", docID, event.Cas, sourceCas)
}
targetCas := actualTargetCas
actualSourceCas := event.Cas
conflictResolutionSourceCas := getConflictResolutionCas(ctx, docID, sourceDoc, actualSourceCas, sourceHLV, sourceMou)

targetHLV, targetMou, err := getHLVAndMou(targetXattrs)
if err != nil {
base.WarnfCtx(ctx, "Replicating doc %s, could not get target hlv and mou: %s", event.Key, err)
r.errorCount.Add(1)
return false
}
if targetMou != nil && targetHLV != nil {
// _mou.CAS matches the CAS value, use the _vv.cvCAS for conflict resolution
if base.HexCasToUint64(targetMou.CAS) == targetCas {
targetCas = targetHLV.CurrentVersionCAS
base.InfofCtx(ctx, base.KeySGTest, "XDCR doc:%s target _mou.cas=cas (%d), using _vv.cvCAS (%d) for conflict resolution", docID, targetCas, targetHLV.CurrentVersionCAS)
}
}
conflictResolutionTargetCas := getConflictResolutionCas(ctx, docID, targetDoc, actualTargetCas, targetHLV, targetMou)

/* full LWW conflict resolution is implemented in rosmar. There is no need to implement this since CAS will always be unique due to rosmar limitations.

Expand Down Expand Up @@ -159,17 +168,16 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve

*/

if sourceCas <= targetCas {
base.InfofCtx(ctx, base.KeySGTest, "XDCR doc:%s skipping replication since sourceCas (%d) < targetCas (%d)", docID, sourceCas, targetCas)
if conflictResolutionSourceCas <= conflictResolutionTargetCas {
base.InfofCtx(ctx, base.KeyVV, "XDCR doc:%s skipping replication since sourceCas (%d) < targetCas (%d)", docID, conflictResolutionSourceCas, conflictResolutionTargetCas)
r.targetNewerDocs.Add(1)
base.TracefCtx(ctx, base.KeySGTest, "Skipping replicating doc %s, cas %d <= %d", docID, event.Cas, targetCas)
return true
} /* else if sourceCas == targetCas {
// CBG-4334, check datatype for targetXattrs to see if there are any xattrs present
hasSourceXattrs := event.DataType&sgbucket.FeedDataTypeXattr != 0
hasTargetXattrs := len(targetXattrs) > 0
if !(hasSourceXattrs && !hasTargetXattrs) {
base.InfofCtx(ctx, base.KeySGTest, "skipping %q skipping replication since sourceCas (%d) < targetCas (%d)", docID, sourceCas, targetCas)
base.InfofCtx(ctx, base.KeyVV, "skipping %q skipping replication since sourceCas (%d) < targetCas (%d)", docID, sourceCas, targetCas)
return true
}
}
Expand All @@ -178,13 +186,13 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
if targetSyncXattr, ok := targetXattrs[base.SyncXattrName]; ok {
newXattrs[base.SyncXattrName] = targetSyncXattr
}
err = updateHLV(newXattrs, sourceHLV, sourceMou, r.fromBucketSourceID, event.Cas)
err = updateHLV(newXattrs, sourceHLV, sourceMou, r.fromBucketSourceID, actualSourceCas)
if err != nil {
base.WarnfCtx(ctx, "Replicating doc %s, could not update hlv: %s", event.Key, err)
r.errorCount.Add(1)
return false
}
base.InfofCtx(ctx, base.KeySGTest, "Replicating doc %q, with cas (%d), body %s, xattrsKeys: %+v", event.Key, event.Cas, string(body), maps.Keys(newXattrs))
base.InfofCtx(ctx, base.KeyVV, "Replicating doc %q, with cas (%d), body %s, xattrsKeys: %+v", event.Key, actualSourceCas, string(body), maps.Keys(newXattrs))
err = opWithMeta(ctx, col, actualTargetCas, newXattrs, body, &event)
if err != nil {
base.WarnfCtx(ctx, "Replicating doc %s, could not write doc: %s", event.Key, err)
Expand Down Expand Up @@ -342,10 +350,14 @@ func getHLVAndMou(xattrs map[string][]byte) (*db.HybridLogicalVector, *db.Metada
return hlv, mou, nil
}

// updateHLV will update the xattrs on the target document considering the source's HLV, _mou, sourceID and cas.
func updateHLV(xattrs map[string][]byte, sourceHLV *db.HybridLogicalVector, sourceMou *db.MetadataOnlyUpdate, sourceID string, sourceCas uint64) error {
// TODO: read existing targetXattrs[base.VvXattrName] and update the pv CBG-4250. This will need to merge pv from sourceHLV and targetHLV.
var targetHLV *db.HybridLogicalVector
if sourceHLV != nil {
// TODO: read existing targetXattrs[base.VvXattrName] and update the pv CBG-4250
// if source vv.cvCas == cas, the _vv.cv, _vv.cvCAS from the source is correct and we can use it directly.
sourcecvCASMatch := sourceHLV != nil && sourceHLV.CurrentVersionCAS == sourceCas
sourceWasImport := sourceMou != nil && sourceMou.CAS() == sourceCas
if sourceHLV != nil && (sourceWasImport || sourcecvCASMatch) {
targetHLV = sourceHLV
} else {
hlv := db.NewHybridLogicalVector()
Expand All @@ -365,6 +377,10 @@ func updateHLV(xattrs map[string][]byte, sourceHLV *db.HybridLogicalVector, sour
return err
}
if sourceMou != nil {
// removing _mou.cas and _mou.pRev matches cbs xdcr behavior.
// CBS xdcr maybe should clear _mou.pCas as well, but it is not a problem since all checks for _mou.cas should check current cas for _mou being up to date.
sourceMou.HexCAS = ""
sourceMou.PreviousRevSeqNo = 0
var err error
xattrs[base.MouXattrName], err = json.Marshal(sourceMou)
if err != nil {
Expand All @@ -373,3 +389,23 @@ func updateHLV(xattrs map[string][]byte, sourceHLV *db.HybridLogicalVector, sour
}
return nil
}

// getConflictResolutionCas returns cas for conflict resolution.
// If _mou.cas == actualCas, assume _vv is up to date and use _vv.cvCAS
// Otherwise, return actualCas
func getConflictResolutionCas(ctx context.Context, docID string, location replicatedDocLocation, actualCas uint64, hlv *db.HybridLogicalVector, mou *db.MetadataOnlyUpdate) uint64 {
if mou == nil {
return actualCas
}
// _mou.CAS is out of date, ignoring
if mou.CAS() != actualCas {
return actualCas
}
if hlv == nil {
base.InfofCtx(ctx, base.KeyVV, "XDCR doc:%s %s _mou.cas=cas (%d), but there is no HLV, using 0 for conflict resolution to match behavior of Couchbase Server", docID, location, actualCas)
return 0
}
// _mou.CAS matches the CAS value, use the _vv.cvCAS for conflict resolution
base.InfofCtx(ctx, base.KeyVV, "XDCR doc:%s %s _mou.cas=cas (%d), using _vv.cvCAS (%d) for conflict resolution", docID, location, actualCas, hlv.CurrentVersionCAS)
return hlv.CurrentVersionCAS
}
Loading