diff --git a/db/crud.go b/db/crud.go index 2e35b70110..9807d27d1f 100644 --- a/db/crud.go +++ b/db/crud.go @@ -2143,8 +2143,8 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc( // compute mouMatch before the callback modifies doc.MetadataOnlyUpdate mouMatch := false - if doc.MetadataOnlyUpdate != nil && base.HexCasToUint64(doc.MetadataOnlyUpdate.CAS) == doc.Cas { - mouMatch = base.HexCasToUint64(doc.MetadataOnlyUpdate.CAS) == doc.Cas + if doc.MetadataOnlyUpdate != nil && doc.MetadataOnlyUpdate.CAS() == doc.Cas { + mouMatch = doc.MetadataOnlyUpdate.CAS() == doc.Cas base.DebugfCtx(ctx, base.KeyVV, "updateDoc(%q): _mou:%+v Metadata-only update match:%t", base.UD(doc.ID), doc.MetadataOnlyUpdate, mouMatch) } else { base.DebugfCtx(ctx, base.KeyVV, "updateDoc(%q): has no _mou", base.UD(doc.ID)) @@ -2382,7 +2382,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do updatedDoc.IsTombstone = currentRevFromHistory.Deleted if doc.MetadataOnlyUpdate != nil { - if doc.MetadataOnlyUpdate.CAS != "" { + if doc.MetadataOnlyUpdate.HexCAS != "" { updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas)) } } else { @@ -2445,8 +2445,8 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do } else if doc != nil { // Update the in-memory CAS values to match macro-expanded values doc.Cas = casOut - if doc.MetadataOnlyUpdate != nil && doc.MetadataOnlyUpdate.CAS == expandMacroCASValueString { - doc.MetadataOnlyUpdate.CAS = base.CasToString(casOut) + if doc.MetadataOnlyUpdate != nil && doc.MetadataOnlyUpdate.HexCAS == expandMacroCASValueString { + doc.MetadataOnlyUpdate.HexCAS = base.CasToString(casOut) } // update the doc's HLV defined post macro expansion doc = postWriteUpdateHLV(doc, casOut) diff --git a/db/document.go b/db/document.go index 754bd80a5f..4769b84594 100644 --- a/db/document.go +++ b/db/document.go @@ -63,14 +63,25 @@ type ChannelSetEntry struct { Compacted bool `json:"compacted,omitempty"` } +// MetadataOnlyUpdate represents a cas value of a document modification if it only updated xattrs and not the document body. The previous cas and revSeqNo are stored as the version of the document before any metadata was modified. This is serialized as _mou. type MetadataOnlyUpdate struct { - CAS string `json:"cas,omitempty"` - PreviousCAS string `json:"pCas,omitempty"` + HexCAS string `json:"cas,omitempty"` // 0x0 hex value from Couchbase Server + PreviousHexCAS string `json:"pCas,omitempty"` // 0x0 hex value from Couchbase Server PreviousRevSeqNo uint64 `json:"pRev,omitempty"` } func (m *MetadataOnlyUpdate) String() string { - return fmt.Sprintf("{CAS:%d PreviousCAS:%d PreviousRevSeqNo:%d}", base.HexCasToUint64(m.CAS), base.HexCasToUint64(m.PreviousCAS), m.PreviousRevSeqNo) + return fmt.Sprintf("{CAS:%d PreviousCAS:%d PreviousRevSeqNo:%d}", m.CAS(), m.PreviousCAS(), m.PreviousRevSeqNo) +} + +// CAS returns the CAS value as a uint64 +func (m *MetadataOnlyUpdate) CAS() uint64 { + return base.HexCasToUint64(m.HexCAS) +} + +// PreviousCAS returns the previous CAS value as a uint64 +func (m *MetadataOnlyUpdate) PreviousCAS() uint64 { + return base.HexCasToUint64(m.PreviousHexCAS) } // The sync-gateway metadata stored in the "_sync" property of a Couchbase document. @@ -1296,15 +1307,15 @@ func (doc *Document) MarshalWithXattrs() (data, syncXattr, vvXattr, mouXattr, gl func computeMetadataOnlyUpdate(currentCas uint64, revNo uint64, currentMou *MetadataOnlyUpdate) *MetadataOnlyUpdate { var prevCas string currentCasString := base.CasToString(currentCas) - if currentMou != nil && currentCasString == currentMou.CAS { - prevCas = currentMou.PreviousCAS + if currentMou != nil && currentCasString == currentMou.HexCAS { + prevCas = currentMou.PreviousHexCAS } else { prevCas = currentCasString } metadataOnlyUpdate := &MetadataOnlyUpdate{ - CAS: expandMacroCASValueString, // when non-empty, this is replaced with cas macro expansion - PreviousCAS: prevCas, + HexCAS: expandMacroCASValueString, // when non-empty, this is replaced with cas macro expansion + PreviousHexCAS: prevCas, PreviousRevSeqNo: revNo, } return metadataOnlyUpdate diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go index 53a29d654b..24e74e62f5 100644 --- a/db/hybrid_logical_vector_test.go +++ b/db/hybrid_logical_vector_test.go @@ -258,8 +258,8 @@ func TestHLVImport(t *testing.T) { }, expectedMou: func(output *outputData) *MetadataOnlyUpdate { return &MetadataOnlyUpdate{ - CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), - PreviousCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)), + HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), + PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)), PreviousRevSeqNo: output.preImportRevSeqNo, } }, @@ -281,8 +281,8 @@ func TestHLVImport(t *testing.T) { }, expectedMou: func(output *outputData) *MetadataOnlyUpdate { return &MetadataOnlyUpdate{ - CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), - PreviousCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)), + HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), + PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)), PreviousRevSeqNo: output.preImportRevSeqNo, } }, @@ -302,8 +302,8 @@ func TestHLVImport(t *testing.T) { }, expectedMou: func(output *outputData) *MetadataOnlyUpdate { return &MetadataOnlyUpdate{ - CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), - PreviousCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)), + HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), + PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)), PreviousRevSeqNo: output.preImportRevSeqNo, } }, @@ -324,7 +324,7 @@ func TestHLVImport(t *testing.T) { _, xattrs, _, err := collection.dataStore.GetWithXattrs(ctx, docID, []string{base.VirtualXattrRevSeqNo}) require.NoError(t, err) mou := &MetadataOnlyUpdate{ - PreviousCAS: string(base.Uint64CASToLittleEndianHex(cas)), + PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(cas)), PreviousRevSeqNo: RetrieveDocRevSeqNo(t, xattrs[base.VirtualXattrRevSeqNo]), } opts := &sgbucket.MutateInOptions{ @@ -337,8 +337,8 @@ func TestHLVImport(t *testing.T) { }, expectedMou: func(output *outputData) *MetadataOnlyUpdate { return &MetadataOnlyUpdate{ - CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), - PreviousCAS: output.preImportMou.PreviousCAS, + HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), + PreviousHexCAS: output.preImportMou.PreviousHexCAS, PreviousRevSeqNo: output.preImportRevSeqNo, } }, @@ -355,8 +355,8 @@ func TestHLVImport(t *testing.T) { _, xattrs, _, err := collection.dataStore.GetWithXattrs(ctx, docID, []string{base.VirtualXattrRevSeqNo}) require.NoError(t, err) mou := &MetadataOnlyUpdate{ - CAS: "invalid", - PreviousCAS: string(base.Uint64CASToLittleEndianHex(cas)), + HexCAS: "invalid", + PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(cas)), PreviousRevSeqNo: RetrieveDocRevSeqNo(t, xattrs[base.VirtualXattrRevSeqNo]), } _, err = collection.dataStore.UpdateXattrs(ctx, docID, 0, cas, map[string][]byte{base.MouXattrName: base.MustJSONMarshal(t, mou)}, nil) @@ -364,8 +364,8 @@ func TestHLVImport(t *testing.T) { }, expectedMou: func(output *outputData) *MetadataOnlyUpdate { return &MetadataOnlyUpdate{ - CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), - PreviousCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)), + HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), + PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(output.preImportCas)), PreviousRevSeqNo: output.preImportRevSeqNo, } }, @@ -389,7 +389,7 @@ func TestHLVImport(t *testing.T) { require.NoError(t, err) mou := &MetadataOnlyUpdate{ - PreviousCAS: string(base.Uint64CASToLittleEndianHex(cas)), + PreviousHexCAS: string(base.Uint64CASToLittleEndianHex(cas)), PreviousRevSeqNo: RetrieveDocRevSeqNo(t, xattrs[base.VirtualXattrRevSeqNo]), } opts := &sgbucket.MutateInOptions{ @@ -402,8 +402,8 @@ func TestHLVImport(t *testing.T) { }, expectedMou: func(output *outputData) *MetadataOnlyUpdate { return &MetadataOnlyUpdate{ - CAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), - PreviousCAS: output.preImportMou.PreviousCAS, + HexCAS: string(base.Uint64CASToLittleEndianHex(output.postImportCas)), + PreviousHexCAS: output.preImportMou.PreviousHexCAS, PreviousRevSeqNo: output.preImportRevSeqNo, } }, diff --git a/db/import_test.go b/db/import_test.go index 0d3a9cd47a..8f1ee92475 100644 --- a/db/import_test.go +++ b/db/import_test.go @@ -69,8 +69,8 @@ func TestFeedImport(t *testing.T) { mouXattr, mouOk := xattrs[base.MouXattrName] require.True(t, mouOk) require.NoError(t, base.JSONUnmarshal(mouXattr, &mou)) - require.Equal(t, base.CasToString(writeCas), mou.PreviousCAS) - require.Equal(t, base.CasToString(importCas), mou.CAS) + require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS) + require.Equal(t, base.CasToString(importCas), mou.HexCAS) } else { // Expect not found fetching mou xattr require.Error(t, err) @@ -105,8 +105,8 @@ func TestOnDemandImportMou(t *testing.T) { if db.UseMou() { require.NotNil(t, doc.MetadataOnlyUpdate) - require.Equal(t, base.CasToString(writeCas), doc.MetadataOnlyUpdate.PreviousCAS) - require.Equal(t, base.CasToString(doc.Cas), doc.MetadataOnlyUpdate.CAS) + require.Equal(t, base.CasToString(writeCas), doc.MetadataOnlyUpdate.PreviousHexCAS) + require.Equal(t, base.CasToString(doc.Cas), doc.MetadataOnlyUpdate.HexCAS) } else { require.Nil(t, doc.MetadataOnlyUpdate) } @@ -138,8 +138,8 @@ func TestOnDemandImportMou(t *testing.T) { var mou *MetadataOnlyUpdate require.True(t, mouOk) require.NoError(t, base.JSONUnmarshal(mouXattr, &mou)) - require.Equal(t, base.CasToString(writeCas), mou.PreviousCAS) - require.Equal(t, base.CasToString(importCas), mou.CAS) + require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS) + require.Equal(t, base.CasToString(importCas), mou.HexCAS) } else { // expect not found fetching mou xattr require.Error(t, err) @@ -940,8 +940,8 @@ func TestMetadataOnlyUpdate(t *testing.T) { previousRev := syncData.CurrentRev // verify mou contents - require.Equal(t, base.CasToString(writeCas), mou.PreviousCAS) - require.Equal(t, base.CasToString(importCas), mou.CAS) + require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS) + require.Equal(t, base.CasToString(importCas), mou.HexCAS) // 3. Update the previous SDK write via SGW, ensure mou isn't updated again updatedBody := Body{"_rev": previousRev, "foo": "baz"} diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go index 875ebbd568..0c4e4a2eaf 100644 --- a/topologytest/hlv_test.go +++ b/topologytest/hlv_test.go @@ -88,11 +88,7 @@ func TestHLVUpdateDocumentSingleActor(t *testing.T) { if strings.HasPrefix(tc.activePeerID, "cbl") { t.Skip("Skipping Couchbase Lite test, returns unexpected body in proposeChanges: [304], CBG-4257") } - if base.UnitTestUrlIsWalrus() { - t.Skip("rosmar consistent failure CBG-4365") - } else { - t.Skip("intermittent failure in Couchbase Server CBG-4329") - } + t.Skip("intermittent failure in Couchbase Server and rosmar CBG-4329") peers, _ := setupTests(t, tc.topology, tc.activePeerID) body1 := []byte(fmt.Sprintf(`{"peer": "%s", "topology": "%s", "write": 1}`, tc.activePeerID, tc.description())) diff --git a/xdcr/rosmar_xdcr.go b/xdcr/rosmar_xdcr.go index e874213349..43658e9a70 100644 --- a/xdcr/rosmar_xdcr.go +++ b/xdcr/rosmar_xdcr.go @@ -24,6 +24,25 @@ import ( "github.com/couchbaselabs/rosmar" ) +// replicatedDocLocation represents whether a document is from the source or target bucket. +type replicatedDocLocation uint8 + +const ( + sourceDoc replicatedDocLocation = iota + targetDoc +) + +func (r replicatedDocLocation) String() string { + switch r { + case sourceDoc: + return "source" + case targetDoc: + return "target" + default: + return "unknown" + } +} + // rosmarManager implements a XDCR bucket to bucket replication within rosmar. type rosmarManager struct { filterFunc xdcrFilterFunc @@ -65,7 +84,7 @@ func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket, // processEvent processes a DCP event coming from a toBucket and replicates it to the target datastore. func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEvent) bool { docID := string(event.Key) - base.TracefCtx(ctx, base.KeySGTest, "Got event %s, opcode: %s", docID, event.Opcode) + base.TracefCtx(ctx, base.KeyVV, "Got event %s, opcode: %s", docID, event.Opcode) col, ok := r.toBucketCollections[event.CollectionID] if !ok { base.ErrorfCtx(ctx, "This violates the assumption that all collections are mapped to a target collection. This should not happen. Found event=%+v", event) @@ -78,7 +97,7 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve case sgbucket.FeedOpDeletion, sgbucket.FeedOpMutation: // Filter out events if we have a non XDCR filter if r.filterFunc != nil && !r.filterFunc(&event) { - base.TracefCtx(ctx, base.KeySGTest, "Filtering doc %s", docID) + base.TracefCtx(ctx, base.KeyVV, "Filtering doc %s", docID) r.mobileDocsFiltered.Add(1) return true } @@ -98,26 +117,16 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve return false } - // When doing the evaluation of cas, we want to ignore import mutations, marked with _mou.cas == cas. In that case, we will just use the _vv.cvCAS for conflict resolution. If _mou.cas is present but out of date, continue to use _vv.ver. - sourceCas := event.Cas - if sourceMou != nil && base.HexCasToUint64(sourceMou.CAS) == sourceCas && sourceHLV != nil { - sourceCas = sourceHLV.CurrentVersionCAS - base.InfofCtx(ctx, base.KeySGTest, "XDCR doc:%s source _mou.cas=cas (%d), using _vv.cvCAS (%d) for conflict resolution", docID, event.Cas, sourceCas) - } - targetCas := actualTargetCas + actualSourceCas := event.Cas + conflictResolutionSourceCas := getConflictResolutionCas(ctx, docID, sourceDoc, actualSourceCas, sourceHLV, sourceMou) + targetHLV, targetMou, err := getHLVAndMou(targetXattrs) if err != nil { base.WarnfCtx(ctx, "Replicating doc %s, could not get target hlv and mou: %s", event.Key, err) r.errorCount.Add(1) return false } - if targetMou != nil && targetHLV != nil { - // _mou.CAS matches the CAS value, use the _vv.cvCAS for conflict resolution - if base.HexCasToUint64(targetMou.CAS) == targetCas { - targetCas = targetHLV.CurrentVersionCAS - base.InfofCtx(ctx, base.KeySGTest, "XDCR doc:%s target _mou.cas=cas (%d), using _vv.cvCAS (%d) for conflict resolution", docID, targetCas, targetHLV.CurrentVersionCAS) - } - } + conflictResolutionTargetCas := getConflictResolutionCas(ctx, docID, targetDoc, actualTargetCas, targetHLV, targetMou) /* full LWW conflict resolution is implemented in rosmar. There is no need to implement this since CAS will always be unique due to rosmar limitations. @@ -159,17 +168,16 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve */ - if sourceCas <= targetCas { - base.InfofCtx(ctx, base.KeySGTest, "XDCR doc:%s skipping replication since sourceCas (%d) < targetCas (%d)", docID, sourceCas, targetCas) + if conflictResolutionSourceCas <= conflictResolutionTargetCas { + base.InfofCtx(ctx, base.KeyVV, "XDCR doc:%s skipping replication since sourceCas (%d) < targetCas (%d)", docID, conflictResolutionSourceCas, conflictResolutionTargetCas) r.targetNewerDocs.Add(1) - base.TracefCtx(ctx, base.KeySGTest, "Skipping replicating doc %s, cas %d <= %d", docID, event.Cas, targetCas) return true } /* else if sourceCas == targetCas { // CBG-4334, check datatype for targetXattrs to see if there are any xattrs present hasSourceXattrs := event.DataType&sgbucket.FeedDataTypeXattr != 0 hasTargetXattrs := len(targetXattrs) > 0 if !(hasSourceXattrs && !hasTargetXattrs) { - base.InfofCtx(ctx, base.KeySGTest, "skipping %q skipping replication since sourceCas (%d) < targetCas (%d)", docID, sourceCas, targetCas) + base.InfofCtx(ctx, base.KeyVV, "skipping %q skipping replication since sourceCas (%d) < targetCas (%d)", docID, sourceCas, targetCas) return true } } @@ -178,13 +186,13 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve if targetSyncXattr, ok := targetXattrs[base.SyncXattrName]; ok { newXattrs[base.SyncXattrName] = targetSyncXattr } - err = updateHLV(newXattrs, sourceHLV, sourceMou, r.fromBucketSourceID, event.Cas) + err = updateHLV(newXattrs, sourceHLV, sourceMou, r.fromBucketSourceID, actualSourceCas) if err != nil { base.WarnfCtx(ctx, "Replicating doc %s, could not update hlv: %s", event.Key, err) r.errorCount.Add(1) return false } - base.InfofCtx(ctx, base.KeySGTest, "Replicating doc %q, with cas (%d), body %s, xattrsKeys: %+v", event.Key, event.Cas, string(body), maps.Keys(newXattrs)) + base.InfofCtx(ctx, base.KeyVV, "Replicating doc %q, with cas (%d), body %s, xattrsKeys: %+v", event.Key, actualSourceCas, string(body), maps.Keys(newXattrs)) err = opWithMeta(ctx, col, actualTargetCas, newXattrs, body, &event) if err != nil { base.WarnfCtx(ctx, "Replicating doc %s, could not write doc: %s", event.Key, err) @@ -342,10 +350,14 @@ func getHLVAndMou(xattrs map[string][]byte) (*db.HybridLogicalVector, *db.Metada return hlv, mou, nil } +// updateHLV will update the xattrs on the target document considering the source's HLV, _mou, sourceID and cas. func updateHLV(xattrs map[string][]byte, sourceHLV *db.HybridLogicalVector, sourceMou *db.MetadataOnlyUpdate, sourceID string, sourceCas uint64) error { + // TODO: read existing targetXattrs[base.VvXattrName] and update the pv CBG-4250. This will need to merge pv from sourceHLV and targetHLV. var targetHLV *db.HybridLogicalVector - if sourceHLV != nil { - // TODO: read existing targetXattrs[base.VvXattrName] and update the pv CBG-4250 + // if source vv.cvCas == cas, the _vv.cv, _vv.cvCAS from the source is correct and we can use it directly. + sourcecvCASMatch := sourceHLV != nil && sourceHLV.CurrentVersionCAS == sourceCas + sourceWasImport := sourceMou != nil && sourceMou.CAS() == sourceCas + if sourceHLV != nil && (sourceWasImport || sourcecvCASMatch) { targetHLV = sourceHLV } else { hlv := db.NewHybridLogicalVector() @@ -365,6 +377,10 @@ func updateHLV(xattrs map[string][]byte, sourceHLV *db.HybridLogicalVector, sour return err } if sourceMou != nil { + // removing _mou.cas and _mou.pRev matches cbs xdcr behavior. + // CBS xdcr maybe should clear _mou.pCas as well, but it is not a problem since all checks for _mou.cas should check current cas for _mou being up to date. + sourceMou.HexCAS = "" + sourceMou.PreviousRevSeqNo = 0 var err error xattrs[base.MouXattrName], err = json.Marshal(sourceMou) if err != nil { @@ -373,3 +389,23 @@ func updateHLV(xattrs map[string][]byte, sourceHLV *db.HybridLogicalVector, sour } return nil } + +// getConflictResolutionCas returns cas for conflict resolution. +// If _mou.cas == actualCas, assume _vv is up to date and use _vv.cvCAS +// Otherwise, return actualCas +func getConflictResolutionCas(ctx context.Context, docID string, location replicatedDocLocation, actualCas uint64, hlv *db.HybridLogicalVector, mou *db.MetadataOnlyUpdate) uint64 { + if mou == nil { + return actualCas + } + // _mou.CAS is out of date, ignoring + if mou.CAS() != actualCas { + return actualCas + } + if hlv == nil { + base.InfofCtx(ctx, base.KeyVV, "XDCR doc:%s %s _mou.cas=cas (%d), but there is no HLV, using 0 for conflict resolution to match behavior of Couchbase Server", docID, location, actualCas) + return 0 + } + // _mou.CAS matches the CAS value, use the _vv.cvCAS for conflict resolution + base.InfofCtx(ctx, base.KeyVV, "XDCR doc:%s %s _mou.cas=cas (%d), using _vv.cvCAS (%d) for conflict resolution", docID, location, actualCas, hlv.CurrentVersionCAS) + return hlv.CurrentVersionCAS +} diff --git a/xdcr/xdcr_test.go b/xdcr/xdcr_test.go index 26fc130933..1d4db1f0eb 100644 --- a/xdcr/xdcr_test.go +++ b/xdcr/xdcr_test.go @@ -344,9 +344,8 @@ func TestVVObeyMou(t *testing.T) { DocsProcessed: 1, }, *stats) - fmt.Printf("HONK HONK HONK\n") mou := &db.MetadataOnlyUpdate{ - PreviousCAS: base.CasToString(fromCas1), + PreviousHexCAS: base.CasToString(fromCas1), PreviousRevSeqNo: db.RetrieveDocRevSeqNo(t, xattrs[base.VirtualXattrRevSeqNo]), } @@ -384,6 +383,117 @@ func TestVVObeyMou(t *testing.T) { require.Equal(t, expectedVV, vv) } +func TestVVMouImport(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeySGTest) + fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t) + ctx := base.TestCtx(t) + fromBucketSourceID, err := GetSourceID(ctx, fromBucket) + require.NoError(t, err) + + docID := "doc1" + ver1Body := `{"ver":1}` + fromCas1, err := fromDs.WriteWithXattrs(ctx, docID, 0, 0, []byte(ver1Body), map[string][]byte{"ver1": []byte(`{}`)}, nil, + &sgbucket.MutateInOptions{ + MacroExpansion: []sgbucket.MacroExpansionSpec{ + sgbucket.NewMacroExpansionSpec("ver1.cas", sgbucket.MacroCas), + }, + }) + require.NoError(t, err) + + xdcr := startXDCR(t, fromBucket, toBucket, XDCROptions{Mobile: MobileOn}) + defer func() { + assert.NoError(t, xdcr.Stop(ctx)) + }() + requireWaitForXDCRDocsProcessed(t, xdcr, 1) + + body, xattrs, destCas, err := toDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName, base.VirtualXattrRevSeqNo}) + require.NoError(t, err) + require.Equal(t, fromCas1, destCas) + require.JSONEq(t, ver1Body, string(body)) + require.NotContains(t, xattrs, base.MouXattrName) + require.Contains(t, xattrs, base.VvXattrName) + var vv db.HybridLogicalVector + require.NoError(t, base.JSONUnmarshal(xattrs[base.VvXattrName], &vv)) + expectedVV := db.HybridLogicalVector{ + CurrentVersionCAS: fromCas1, + SourceID: fromBucketSourceID, + Version: fromCas1, + } + + require.Equal(t, expectedVV, vv) + + stats, err := xdcr.Stats(ctx) + assert.NoError(t, err) + require.Equal(t, Stats{ + DocsWritten: 1, + DocsProcessed: 1, + }, *stats) + + mou := &db.MetadataOnlyUpdate{ + HexCAS: "expand", + PreviousHexCAS: base.CasToString(fromCas1), + PreviousRevSeqNo: db.RetrieveDocRevSeqNo(t, xattrs[base.VirtualXattrRevSeqNo]), + } + + opts := &sgbucket.MutateInOptions{ + MacroExpansion: []sgbucket.MacroExpansionSpec{ + sgbucket.NewMacroExpansionSpec(db.XattrMouCasPath(), sgbucket.MacroCas), + sgbucket.NewMacroExpansionSpec("ver2.cas", sgbucket.MacroCas)}, + } + fromCas2, err := fromDs.UpdateXattrs(ctx, docID, 0, fromCas1, map[string][]byte{ + base.MouXattrName: base.MustJSONMarshal(t, mou), + "ver2": []byte(`{}`), + }, opts) + require.NoError(t, err) + require.NotEqual(t, fromCas1, fromCas2) + + requireWaitForXDCRDocsProcessed(t, xdcr, 2) + stats, err = xdcr.Stats(ctx) + assert.NoError(t, err) + require.Equal(t, Stats{ + TargetNewerDocs: 1, + DocsWritten: 1, + DocsProcessed: 2, + }, *stats) + + ver3Body := `{"ver":3}` + fromCas3, err := fromDs.WriteWithXattrs(ctx, docID, 0, fromCas2, []byte(ver3Body), map[string][]byte{"ver3": []byte(`{}`)}, nil, + &sgbucket.MutateInOptions{ + MacroExpansion: []sgbucket.MacroExpansionSpec{ + sgbucket.NewMacroExpansionSpec("ver3.cas", sgbucket.MacroCas), + }, + }) + require.NoError(t, err) + requireWaitForXDCRDocsProcessed(t, xdcr, 3) + + stats, err = xdcr.Stats(ctx) + assert.NoError(t, err) + require.Equal(t, Stats{ + TargetNewerDocs: 1, + DocsWritten: 2, + DocsProcessed: 3, + }, *stats) + + body, xattrs, destCas, err = toDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName}) + require.NoError(t, err) + require.Equal(t, fromCas3, destCas) + require.JSONEq(t, ver3Body, string(body)) + require.Contains(t, xattrs, base.VvXattrName) + vv = db.HybridLogicalVector{} + require.NoError(t, base.JSONUnmarshal(xattrs[base.VvXattrName], &vv)) + require.Equal(t, db.HybridLogicalVector{ + CurrentVersionCAS: fromCas3, + SourceID: fromBucketSourceID, + Version: fromCas3}, vv) + require.Contains(t, xattrs, base.MouXattrName) + var actualMou *db.MetadataOnlyUpdate + require.NoError(t, base.JSONUnmarshal(xattrs[base.MouXattrName], &actualMou)) + // it is weird that couchbase server XDCR doesn't clear _mou but only _mou.cas and _mou.pRev but this is not a problem since eventing and couchbase server read _mou.cas to determine if _mou should be used + require.Equal(t, db.MetadataOnlyUpdate{ + PreviousHexCAS: mou.PreviousHexCAS}, + *actualMou) +} + func TestLWWAfterInitialReplication(t *testing.T) { fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t) ctx := base.TestCtx(t)