From 89cb9a37847da316a9b607939a330a19d66a2591 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Mon, 11 Mar 2024 05:40:58 -0700 Subject: [PATCH] CBG-3255 Replication protocol support for HLV - push replication (#6700) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * CBG-3255 Push replication support for HLV Adds push replication support for HLV clients. Delta sync and attachments are not yet supported (pending CBG-3736, CBG-3797). On proposeChanges, checks whether the incoming CV and parent version represent a new document, known version, valid update, or conflict. Uses the same handling as revTreeID (conflict if parent version isn’t the server’s current version), with the additional non-conflict case where the incoming CV and server CV share the same source and the incoming CV is a newer version. For the incoming rev, detects conflict based on the incoming cv (based on the implicit hierarchy in an HLV, where cv > pv > mv). Includes some test helpers to support writing tests with simplified versions (e.g. 1@abc) while still asserting for encoded source and version. * Test fixes * Fixes/cleanup based on PR review --------- Co-authored-by: Gregory Newman-Smith --- base/util.go | 15 ++ db/blip_handler.go | 84 ++++++++--- db/crud.go | 81 ++++++++-- db/crud_test.go | 50 +++--- db/database_test.go | 134 ++++++++++++++++ db/document.go | 13 +- db/document_test.go | 2 +- db/hybrid_logical_vector.go | 164 +++++++++++++++++--- db/hybrid_logical_vector_test.go | 252 +++++++++++++++++++++++++------ db/revision_cache_interface.go | 4 +- db/utilities_hlv_testing.go | 97 +++++++++++- rest/attachment_test.go | 10 +- rest/blip_api_attachment_test.go | 20 +-- rest/blip_api_crud_test.go | 103 ++++++++++++- rest/blip_api_delta_sync_test.go | 24 ++- rest/blip_client_test.go | 14 +- 16 files changed, 896 insertions(+), 171 deletions(-) diff --git a/base/util.go b/base/util.go index 18c855cd78..b1bf20f29f 100644 --- a/base/util.go +++ b/base/util.go @@ -1004,6 +1004,10 @@ func HexCasToUint64(cas string) uint64 { return binary.LittleEndian.Uint64(casBytes[0:8]) } +func CasToString(cas uint64) string { + return string(Uint64CASToLittleEndianHex(cas)) +} + func Uint64CASToLittleEndianHex(cas uint64) []byte { littleEndian := make([]byte, 8) binary.LittleEndian.PutUint64(littleEndian, cas) @@ -1014,6 +1018,17 @@ func Uint64CASToLittleEndianHex(cas uint64) []byte { return encodedArray } +// Converts a string decimal representation ("100") to little endian hex string ("0x64") +func StringDecimalToLittleEndianHex(value string) (string, error) { + intValue, err := strconv.ParseUint(value, 10, 64) + if err != nil { + return "", err + } + hexValue := Uint64CASToLittleEndianHex(intValue) + return string(hexValue), nil + +} + func Crc32cHash(input []byte) uint32 { // crc32.MakeTable already ensures singleton table creation, so shouldn't need to cache. table := crc32.MakeTable(crc32.Castagnoli) diff --git a/db/blip_handler.go b/db/blip_handler.go index 37b2224b4f..01dbe5b29a 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -801,12 +801,18 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error { for i, change := range changeList { docID := change[0].(string) - revID := change[1].(string) + rev := change[1].(string) // rev can represent a RevTree ID or HLV current version parentRevID := "" if len(change) > 2 { parentRevID = change[2].(string) } - status, currentRev := bh.collection.CheckProposedRev(bh.loggingCtx, docID, revID, parentRevID) + var status ProposedRevStatus + var currentRev string + if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 { + status, currentRev = bh.collection.CheckProposedVersion(bh.loggingCtx, docID, rev, parentRevID) + } else { + status, currentRev = bh.collection.CheckProposedRev(bh.loggingCtx, docID, rev, parentRevID) + } if status == ProposedRev_OK_IsNew { // Remember that the doc doesn't exist locally, in order to optimize the upcoming Put: bh.collectionCtx.notePendingInsertion(docID) @@ -948,6 +954,10 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err } }() + if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 && bh.conflictResolver != nil { + return base.HTTPErrorf(http.StatusNotImplemented, "conflict resolver handling (ISGR) not yet implemented for v4 protocol") + } + // throttle concurrent revs if cap(bh.inFlightRevsThrottle) > 0 { select { @@ -966,13 +976,13 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err // Doc metadata comes from the BLIP message metadata, not magic document properties: docID, found := revMessage.ID() - revID, rfound := revMessage.Rev() + rev, rfound := revMessage.Rev() if !found || !rfound { - return base.HTTPErrorf(http.StatusBadRequest, "Missing docID or revID") + return base.HTTPErrorf(http.StatusBadRequest, "Missing docID or rev") } if bh.readOnly { - return base.HTTPErrorf(http.StatusForbidden, "Replication context is read-only, docID: %s, revID:%s", docID, revID) + return base.HTTPErrorf(http.StatusForbidden, "Replication context is read-only, docID: %s, rev:%s", docID, rev) } base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s %s", bh.serialNumber, rq.Profile(), revMessage.String()) @@ -992,7 +1002,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err return err } if removed, ok := body[BodyRemoved].(bool); ok && removed { - base.InfofCtx(bh.loggingCtx, base.KeySync, "Purging doc %v - removed at rev %v", base.UD(docID), revID) + base.InfofCtx(bh.loggingCtx, base.KeySync, "Purging doc %v - removed at rev %v", base.UD(docID), rev) if err := bh.collection.Purge(bh.loggingCtx, docID); err != nil { return err } @@ -1004,7 +1014,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err if err != nil { base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqStr, err) } else { - bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID}) + bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: rev}) } } return nil @@ -1012,9 +1022,31 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err } newDoc := &Document{ - ID: docID, - RevID: revID, + ID: docID, + } + + var history []string + var incomingHLV HybridLogicalVector + // Build history/HLV + if bh.activeCBMobileSubprotocol < CBMobileReplicationV4 { + newDoc.RevID = rev + history = []string{rev} + if historyStr := rq.Properties[RevMessageHistory]; historyStr != "" { + history = append(history, strings.Split(historyStr, ",")...) + } + } else { + versionVectorStr := rev + if historyStr := rq.Properties[RevMessageHistory]; historyStr != "" { + versionVectorStr += ";" + historyStr + } + incomingHLV, err = extractHLVFromBlipMessage(versionVectorStr) + if err != nil { + base.InfofCtx(bh.loggingCtx, base.KeySync, "Error parsing hlv while processing rev for doc %v. HLV:%v Error: %v", base.UD(docID), versionVectorStr, err) + return base.HTTPErrorf(http.StatusUnprocessableEntity, "error extracting hlv from blip message") + } + newDoc.HLV = &incomingHLV } + newDoc.UpdateBodyBytes(bodyBytes) injectedAttachmentsForDelta := false @@ -1033,7 +1065,14 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err // while retrieving deltaSrcRevID. Couchbase Lite replication guarantees client has access to deltaSrcRevID, // due to no-conflict write restriction, but we still need to enforce security here to prevent leaking data about previous // revisions to malicious actors (in the scenario where that user has write but not read access). - deltaSrcRev, err := bh.collection.GetRev(bh.loggingCtx, docID, deltaSrcRevID, false, nil) + var deltaSrcRev DocumentRevision + if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 { + cv := Version{} + cv.SourceID, cv.Value = incomingHLV.GetCurrentVersion() + deltaSrcRev, err = bh.collection.GetCV(bh.loggingCtx, docID, &cv, RevCacheOmitBody) + } else { + deltaSrcRev, err = bh.collection.GetRev(bh.loggingCtx, docID, deltaSrcRevID, false, nil) + } if err != nil { return base.HTTPErrorf(http.StatusUnprocessableEntity, "Can't fetch doc %s for deltaSrc=%s %v", base.UD(docID), deltaSrcRevID, err) } @@ -1059,7 +1098,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err // err should only ever be a FleeceDeltaError here - but to be defensive, handle other errors too (e.g. somehow reaching this code in a CE build) if err != nil { // Something went wrong in the diffing library. We want to know about this! - base.WarnfCtx(bh.loggingCtx, "Error patching deltaSrc %s with %s for doc %s with delta - err: %v", deltaSrcRevID, revID, base.UD(docID), err) + base.WarnfCtx(bh.loggingCtx, "Error patching deltaSrc %s with %s for doc %s with delta - err: %v", deltaSrcRevID, rev, base.UD(docID), err) return base.HTTPErrorf(http.StatusUnprocessableEntity, "Error patching deltaSrc with delta: %s", err) } @@ -1097,15 +1136,14 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err } } - history := []string{revID} - if historyStr := rq.Properties[RevMessageHistory]; historyStr != "" { - history = append(history, strings.Split(historyStr, ",")...) - } - var rawBucketDoc *sgbucket.BucketDocument // Pull out attachments if injectedAttachmentsForDelta || bytes.Contains(bodyBytes, []byte(BodyAttachments)) { + // temporarily error here if V4 + if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 { + return base.HTTPErrorf(http.StatusNotImplemented, "attachment handling not yet supported for v4 protocol") + } body := newDoc.Body(bh.loggingCtx) var currentBucketDoc *Document @@ -1142,7 +1180,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err if !ok { // If we don't have this attachment already, ensure incoming revpos is greater than minRevPos, otherwise // update to ensure it's fetched and uploaded - bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, revID) + bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, rev) continue } @@ -1182,7 +1220,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err // digest is different we need to override the revpos and set it to the current revision to ensure // the attachment is requested and stored if int(incomingAttachmentRevpos) <= minRevpos && currentAttachmentDigest != incomingAttachmentDigest { - bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, revID) + bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, rev) } } @@ -1190,7 +1228,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err } if err := bh.downloadOrVerifyAttachments(rq.Sender, body, minRevpos, docID, currentDigests); err != nil { - base.ErrorfCtx(bh.loggingCtx, "Error during downloadOrVerifyAttachments for doc %s/%s: %v", base.UD(docID), revID, err) + base.ErrorfCtx(bh.loggingCtx, "Error during downloadOrVerifyAttachments for doc %s/%s: %v", base.UD(docID), rev, err) return err } @@ -1200,7 +1238,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err } if rawBucketDoc == nil && bh.collectionCtx.checkPendingInsertion(docID) { - // At the time we handled the `propseChanges` request, there was no doc with this docID + // At the time we handled the `proposeChanges` request, there was no doc with this docID // in the bucket. As an optimization, tell PutExistingRev to assume the doc still doesn't // exist and bypass getting it from the bucket during the save. If we're wrong, the save // will fail with a CAS mismatch and the retry will fetch the existing doc. @@ -1213,7 +1251,9 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err // If the doc is a tombstone we want to allow conflicts when running SGR2 // bh.conflictResolver != nil represents an active SGR2 and BLIPClientTypeSGR2 represents a passive SGR2 forceAllowConflictingTombstone := newDoc.Deleted && (bh.conflictResolver != nil || bh.clientType == BLIPClientTypeSGR2) - if bh.conflictResolver != nil { + if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 { + _, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc) + } else if bh.conflictResolver != nil { _, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV) } else { _, _, err = bh.collection.PutExistingRev(bh.loggingCtx, newDoc, history, revNoConflicts, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV) @@ -1228,7 +1268,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err if err != nil { base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqProperty, err) } else { - bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID}) + bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: rev}) } } diff --git a/db/crud.go b/db/crud.go index 0fcd6aaac5..3e01286c86 100644 --- a/db/crud.go +++ b/db/crud.go @@ -1070,7 +1070,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod return newRevID, doc, err } -func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, docHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) { +func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) { var matchRev string if existingDoc != nil { doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs[base.SyncXattrName], existingDoc.Xattrs[db.userXattrKey()], existingDoc.Cas, DocUnmarshalRev) @@ -1109,20 +1109,28 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont } } + // set up revTreeID for backward compatibility + previousRevTreeID := doc.CurrentRev + prevGeneration, _ := ParseRevID(ctx, previousRevTreeID) + newGeneration := prevGeneration + 1 + // Conflict check here // if doc has no HLV defined this is a new doc we haven't seen before, skip conflict check if doc.HLV == nil { - doc.HLV = &HybridLogicalVector{} - addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV) + newHLV := NewHybridLogicalVector() + doc.HLV = &newHLV + addNewerVersionsErr := doc.HLV.AddNewerVersions(newDocHLV) if addNewerVersionsErr != nil { return nil, nil, false, nil, addNewerVersionsErr } } else { - incomingDecodedHLV := docHLV.ToDecodedHybridLogicalVector() - localDecodedHLV := doc.HLV.ToDecodedHybridLogicalVector() - if !incomingDecodedHLV.IsInConflict(localDecodedHLV) { + if doc.HLV.isDominating(newDocHLV) { + base.DebugfCtx(ctx, base.KeyCRUD, "PutExistingCurrentVersion(%q): No new versions to add", base.UD(newDoc.ID)) + return nil, nil, false, nil, base.ErrUpdateCancel // No new revisions to add + } + if newDocHLV.isDominating(*doc.HLV) { // update hlv for all newer incoming source version pairs - addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV) + addNewerVersionsErr := doc.HLV.AddNewerVersions(newDocHLV) if addNewerVersionsErr != nil { return nil, nil, false, nil, addNewerVersionsErr } @@ -1132,9 +1140,13 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont return nil, nil, false, nil, base.HTTPErrorf(http.StatusConflict, "Document revision conflict") } } + // populate merge versions + if newDocHLV.MergeVersions != nil { + doc.HLV.MergeVersions = newDocHLV.MergeVersions + } // Process the attachments, replacing bodies with digests. - newAttachments, err := db.storeAttachments(ctx, doc, newDoc.DocAttachments, generation, matchRev, nil) + newAttachments, err := db.storeAttachments(ctx, doc, newDoc.DocAttachments, newGeneration, previousRevTreeID, nil) if err != nil { return nil, nil, false, nil, err } @@ -1145,9 +1157,9 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont if err != nil { return nil, nil, false, nil, err } - newRev := CreateRevIDWithBytes(generation, matchRev, encoding) + newRev := CreateRevIDWithBytes(newGeneration, previousRevTreeID, encoding) - if err := doc.History.addRevision(newDoc.ID, RevInfo{ID: newRev, Parent: matchRev, Deleted: newDoc.Deleted}); err != nil { + if err := doc.History.addRevision(newDoc.ID, RevInfo{ID: newRev, Parent: previousRevTreeID, Deleted: newDoc.Deleted}); err != nil { base.InfofCtx(ctx, base.KeyCRUD, "Failed to add revision ID: %s, for doc: %s, error: %v", newRev, base.UD(newDoc.ID), err) return nil, nil, false, nil, base.ErrRevTreeAddRevFailure } @@ -2277,6 +2289,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do } } + // ErrUpdateCancel is returned when the incoming revision is already known if err == base.ErrUpdateCancel { return nil, "", nil } else if err != nil { @@ -2888,6 +2901,54 @@ func (db *DatabaseCollectionWithUser) CheckProposedRev(ctx context.Context, doci } } +// CheckProposedVersion - given DocID and a version in string form, check whether it can be added without conflict. +func (db *DatabaseCollectionWithUser) CheckProposedVersion(ctx context.Context, docid, proposedVersionStr string, previousVersionStr string) (status ProposedRevStatus, currentVersion string) { + + proposedVersion, err := ParseVersion(proposedVersionStr) + if err != nil { + base.WarnfCtx(ctx, "Couldn't parse proposed version for doc %q / %q: %v", base.UD(docid), proposedVersionStr, err) + return ProposedRev_Error, "" + } + + var previousVersion Version + if previousVersionStr != "" { + var err error + previousVersion, err = ParseVersion(previousVersionStr) + if err != nil { + base.WarnfCtx(ctx, "Couldn't parse previous version for doc %q / %q: %v", base.UD(docid), previousVersionStr, err) + return ProposedRev_Error, "" + } + } + + localDocCV := Version{} + doc, err := db.GetDocSyncDataNoImport(ctx, docid, DocUnmarshalNoHistory) + if doc.HLV != nil { + localDocCV.SourceID, localDocCV.Value = doc.HLV.GetCurrentVersion() + } + if err != nil { + if !base.IsDocNotFoundError(err) && err != base.ErrXattrNotFound { + base.WarnfCtx(ctx, "CheckProposedRev(%q) --> %T %v", base.UD(docid), err, err) + return ProposedRev_Error, "" + } + // New document not found on server + return ProposedRev_OK_IsNew, "" + } else if localDocCV == previousVersion { + // Non-conflicting update, client's previous version is server's CV + return ProposedRev_OK, "" + } else if doc.HLV.DominatesSource(proposedVersion) { + // SGW already has this version + return ProposedRev_Exists, "" + } else if localDocCV.SourceID == proposedVersion.SourceID && localDocCV.Value < proposedVersion.Value { + // previousVersion didn't match, but proposed version and server CV have matching source, and proposed version is newer + return ProposedRev_OK, "" + } else { + // Conflict, return the current cv. This may be a false positive conflict if the client has replicated + // the server cv via a different peer. Client is responsible for performing this check based on the + // returned localDocCV + return ProposedRev_Conflict, localDocCV.String() + } +} + const ( xattrMacroCas = "cas" // SyncData.Cas xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c diff --git a/db/crud_test.go b/db/crud_test.go index 9a0f8e3af0..3e40fa6fee 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -1729,10 +1729,11 @@ func TestPutExistingCurrentVersion(t *testing.T) { body = Body{"key1": "value2"} newDoc := createTestDocument(key, "", body, false, 0) - // construct a HLV that simulates a doc update happening on a client - // this means moving the current source version pair to PV and adding new sourceID and version pair to CV + // Simulate a conflicting doc update happening from a client that + // has only replicated the initial version of the document pv := make(map[string]string) - pv[bucketUUID] = originalDocVersion + pv[syncData.HLV.SourceID] = originalDocVersion + // create a version larger than the allocated version above incomingVersion := string(base.Uint64CASToLittleEndianHex(docUpdateVersionInt + 10)) incomingHLV := HybridLogicalVector{ @@ -1741,14 +1742,18 @@ func TestPutExistingCurrentVersion(t *testing.T) { PreviousVersions: pv, } - // grab the raw doc from the bucket to pass into the PutExistingCurrentVersion function for the above simulation of - // doc update arriving over replicator - _, rawDoc, err := collection.GetDocumentWithRaw(ctx, key, DocUnmarshalSync) + doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil) + assertHTTPError(t, err, 409) + require.Nil(t, doc) + require.Nil(t, cv) + + // Update the client's HLV to include the latest SGW version. + incomingHLV.PreviousVersions[syncData.HLV.SourceID] = docUpdateVersion + // TODO: because currentRev isn't being updated, storeOldBodyInRevTreeAndUpdateCurrent isn't + // updating the document body. Need to review whether it makes sense to keep using + // storeOldBodyInRevTreeAndUpdateCurrent, or if this needs a larger overhaul to support VV + doc, cv, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil) require.NoError(t, err) - - doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, rawDoc) - require.NoError(t, err) - // assert on returned CV assert.Equal(t, "test", cv.SourceID) assert.Equal(t, incomingVersion, cv.Value) assert.Equal(t, []byte(`{"key1":"value2"}`), doc._rawBody) @@ -1766,6 +1771,16 @@ func TestPutExistingCurrentVersion(t *testing.T) { pv[bucketUUID] = docUpdateVersion assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv)) assert.Equal(t, "3-60b024c44c283b369116c2c2570e8088", syncData.CurrentRev) + + // Attempt to push the same client update, validate server rejects as an already known version and cancels the update. + // This case doesn't return error, verify that SyncData hasn't been changed. + _, _, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil) + require.NoError(t, err) + syncData2, err := collection.GetDocSyncData(ctx, "doc1") + require.NoError(t, err) + require.Equal(t, syncData.TimeSaved, syncData2.TimeSaved) + require.Equal(t, syncData.CurrentRev, syncData2.CurrentRev) + } // TestPutExistingCurrentVersionWithConflict: @@ -1804,16 +1819,11 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) { Version: string(base.Uint64CASToLittleEndianHex(1234)), } - // grab the raw doc from the bucket to pass into the PutExistingCurrentVersion function - _, rawDoc, err := collection.GetDocumentWithRaw(ctx, key, DocUnmarshalSync) - require.NoError(t, err) - - // assert that a conflict is correctly identified and the resulting doc and cv are nil - doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, rawDoc) - require.Error(t, err) - assert.ErrorContains(t, err, "Document revision conflict") - assert.Nil(t, cv) - assert.Nil(t, doc) + // assert that a conflict is correctly identified and the doc and cv are nil + doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil) + assertHTTPError(t, err, 409) + require.Nil(t, doc) + require.Nil(t, cv) // assert persisted doc hlv hasn't been updated syncData, err = collection.GetDocSyncData(ctx, "doc1") diff --git a/db/database_test.go b/db/database_test.go index 433196e383..5acdb2512a 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -307,6 +307,140 @@ func TestDatabase(t *testing.T) { } +// TestCheckProposedVersion ensures that a given CV will return the appropriate status based on the information present in the HLV. +func TestCheckProposedVersion(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll) + + db, ctx := setupTestDB(t) + defer db.Close(ctx) + collection := GetSingleDatabaseCollectionWithUser(t, db) + + // create a doc + body := Body{"key1": "value1", "key2": 1234} + _, doc, err := collection.Put(ctx, "doc1", body) + require.NoError(t, err) + cvSource, cvValue := doc.HLV.GetCurrentVersion() + currentVersion := Version{cvSource, cvValue} + + testCases := []struct { + name string + newVersion Version + previousVersion *Version + expectedStatus ProposedRevStatus + expectedRev string + }{ + { + // proposed version matches the current server version + // Already known + name: "version exists", + newVersion: currentVersion, + previousVersion: nil, + expectedStatus: ProposedRev_Exists, + expectedRev: "", + }, + { + // proposed version is newer than server cv (same source), and previousVersion matches server cv + // Not a conflict + name: "new version,same source,prev matches", + newVersion: Version{cvSource, incrementStringCas(cvValue, 100)}, + previousVersion: ¤tVersion, + expectedStatus: ProposedRev_OK, + expectedRev: "", + }, + { + // proposed version is newer than server cv (same source), and previousVersion is not specified. + // Not a conflict, even without previousVersion, because of source match + name: "new version,same source,prev not specified", + newVersion: Version{cvSource, incrementStringCas(cvValue, 100)}, + previousVersion: nil, + expectedStatus: ProposedRev_OK, + expectedRev: "", + }, + { + // proposed version is from a source not present in server HLV, and previousVersion matches server cv + // Not a conflict, due to previousVersion match + name: "new version,new source,prev matches", + newVersion: Version{"other", incrementStringCas(cvValue, 100)}, + previousVersion: ¤tVersion, + expectedStatus: ProposedRev_OK, + expectedRev: "", + }, + { + // proposed version is newer than server cv (same source), but previousVersion does not match server cv. + // Not a conflict, regardless of previousVersion mismatch, because of source match between proposed + // version and cv + name: "new version,prev mismatch,new matches cv", + newVersion: Version{cvSource, incrementStringCas(cvValue, 100)}, + previousVersion: &Version{"other", incrementStringCas(cvValue, 50)}, + expectedStatus: ProposedRev_OK, + expectedRev: "", + }, + { + // proposed version is already known, source matches cv + name: "proposed version already known, no prev version", + newVersion: Version{cvSource, incrementStringCas(cvValue, -100)}, + expectedStatus: ProposedRev_Exists, + expectedRev: "", + }, + { + // conflict - previous version is older than CV + name: "conflict,same source,server updated", + newVersion: Version{"other", incrementStringCas(cvValue, -100)}, + previousVersion: &Version{cvSource, incrementStringCas(cvValue, -50)}, + expectedStatus: ProposedRev_Conflict, + expectedRev: Version{cvSource, cvValue}.String(), + }, + { + // conflict - previous version is older than CV + name: "conflict,new source,server updated", + newVersion: Version{"other", incrementStringCas(cvValue, 100)}, + previousVersion: &Version{"other", incrementStringCas(cvValue, -50)}, + expectedStatus: ProposedRev_Conflict, + expectedRev: Version{cvSource, cvValue}.String(), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + previousVersionStr := "" + if tc.previousVersion != nil { + previousVersionStr = tc.previousVersion.String() + } + status, rev := collection.CheckProposedVersion(ctx, "doc1", tc.newVersion.String(), previousVersionStr) + assert.Equal(t, tc.expectedStatus, status) + assert.Equal(t, tc.expectedRev, rev) + }) + } + + t.Run("invalid hlv", func(t *testing.T) { + hlvString := "" + status, _ := collection.CheckProposedVersion(ctx, "doc1", hlvString, "") + assert.Equal(t, ProposedRev_Error, status) + }) + + // New doc cases - standard insert + t.Run("new doc", func(t *testing.T) { + newVersion := Version{"other", base.CasToString(100)}.String() + status, _ := collection.CheckProposedVersion(ctx, "doc2", newVersion, "") + assert.Equal(t, ProposedRev_OK_IsNew, status) + }) + + // New doc cases - insert with prev version (previous version purged from SGW) + t.Run("new doc with prev version", func(t *testing.T) { + newVersion := Version{"other", base.CasToString(100)}.String() + prevVersion := Version{"another other", base.CasToString(50)}.String() + status, _ := collection.CheckProposedVersion(ctx, "doc2", newVersion, prevVersion) + assert.Equal(t, ProposedRev_OK_IsNew, status) + }) + +} + +func incrementStringCas(cas string, delta int) (casOut string) { + casValue := base.HexCasToUint64(cas) + casValue = casValue + uint64(delta) + return base.CasToString(casValue) +} + func TestGetDeleted(t *testing.T) { db, ctx := setupTestDB(t) diff --git a/db/document.go b/db/document.go index 23e067de2c..6e4032d07b 100644 --- a/db/document.go +++ b/db/document.go @@ -37,11 +37,10 @@ type DocumentUnmarshalLevel uint8 const ( DocUnmarshalAll = DocumentUnmarshalLevel(iota) // Unmarshals sync metadata and body DocUnmarshalSync // Unmarshals all sync metadata - DocUnmarshalNoHistory // Unmarshals sync metadata excluding history - DocUnmarshalHistory // Unmarshals history + rev + CAS only + DocUnmarshalNoHistory // Unmarshals sync metadata excluding revtree history + DocUnmarshalHistory // Unmarshals revtree history + rev + CAS only DocUnmarshalRev // Unmarshals rev + CAS only DocUnmarshalCAS // Unmarshals CAS (for import check) only - DocUnmarshalVV // Unmarshals Version Vector only DocUnmarshalNone // No unmarshalling (skips import/upgrade check) ) @@ -1192,14 +1191,6 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata Cas: casOnlyMeta.Cas, } doc._rawBody = data - case DocUnmarshalVV: - tmpData := SyncData{} - unmarshalErr := base.JSONUnmarshal(xdata, &tmpData) - if unmarshalErr != nil { - return base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalVV). Error: %w", base.UD(doc.ID), unmarshalErr) - } - doc.SyncData.HLV = tmpData.HLV - doc._rawBody = data } // If there's no body, but there is an xattr, set deleted flag and initialize an empty body diff --git a/db/document_test.go b/db/document_test.go index 52820e1389..bd558ea206 100644 --- a/db/document_test.go +++ b/db/document_test.go @@ -260,7 +260,7 @@ func TestParseVersionVectorSyncData(t *testing.T) { ctx := base.TestCtx(t) doc_meta := []byte(doc_meta_with_vv) - doc, err := unmarshalDocumentWithXattr(ctx, "doc_1k", nil, doc_meta, nil, 1, DocUnmarshalVV) + doc, err := unmarshalDocumentWithXattr(ctx, "doc_1k", nil, doc_meta, nil, 1, DocUnmarshalNoHistory) require.NoError(t, err) strCAS := string(base.Uint64CASToLittleEndianHex(123456)) diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 2648c1a560..c77ba61886 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -22,7 +22,7 @@ const hlvExpandMacroCASValue = "expand" // HybridLogicalVectorInterface is an interface to contain methods that will operate on both a decoded HLV and encoded HLV type HybridLogicalVectorInterface interface { - GetVersion(sourceID string) (uint64, bool) + GetValue(sourceID string) (uint64, bool) } var _ HybridLogicalVectorInterface = &HybridLogicalVector{} @@ -145,25 +145,32 @@ func (hlv *HybridLogicalVector) GetCurrentVersionString() string { return version.String() } -// IsInConflict tests to see if in memory HLV is conflicting with another HLV -func (hlv *DecodedHybridLogicalVector) IsInConflict(otherVector DecodedHybridLogicalVector) bool { - // test if either HLV(A) or HLV(B) are dominating over each other. If so they are not in conflict - if hlv.isDominating(otherVector) || otherVector.isDominating(*hlv) { +// IsVersionInConflict tests to see if a given version would be in conflict with the in memory HLV. +func (hlv *HybridLogicalVector) IsVersionInConflict(version Version) bool { + v1 := Version{hlv.SourceID, hlv.Version} + if v1.isVersionDominating(version) || version.isVersionDominating(v1) { return false } - // if the version vectors aren't dominating over one another then conflict is present return true } +// IsVersionKnown checks to see whether the HLV already contains a Version for the provided +// source with a matching or newer value +func (hlv *HybridLogicalVector) DominatesSource(version Version) bool { + existingValueForSource, found := hlv.GetValue(version.SourceID) + if !found { + return false + } + return existingValueForSource >= base.HexCasToUint64(version.Value) + +} + // AddVersion adds newVersion to the in memory representation of the HLV. func (hlv *HybridLogicalVector) AddVersion(newVersion Version) error { var newVersionCAS uint64 hlvVersionCAS := base.HexCasToUint64(hlv.Version) if newVersion.Value != hlvExpandMacroCASValue { newVersionCAS = base.HexCasToUint64(newVersion.Value) - if newVersionCAS < hlvVersionCAS { - return fmt.Errorf("attempting to add new version vector entry with a CAS that is less than the current version CAS value. Current cas: %s new cas %s", hlv.Version, newVersion.Value) - } } // check if this is the first time we're adding a source - version pair if hlv.SourceID == "" { @@ -173,6 +180,9 @@ func (hlv *HybridLogicalVector) AddVersion(newVersion Version) error { } // if new entry has the same source we simple just update the version if newVersion.SourceID == hlv.SourceID { + if newVersion.Value != hlvExpandMacroCASValue && newVersionCAS < hlvVersionCAS { + return fmt.Errorf("attempting to add new version vector entry with a CAS that is less than the current version CAS value for the same source. Current cas: %s new cas %s", hlv.Version, newVersion.Value) + } hlv.Version = newVersion.Value return nil } @@ -210,19 +220,22 @@ func (hlv *HybridLogicalVector) Remove(source string) error { return nil } -// isDominating tests if in memory HLV is dominating over another -func (hlv *DecodedHybridLogicalVector) isDominating(otherVector DecodedHybridLogicalVector) bool { - // Dominating Criteria: - // HLV A dominates HLV B if source(A) == source(B) and version(A) > version(B) - // If there is an entry in pv(B) for A's current source and version(A) > B's version for that pv entry then A is dominating - // if there is an entry in mv(B) for A's current source and version(A) > B's version for that pv entry then A is dominating +// isDominating tests if in memory HLV is dominating over another. +// If HLV A dominates CV of HLV B, it can be assumed to dominate the entire HLV, since +// CV dominates PV for a given HLV. Given this, it's sufficient to check whether HLV A +// has a version for HLV B's current source that's greater than or equal to HLV B's current version. +func (hlv *HybridLogicalVector) isDominating(otherVector HybridLogicalVector) bool { + return hlv.DominatesSource(Version{otherVector.SourceID, otherVector.Version}) +} - // Grab the latest CAS version for HLV(A)'s sourceID in HLV(B), if HLV(A) version CAS is > HLV(B)'s then it is dominating - // If 0 CAS is returned then the sourceID does not exist on HLV(B) - if latestCAS, found := otherVector.GetVersion(hlv.SourceID); found && hlv.Version > latestCAS { +// isVersionDominating tests if v2 is dominating v1 +func (v1 *Version) isVersionDominating(v2 Version) bool { + if v1.SourceID != v2.SourceID { + return false + } + if v1.Value > v2.Value { return true } - // HLV A is not dominating over HLV B return false } @@ -274,9 +287,9 @@ func (hlv *DecodedHybridLogicalVector) equalPreviousVectors(otherVector DecodedH return true } -// GetVersion returns the latest CAS value in the HLV for a given sourceID along with boolean value to +// GetValue returns the latest CAS value in the HLV for a given sourceID along with boolean value to // indicate if sourceID is found in the HLV, if the sourceID is not present in the HLV it will return 0 CAS value and false -func (hlv *DecodedHybridLogicalVector) GetVersion(sourceID string) (uint64, bool) { +func (hlv *DecodedHybridLogicalVector) GetValue(sourceID string) (uint64, bool) { if sourceID == "" { return 0, false } @@ -298,7 +311,7 @@ func (hlv *DecodedHybridLogicalVector) GetVersion(sourceID string) (uint64, bool } // GetVersion returns the latest decoded CAS value in the HLV for a given sourceID -func (hlv *HybridLogicalVector) GetVersion(sourceID string) (uint64, bool) { +func (hlv *HybridLogicalVector) GetValue(sourceID string) (uint64, bool) { if sourceID == "" { return 0, false } @@ -386,7 +399,7 @@ func (hlv *HybridLogicalVector) setPreviousVersion(source string, version string } func (hlv *HybridLogicalVector) IsVersionKnown(otherVersion Version) bool { - value, found := hlv.GetVersion(otherVersion.SourceID) + value, found := hlv.GetValue(otherVersion.SourceID) if !found { return false } @@ -467,4 +480,109 @@ func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec, currentSpec = append(currentSpec, spec) } return currentSpec + +} + +// extractHLVFromBlipMessage extracts the full HLV a string in the format seen over Blip +// blip string may be the following formats +// 1. cv only: cv +// 2. cv and pv: cv;pv +// 3. cv, pv, and mv: cv;mv;pv +// +// TODO: CBG-3662 - Optimise once we've settled on and tested the format with CBL +func extractHLVFromBlipMessage(versionVectorStr string) (HybridLogicalVector, error) { + hlv := HybridLogicalVector{} + + vectorFields := strings.Split(versionVectorStr, ";") + vectorLength := len(vectorFields) + if (vectorLength == 1 && vectorFields[0] == "") || vectorLength > 3 { + return HybridLogicalVector{}, fmt.Errorf("invalid hlv in changes message received") + } + + // add current version (should always be present) + cvStr := vectorFields[0] + version := strings.Split(cvStr, "@") + if len(version) < 2 { + return HybridLogicalVector{}, fmt.Errorf("invalid version in changes message received") + } + + err := hlv.AddVersion(Version{SourceID: version[1], Value: version[0]}) + if err != nil { + return HybridLogicalVector{}, err + } + + switch vectorLength { + case 1: + // cv only + return hlv, nil + case 2: + // only cv and pv present + sourceVersionListPV, err := parseVectorValues(vectorFields[1]) + if err != nil { + return HybridLogicalVector{}, err + } + hlv.PreviousVersions = make(map[string]string) + for _, v := range sourceVersionListPV { + hlv.PreviousVersions[v.SourceID] = v.Value + } + return hlv, nil + case 3: + // cv, mv and pv present + sourceVersionListPV, err := parseVectorValues(vectorFields[2]) + hlv.PreviousVersions = make(map[string]string) + if err != nil { + return HybridLogicalVector{}, err + } + for _, pv := range sourceVersionListPV { + hlv.PreviousVersions[pv.SourceID] = pv.Value + } + + sourceVersionListMV, err := parseVectorValues(vectorFields[1]) + hlv.MergeVersions = make(map[string]string) + if err != nil { + return HybridLogicalVector{}, err + } + for _, mv := range sourceVersionListMV { + hlv.MergeVersions[mv.SourceID] = mv.Value + } + return hlv, nil + default: + return HybridLogicalVector{}, fmt.Errorf("invalid hlv in changes message received") + } +} + +// parseVectorValues takes an HLV section (cv, pv or mv) in string form and splits into +// source and version pairs +func parseVectorValues(vectorStr string) (versions []Version, err error) { + versionsStr := strings.Split(vectorStr, ",") + versions = make([]Version, 0, len(versionsStr)) + + for _, v := range versionsStr { + // remove any leading whitespace form the string value + // TODO: Can avoid by restricting spec + if len(v) > 0 && v[0] == ' ' { + v = v[1:] + } + version, err := ParseVersion(v) + if err != nil { + return nil, err + } + versions = append(versions, version) + } + + return versions, nil +} + +// Helper functions for version source and value encoding +func EncodeSource(source string) string { + return base64.StdEncoding.EncodeToString([]byte(source)) +} + +func EncodeValue(value uint64) string { + return base.CasToString(value) +} + +// EncodeValueStr converts a simplified number ("1") to a hex-encoded string +func EncodeValueStr(value string) (string, error) { + return base.StringDecimalToLittleEndianHex(strings.TrimSpace(value)) } diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go index 3e4a94b885..c006f6989d 100644 --- a/db/hybrid_logical_vector_test.go +++ b/db/hybrid_logical_vector_test.go @@ -26,19 +26,19 @@ import ( // - Tests methods GetCurrentVersion, AddVersion and Remove func TestInternalHLVFunctions(t *testing.T) { pv := make(map[string]string) - currSourceId := base64.StdEncoding.EncodeToString([]byte("5pRi8Piv1yLcLJ1iVNJIsA")) - currVersion := string(base.Uint64CASToLittleEndianHex(12345678)) - pv[base64.StdEncoding.EncodeToString([]byte("YZvBpEaztom9z5V/hDoeIw"))] = string(base.Uint64CASToLittleEndianHex(64463204720)) + currSourceId := EncodeSource("5pRi8Piv1yLcLJ1iVNJIsA") + currVersion := EncodeValue(12345678) + pv[EncodeSource("YZvBpEaztom9z5V/hDoeIw")] = EncodeValue(64463204720) inputHLV := []string{"5pRi8Piv1yLcLJ1iVNJIsA@12345678", "YZvBpEaztom9z5V/hDoeIw@64463204720", "m_NqiIe0LekFPLeX4JvTO6Iw@345454"} hlv := createHLVForTest(t, inputHLV) - newCAS := string(base.Uint64CASToLittleEndianHex(123456789)) + newCAS := EncodeValue(123456789) const newSource = "s_testsource" // create a new version vector entry that will error method AddVersion badNewVector := Version{ - Value: string(base.Uint64CASToLittleEndianHex(123345)), + Value: EncodeValue(123345), SourceID: currSourceId, } // create a new version vector entry that should be added to HLV successfully @@ -78,7 +78,7 @@ func TestInternalHLVFunctions(t *testing.T) { } // TestConflictDetectionDominating: -// - Tests two cases where one HLV's is said to be 'dominating' over another and thus not in conflict +// - Tests cases where one HLV's is said to be 'dominating' over another // - Test case 1: where sourceID is the same between HLV's but HLV(A) has higher version CAS than HLV(B) thus A dominates // - Test case 2: where sourceID is different and HLV(A) sourceID is present in HLV(B) PV and HLV(A) has dominating version // - Test case 3: where sourceID is different and HLV(A) sourceID is present in HLV(B) MV and HLV(A) has dominating version @@ -86,39 +86,71 @@ func TestInternalHLVFunctions(t *testing.T) { // - Assert that all scenarios returns false from IsInConflict method, as we have a HLV that is dominating in each case func TestConflictDetectionDominating(t *testing.T) { testCases := []struct { - name string - inputListHLVA []string - inputListHLVB []string + name string + inputListHLVA []string + inputListHLVB []string + expectedResult bool }{ { - name: "Test case 1", - inputListHLVA: []string{"cluster1@20", "cluster2@2"}, - inputListHLVB: []string{"cluster1@10", "cluster2@1"}, + name: "Matching current source, newer version", + inputListHLVA: []string{"cluster1@20", "cluster2@2"}, + inputListHLVB: []string{"cluster1@10", "cluster2@1"}, + expectedResult: true, + }, { + name: "Matching current source and version", + inputListHLVA: []string{"cluster1@20", "cluster2@2"}, + inputListHLVB: []string{"cluster1@20", "cluster2@1"}, + expectedResult: true, }, { - name: "Test case 2", - inputListHLVA: []string{"cluster1@20", "cluster3@3"}, - inputListHLVB: []string{"cluster2@10", "cluster1@15"}, + name: "B CV found in A's PV", + inputListHLVA: []string{"cluster1@20", "cluster2@10"}, + inputListHLVB: []string{"cluster2@10", "cluster1@15"}, + expectedResult: true, }, { - name: "Test case 3", - inputListHLVA: []string{"cluster1@20", "cluster3@3"}, - inputListHLVB: []string{"cluster2@10", "m_cluster1@12", "m_cluster2@11"}, + name: "B CV older than A's PV for same source", + inputListHLVA: []string{"cluster1@20", "cluster2@10"}, + inputListHLVB: []string{"cluster2@10", "cluster1@15"}, + expectedResult: true, }, { - name: "Test case 4", - inputListHLVA: []string{"cluster2@10", "cluster1@15"}, - - inputListHLVB: []string{"cluster1@20", "cluster3@3"}, + name: "Unique sources in A", + inputListHLVA: []string{"cluster1@20", "cluster2@15", "cluster3@3"}, + inputListHLVB: []string{"cluster2@10", "cluster1@10"}, + expectedResult: true, + }, + { + name: "Unique sources in B", + inputListHLVA: []string{"cluster1@20"}, + inputListHLVB: []string{"cluster1@15", "cluster3@3"}, + expectedResult: true, + }, + { + name: "B has newer cv", + inputListHLVA: []string{"cluster1@10"}, + inputListHLVB: []string{"cluster1@15"}, + expectedResult: false, + }, + { + name: "B has newer cv than A pv", + inputListHLVA: []string{"cluster2@20", "cluster1@10"}, + inputListHLVB: []string{"cluster1@15", "cluster2@20"}, + expectedResult: false, + }, + { + name: "B's cv not found in A", + inputListHLVA: []string{"cluster2@20", "cluster1@10"}, + inputListHLVB: []string{"cluster3@5"}, + expectedResult: false, }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { hlvA := createHLVForTest(t, testCase.inputListHLVA) hlvB := createHLVForTest(t, testCase.inputListHLVB) - decHLVA := hlvA.ToDecodedHybridLogicalVector() - decHLVB := hlvB.ToDecodedHybridLogicalVector() - require.False(t, decHLVA.IsInConflict(decHLVB)) + require.True(t, hlvA.isDominating(hlvB) == testCase.expectedResult) + }) } } @@ -163,21 +195,6 @@ func TestConflictEqualHLV(t *testing.T) { require.False(t, decHLVA.isEqual(decHLVB)) } -// TestConflictExample: -// - Takes example conflict scenario from PRD to see if we correctly identify conflict in that scenario -// - Creates two HLV's similar to ones in example and calls IsInConflict to assert it returns true -func TestConflictExample(t *testing.T) { - input := []string{"cluster1@11", "cluster3@2", "cluster2@4"} - inMemoryHLV := createHLVForTest(t, input) - - input = []string{"cluster2@2", "cluster3@3"} - otherVector := createHLVForTest(t, input) - - inMemoryHLVDec := inMemoryHLV.ToDecodedHybridLogicalVector() - otherVectorDec := otherVector.ToDecodedHybridLogicalVector() - require.True(t, inMemoryHLVDec.IsInConflict(otherVectorDec)) -} - // createHLVForTest is a helper function to create a HLV for use in a test. Takes a list of strings in the format of and assumes // first entry is current version. For merge version entries you must specify 'm_' as a prefix to sourceID NOTE: it also sets cvCAS to the current version func createHLVForTest(tb *testing.T, inputList []string) HybridLogicalVector { @@ -186,25 +203,25 @@ func createHLVForTest(tb *testing.T, inputList []string) HybridLogicalVector { // first element will be current version and source pair currentVersionPair := strings.Split(inputList[0], "@") hlvOutput.SourceID = base64.StdEncoding.EncodeToString([]byte(currentVersionPair[0])) - version, err := strconv.ParseUint(currentVersionPair[1], 10, 64) + value, err := strconv.ParseUint(currentVersionPair[1], 10, 64) require.NoError(tb, err) - vrsEncoded := string(base.Uint64CASToLittleEndianHex(version)) + vrsEncoded := EncodeValue(value) hlvOutput.Version = vrsEncoded hlvOutput.CurrentVersionCAS = vrsEncoded // remove current version entry in list now we have parsed it into the HLV inputList = inputList[1:] - for _, value := range inputList { - currentVersionPair = strings.Split(value, "@") - version, err = strconv.ParseUint(currentVersionPair[1], 10, 64) + for _, version := range inputList { + currentVersionPair = strings.Split(version, "@") + value, err = strconv.ParseUint(currentVersionPair[1], 10, 64) require.NoError(tb, err) if strings.HasPrefix(currentVersionPair[0], "m_") { // add entry to merge version removing the leading prefix for sourceID - hlvOutput.MergeVersions[base64.StdEncoding.EncodeToString([]byte(currentVersionPair[0][2:]))] = string(base.Uint64CASToLittleEndianHex(version)) + hlvOutput.MergeVersions[EncodeSource(currentVersionPair[0][2:])] = EncodeValue(value) } else { // if it's not got the prefix we assume it's a previous version entry - hlvOutput.PreviousVersions[base64.StdEncoding.EncodeToString([]byte(currentVersionPair[0]))] = string(base.Uint64CASToLittleEndianHex(version)) + hlvOutput.PreviousVersions[EncodeSource(currentVersionPair[0])] = EncodeValue(value) } } return hlvOutput @@ -283,7 +300,7 @@ func TestHLVImport(t *testing.T) { existingBody, existingXattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, existingHLVKey, []string{base.SyncXattrName}) require.NoError(t, err) existingXattr := existingXattrs[base.SyncXattrName] - encodedCAS = string(base.Uint64CASToLittleEndianHex(cas)) + encodedCAS = EncodeValue(cas) _, err = collection.ImportDocRaw(ctx, existingHLVKey, existingBody, existingXattr, nil, false, cas, nil, ImportFromFeed) require.NoError(t, err, "import error") @@ -347,3 +364,142 @@ func TestHLVMapToCBLString(t *testing.T) { }) } } + +// TestInvalidHLVOverChangesMessage: +// - Test hlv string that has too many sections to it (parts delimited by ;) +// - Test hlv string that is empty +// - Assert that extractHLVFromBlipMessage will return error in both cases +func TestInvalidHLVInBlipMessageForm(t *testing.T) { + hlvStr := "25@def; 22@def,21@eff; 20@abc,18@hij; 222@hiowdwdew, 5555@dhsajidfgd" + + hlv, err := extractHLVFromBlipMessage(hlvStr) + require.Error(t, err) + assert.ErrorContains(t, err, "invalid hlv in changes message received") + assert.Equal(t, HybridLogicalVector{}, hlv) + + hlvStr = "" + hlv, err = extractHLVFromBlipMessage(hlvStr) + require.Error(t, err) + assert.ErrorContains(t, err, "invalid hlv in changes message received") + assert.Equal(t, HybridLogicalVector{}, hlv) +} + +var extractHLVFromBlipMsgBMarkCases = []struct { + name string + hlvString string + expectedHLV []string + mergeVersions bool + previousVersions bool +}{ + { + name: "mv and pv, leading spaces", // with spaces + hlvString: "25@def; 22@def, 21@eff, 500@x, 501@xx, 4000@xxx, 700@y, 701@yy, 702@yyy; 20@abc, 18@hij, 3@x, 4@xx, 5@xxx, 6@xxxx, 7@xxxxx, 3@y, 4@yy, 5@yyy, 6@yyyy, 7@yyyyy, 2@xy, 3@xyy, 4@xxy", // 15 pv 8 mv + expectedHLV: []string{"def@25", "abc@20", "hij@18", "x@3", "xx@4", "xxx@5", "xxxx@6", "xxxxx@7", "y@3", "yy@4", "yyy@5", "yyyy@6", "yyyyy@7", "xy@2", "xyy@3", "xxy@4", "m_def@22", "m_eff@21", "m_x@500", "m_xx@501", "m_xxx@4000", "m_y@700", "m_yy@701", "m_yyy@702"}, + previousVersions: true, + mergeVersions: true, + }, + { + name: "mv and pv, no spaces", // without spaces + hlvString: "25@def;22@def,21@eff,500@x,501@xx,4000@xxx,700@y,701@yy,702@yyy;20@abc,18@hij,3@x,4@xx,5@xxx,6@xxxx,7@xxxxx,3@y,4@yy,5@yyy,6@yyyy,7@yyyyy,2@xy,3@xyy,4@xxy", // 15 pv 8 mv + expectedHLV: []string{"def@25", "abc@20", "hij@18", "x@3", "xx@4", "xxx@5", "xxxx@6", "xxxxx@7", "y@3", "yy@4", "yyy@5", "yyyy@6", "yyyyy@7", "xy@2", "xyy@3", "xxy@4", "m_def@22", "m_eff@21", "m_x@500", "m_xx@501", "m_xxx@4000", "m_y@700", "m_yy@701", "m_yyy@702"}, + previousVersions: true, + mergeVersions: true, + }, + { + name: "pv only", + hlvString: "25@def; 20@abc,18@hij", + expectedHLV: []string{"def@25", "abc@20", "hij@18"}, + previousVersions: true, + }, + { + name: "mv and pv, mixed spacing", + hlvString: "25@def; 22@def,21@eff; 20@abc,18@hij,3@x,4@xx,5@xxx,6@xxxx,7@xxxxx,3@y,4@yy,5@yyy,6@yyyy,7@yyyyy,2@xy,3@xyy,4@xxy", // 15 + expectedHLV: []string{"def@25", "abc@20", "hij@18", "x@3", "xx@4", "xxx@5", "xxxx@6", "xxxxx@7", "y@3", "yy@4", "yyy@5", "yyyy@6", "yyyyy@7", "xy@2", "xyy@3", "xxy@4", "m_def@22", "m_eff@21"}, + mergeVersions: true, + previousVersions: true, + }, + { + name: "cv only", + hlvString: "24@def", + expectedHLV: []string{"def@24"}, + }, + { + name: "cv and mv,base64 encoded", + hlvString: "1@Hell0CA; 1@1Hr0k43xS662TToxODDAxQ", + expectedHLV: []string{"Hell0CA@1", "1Hr0k43xS662TToxODDAxQ@1"}, + previousVersions: true, + }, + { + name: "cv and mv - small", + hlvString: "25@def; 22@def,21@eff; 20@abc,18@hij", + expectedHLV: []string{"def@25", "abc@20", "hij@18", "m_def@22", "m_eff@21"}, + mergeVersions: true, + previousVersions: true, + }, +} + +// TestExtractHLVFromChangesMessage: +// - Test case 1: CV entry and 1 PV entry +// - Test case 2: CV entry and 2 PV entries +// - Test case 3: CV entry, 2 MV entries and 2 PV entries +// - Test case 4: just CV entry +// - Each test case gets run through extractHLVFromBlipMessage and assert that the resulting HLV +// is correct to what is expected +func TestExtractHLVFromChangesMessage(t *testing.T) { + for _, test := range extractHLVFromBlipMsgBMarkCases { + t.Run(test.name, func(t *testing.T) { + expectedVector := createHLVForTest(t, test.expectedHLV) + + // TODO: When CBG-3662 is done, should be able to simplify base64 handling to treat source as a string + // that may represent a base64 encoding + base64EncodedHlvString := cblEncodeTestSources(test.hlvString) + hlv, err := extractHLVFromBlipMessage(base64EncodedHlvString) + require.NoError(t, err) + + assert.Equal(t, expectedVector.SourceID, hlv.SourceID) + assert.Equal(t, expectedVector.Version, hlv.Version) + if test.previousVersions { + assert.True(t, reflect.DeepEqual(expectedVector.PreviousVersions, hlv.PreviousVersions)) + } + if test.mergeVersions { + assert.True(t, reflect.DeepEqual(expectedVector.MergeVersions, hlv.MergeVersions)) + } + }) + } +} + +func BenchmarkExtractHLVFromBlipMessage(b *testing.B) { + for _, bm := range extractHLVFromBlipMsgBMarkCases { + b.Run(bm.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + _, _ = extractHLVFromBlipMessage(bm.hlvString) + } + }) + } +} + +// cblEncodeTestSources converts the simplified versions in test data to CBL-style encoding +func cblEncodeTestSources(hlvString string) (base64HLVString string) { + + vectorFields := strings.Split(hlvString, ";") + vectorLength := len(vectorFields) + if vectorLength == 0 { + return hlvString + } + + // first vector field is single vector, cv + base64HLVString += EncodeTestVersion(vectorFields[0]) + for _, field := range vectorFields[1:] { + base64HLVString += ";" + versions := strings.Split(field, ",") + if len(versions) == 0 { + continue + } + base64HLVString += EncodeTestVersion(versions[0]) + for _, version := range versions[1:] { + base64HLVString += "," + base64HLVString += EncodeTestVersion(version) + } + } + return base64HLVString +} diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index 76204d7cf0..e0277b9600 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -331,8 +331,8 @@ func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBa // nolint:staticcheck func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, cv Version) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, hlv *HybridLogicalVector, err error) { if bodyBytes, body, attachments, err = backingStore.getCurrentVersion(ctx, doc); err != nil { - // TODO: pending CBG-3213 support of channel removal for CV - // we need implementation of IsChannelRemoval for CV here. + // TODO: CBG-3814 - pending support of channel removal for CV + base.ErrorfCtx(ctx, "pending CBG-3213 support of channel removal for CV: %v", err) } if err = doc.HasCurrentVersion(ctx, cv); err != nil { diff --git a/db/utilities_hlv_testing.go b/db/utilities_hlv_testing.go index e7fafb2b3c..854899897b 100644 --- a/db/utilities_hlv_testing.go +++ b/db/utilities_hlv_testing.go @@ -12,7 +12,8 @@ package db import ( "context" - "encoding/base64" + "fmt" + "strings" "testing" sgbucket "github.com/couchbase/sg-bucket" @@ -34,7 +35,7 @@ func NewHLVAgent(t *testing.T, datastore base.DataStore, source string, xattrNam return &HLVAgent{ t: t, datastore: datastore, - Source: base64.StdEncoding.EncodeToString([]byte(source)), // all writes by the HLVHelper are done as this source + Source: EncodeSource(source), // all writes by the HLVHelper are done as this source xattrName: xattrName, } } @@ -64,3 +65,95 @@ func (h *HLVAgent) InsertWithHLV(ctx context.Context, key string) (casOut uint64 require.NoError(h.t, err) return cas } + +// EncodeTestVersion converts a simplified string version of the form 1@abc to a hex-encoded version and base64 encoded +// source, like 0x0100000000000000@YWJj. Allows use of simplified versions in tests for readability, ease of use. +func EncodeTestVersion(versionString string) (encodedString string) { + timestampString, source, found := strings.Cut(versionString, "@") + if !found { + return versionString + } + hexTimestamp, err := EncodeValueStr(timestampString) + if err != nil { + panic(fmt.Sprintf("unable to encode timestampString %v", timestampString)) + } + base64Source := EncodeSource(source) + return hexTimestamp + "@" + base64Source +} + +// encodeTestHistory converts a simplified version history of the form "1@abc,2@def;3@ghi" to use hex-encoded versions and +// base64 encoded sources +func EncodeTestHistory(historyString string) (encodedString string) { + // possible versionSets are pv;mv + // possible versionSets are pv;mv + versionSets := strings.Split(historyString, ";") + if len(versionSets) == 0 { + return "" + } + for index, versionSet := range versionSets { + // versionSet delimiter + if index > 0 { + encodedString += ";" + } + versions := strings.Split(versionSet, ",") + for index, version := range versions { + // version delimiter + if index > 0 { + encodedString += "," + } + encodedString += EncodeTestVersion(version) + } + } + return encodedString +} + +// ParseTestHistory takes a string test history in the form 1@abc,2@def;3@ghi,4@jkl and formats this +// as pv and mv maps keyed by encoded source, with encoded values +func ParseTestHistory(t *testing.T, historyString string) (pv map[string]string, mv map[string]string) { + versionSets := strings.Split(historyString, ";") + + pv = make(map[string]string) + mv = make(map[string]string) + + var pvString, mvString string + switch len(versionSets) { + case 1: + pvString = versionSets[0] + case 2: + mvString = versionSets[0] + pvString = versionSets[1] + default: + return pv, mv + } + + // pv + for _, versionStr := range strings.Split(pvString, ",") { + version, err := ParseVersion(versionStr) + require.NoError(t, err) + encodedValue, err := EncodeValueStr(version.Value) + require.NoError(t, err) + pv[EncodeSource(version.SourceID)] = encodedValue + } + + // mv + if mvString != "" { + for _, versionStr := range strings.Split(mvString, ",") { + version, err := ParseVersion(versionStr) + require.NoError(t, err) + encodedValue, err := EncodeValueStr(version.Value) + require.NoError(t, err) + mv[EncodeSource(version.SourceID)] = encodedValue + } + } + return pv, mv +} + +// Requires that the CV for the provided HLV matches the expected CV (sent in simplified test format) +func RequireCVEqual(t *testing.T, hlv *HybridLogicalVector, expectedCV string) { + testVersion, err := ParseVersion(expectedCV) + require.NoError(t, err) + require.Equal(t, EncodeSource(testVersion.SourceID), hlv.SourceID) + encodedValue, err := EncodeValueStr(testVersion.Value) + require.NoError(t, err) + require.Equal(t, encodedValue, hlv.Version) +} diff --git a/rest/attachment_test.go b/rest/attachment_test.go index b03b4d41f7..67df6cb566 100644 --- a/rest/attachment_test.go +++ b/rest/attachment_test.go @@ -2265,7 +2265,7 @@ func TestUpdateExistingAttachment(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol (CBG-3797) const ( doc1ID = "doc1" doc2ID = "doc2" @@ -2328,7 +2328,7 @@ func TestPushUnknownAttachmentAsStub(t *testing.T) { } const doc1ID = "doc1" btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol (CBG-3797) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, rtConfig) @@ -2378,7 +2378,7 @@ func TestMinRevPosWorkToAvoidUnnecessaryProveAttachment(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol (CBG-3797) const docID = "doc" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { @@ -2420,7 +2420,7 @@ func TestAttachmentWithErroneousRevPos(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol (CBG-3797) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, rtConfig) @@ -2602,7 +2602,7 @@ func TestCBLRevposHandling(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol (CBG-3797) const ( doc1ID = "doc1" doc2ID = "doc2" diff --git a/rest/blip_api_attachment_test.go b/rest/blip_api_attachment_test.go index d35941800b..6b9a5b486e 100644 --- a/rest/blip_api_attachment_test.go +++ b/rest/blip_api_attachment_test.go @@ -45,8 +45,7 @@ func TestBlipPushPullV2AttachmentV2Client(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - // given this test is for v2 protocol, skip version vector test - btcRunner.SkipSubtest[VersionVectorSubtestName] = true + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Doesn't require HLV - attachment v2 protocol test const docID = "doc1" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { @@ -120,7 +119,7 @@ func TestBlipPushPullV2AttachmentV3Client(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Doesn't require HLV - attachment v2 protocol test const docID = "doc1" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { @@ -193,7 +192,7 @@ func TestBlipProveAttachmentV2(t *testing.T) { ) btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // v2 protocol test + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Doesn't require HLV - attachment v2 protocol test btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, &rtConfig) @@ -251,7 +250,7 @@ func TestBlipProveAttachmentV2Push(t *testing.T) { ) btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // v2 protocol test + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Doesn't require HLV - attachment v2 protocol test btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, &rtConfig) @@ -291,7 +290,7 @@ func TestBlipPushPullNewAttachmentCommonAncestor(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires push replication (CBG-3255) + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires HLV revpos handling (CBG-3797) const docID = "doc1" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { @@ -367,7 +366,7 @@ func TestBlipPushPullNewAttachmentNoCommonAncestor(t *testing.T) { const docID = "doc1" btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires push replication (CBG-3255) + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires HLV revpos handling (CBG-3797) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, &rtConfig) @@ -527,12 +526,13 @@ func TestPutAttachmentViaBlipGetViaBlip(t *testing.T) { // TestBlipAttachNameChange tests CBL handling - attachments with changed names are sent as stubs, and not new attachments func TestBlipAttachNameChange(t *testing.T) { base.SetUpTestLogging(t, base.LevelInfo, base.KeySync, base.KeySyncMsg, base.KeyWebSocket, base.KeyWebSocketFrame, base.KeyHTTP, base.KeyCRUD) + rtConfig := &RestTesterConfig{ GuestEnabled: true, } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires push replication (CBG-3255) + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires HLV revpos handling (CBG-3797) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, rtConfig) @@ -582,7 +582,7 @@ func TestBlipLegacyAttachNameChange(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires push replication (CBG-3255) + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires HLV revpos handling (CBG-3797) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, rtConfig) @@ -640,7 +640,7 @@ func TestBlipLegacyAttachDocUpdate(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires push replication (CBG-3255) + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires HLV revpos handling (CBG-3797) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, rtConfig) diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index b443f34478..b797c685a5 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -17,6 +17,7 @@ import ( "log" "net/http" "net/url" + "reflect" "strconv" "strings" "sync" @@ -1720,6 +1721,106 @@ func TestPutRevConflictsMode(t *testing.T) { } +// TestPutRevV4: +// - Create blip tester to run with V4 protocol +// - Use send rev with CV defined in rev field and history field with PV/MV defined +// - Retrieve the doc from bucket and assert that the HLV is set to what has been sent over the blip tester +func TestPutRevV4(t *testing.T) { + base.SetUpTestLogging(t, base.LevelInfo, base.KeyHTTP, base.KeySync, base.KeySyncMsg) + + // Create blip tester with v4 protocol + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + connectingUsername: "user1", + connectingPassword: "1234", + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + require.NoError(t, err, "Unexpected error creating BlipTester") + defer bt.Close() + collection := bt.restTester.GetSingleTestDatabaseCollection() + + // 1. Send rev with history + history := "1@def, 2@abc" + sent, _, resp, err := bt.SendRev("foo", db.EncodeTestVersion("3@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(history)}) + assert.True(t, sent) + require.NoError(t, err) + assert.Equal(t, "", resp.Properties["Error-Code"]) + + // Validate against the bucket doc's HLV + doc, _, err := collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + require.NoError(t, err) + pv, _ := db.ParseTestHistory(t, history) + db.RequireCVEqual(t, doc.HLV, "3@efg") + assert.Equal(t, db.EncodeValue(doc.Cas), doc.HLV.CurrentVersionCAS) + assert.True(t, reflect.DeepEqual(pv, doc.HLV.PreviousVersions)) + + // 2. Update the document with a non-conflicting revision, where only cv is updated + sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("4@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(history)}) + assert.True(t, sent) + require.NoError(t, err) + assert.Equal(t, "", resp.Properties["Error-Code"]) + + // Validate against the bucket doc's HLV + doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + require.NoError(t, err) + db.RequireCVEqual(t, doc.HLV, "4@efg") + assert.Equal(t, db.EncodeValue(doc.Cas), doc.HLV.CurrentVersionCAS) + assert.True(t, reflect.DeepEqual(pv, doc.HLV.PreviousVersions)) + + // 3. Update the document again with a non-conflicting revision from a different source (previous cv moved to pv) + updatedHistory := "1@def, 2@abc, 4@efg" + sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("1@jkl"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) + assert.True(t, sent) + require.NoError(t, err) + assert.Equal(t, "", resp.Properties["Error-Code"]) + + // Validate against the bucket doc's HLV + doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + require.NoError(t, err) + pv, _ = db.ParseTestHistory(t, updatedHistory) + db.RequireCVEqual(t, doc.HLV, "1@jkl") + assert.Equal(t, db.EncodeValue(doc.Cas), doc.HLV.CurrentVersionCAS) + assert.True(t, reflect.DeepEqual(pv, doc.HLV.PreviousVersions)) + + // 4. Update the document again with a non-conflicting revision from a different source, and additional sources in history (previous cv moved to pv, and pv expanded) + updatedHistory = "1@def, 2@abc, 4@efg, 1@jkl, 1@mmm" + sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("1@nnn"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) + assert.True(t, sent) + require.NoError(t, err) + assert.Equal(t, "", resp.Properties["Error-Code"]) + + // Validate against the bucket doc's HLV + doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + require.NoError(t, err) + pv, _ = db.ParseTestHistory(t, updatedHistory) + db.RequireCVEqual(t, doc.HLV, "1@nnn") + assert.Equal(t, db.EncodeValue(doc.Cas), doc.HLV.CurrentVersionCAS) + assert.True(t, reflect.DeepEqual(pv, doc.HLV.PreviousVersions)) + + // 5. Attempt to update the document again with a conflicting revision from a different source (previous cv not in pv), expect conflict + sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("1@pqr"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) + assert.True(t, sent) + require.Error(t, err) + assert.Equal(t, "409", resp.Properties["Error-Code"]) + + // 6. Test sending rev with merge versions included in history (note new key) + mvHistory := "3@def, 3@abc; 1@def, 2@abc" + sent, _, resp, err = bt.SendRev("boo", db.EncodeTestVersion("3@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(mvHistory)}) + assert.True(t, sent) + require.NoError(t, err) + assert.Equal(t, "", resp.Properties["Error-Code"]) + + // assert on bucket doc + doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "boo", db.DocUnmarshalNoHistory) + require.NoError(t, err) + + pv, mv := db.ParseTestHistory(t, mvHistory) + db.RequireCVEqual(t, doc.HLV, "3@efg") + assert.Equal(t, base.CasToString(doc.Cas), doc.HLV.CurrentVersionCAS) + assert.True(t, reflect.DeepEqual(pv, doc.HLV.PreviousVersions)) + assert.True(t, reflect.DeepEqual(mv, doc.HLV.MergeVersions)) +} + // Repro attempt for SG #3281 // // - Set up a user w/ access to channel A @@ -2479,7 +2580,7 @@ func TestBlipInternalPropertiesHandling(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires push replication (CBG-3255) + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires HLV revpos handling (CBG-3797) for _attachments subtest btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { // Setup diff --git a/rest/blip_api_delta_sync_test.go b/rest/blip_api_delta_sync_test.go index a1bfe85d7a..9daf82a439 100644 --- a/rest/blip_api_delta_sync_test.go +++ b/rest/blip_api_delta_sync_test.go @@ -23,10 +23,8 @@ import ( ) // TestBlipDeltaSyncPushAttachment tests updating a doc that has an attachment with a delta that doesn't modify the attachment. - func TestBlipDeltaSyncPushAttachment(t *testing.T) { base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll) - if !base.IsEnterpriseEdition() { t.Skip("Delta test requires EE") } @@ -42,7 +40,7 @@ func TestBlipDeltaSyncPushAttachment(t *testing.T) { const docID = "pushAttachmentDoc" btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // Requires push replication (CBG-3255) + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication (CBG-3736) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, rtConfig) @@ -113,7 +111,7 @@ func TestBlipDeltaSyncPushPullNewAttachment(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication (CBG-3736) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, &rtConfig) defer rt.Close() @@ -184,7 +182,7 @@ func TestBlipDeltaSyncNewAttachmentPull(t *testing.T) { GuestEnabled: true, } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication (CBG-3736) const doc1ID = "doc1" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { @@ -282,7 +280,7 @@ func TestBlipDeltaSyncPull(t *testing.T) { const docID = "doc1" var deltaSentCount int64 btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication (CBG-3736) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, rtConfig) @@ -358,7 +356,7 @@ func TestBlipDeltaSyncPullResend(t *testing.T) { GuestEnabled: true, } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication (CBG-3736) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, &rtConfig) @@ -428,7 +426,7 @@ func TestBlipDeltaSyncPullRemoved(t *testing.T) { SyncFn: channels.DocChannelsSyncFunction, } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // v2 protocol test + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // requires delta sync - CBG-3736 const docID = "doc1" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { @@ -493,7 +491,7 @@ func TestBlipDeltaSyncPullTombstoned(t *testing.T) { SyncFn: channels.DocChannelsSyncFunction, } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication" + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication (CBG-3736) var deltaCacheHitsStart int64 var deltaCacheMissesStart int64 @@ -589,7 +587,7 @@ func TestBlipDeltaSyncPullTombstonedStarChan(t *testing.T) { sgUseDeltas := base.IsEnterpriseEdition() rtConfig := &RestTesterConfig{DatabaseConfig: &DatabaseConfig{DbConfig: DbConfig{DeltaSync: &DeltaSyncConfig{Enabled: &sgUseDeltas}}}} btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication (CBG-3736) const docID = "doc1" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { @@ -732,7 +730,7 @@ func TestBlipDeltaSyncPullRevCache(t *testing.T) { } const docID = "doc1" btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication (CBG-3736) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, @@ -817,7 +815,7 @@ func TestBlipDeltaSyncPush(t *testing.T) { GuestEnabled: true, } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication (CBG-3736) const docID = "doc1" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { @@ -924,7 +922,7 @@ func TestBlipNonDeltaSyncPush(t *testing.T) { GuestEnabled: true, } btcRunner := NewBlipTesterClientRunner(t) - btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // delta sync not implemented for Version Vectors replication (CBG-3736) const docID = "doc1" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { diff --git a/rest/blip_client_test.go b/rest/blip_client_test.go index 0c0d88816d..3479db1d84 100644 --- a/rest/blip_client_test.go +++ b/rest/blip_client_test.go @@ -848,15 +848,23 @@ func (btr *BlipTesterReplicator) sendMsg(msg *blip.Message) (err error) { // PushRev creates a revision on the client, and immediately sends a changes request for it. // The rev ID is always: "N-abc", where N is rev generation for predictability. func (btc *BlipTesterCollectionClient) PushRev(docID string, parentVersion DocVersion, body []byte) (DocVersion, error) { - revid, err := btc.PushRevWithHistory(docID, parentVersion.RevTreeID, body, 1, 0) + revID, err := btc.PushRevWithHistory(docID, parentVersion.RevTreeID, body, 1, 0) if err != nil { return DocVersion{}, err } docVersion := btc.GetDocVersion(docID) - require.Equal(btc.parent.rt.TB, docVersion.RevTreeID, revid) + btc.requireRevID(docVersion, revID) return docVersion, nil } +func (btc *BlipTesterCollectionClient) requireRevID(expected DocVersion, revID string) { + if btc.UseHLV() { + require.Equal(btc.parent.rt.TB, expected.CV.String(), revID) + } else { + require.Equal(btc.parent.rt.TB, expected.RevTreeID, revID) + } +} + // GetDocVersion fetches revid and cv directly from the bucket. Used to support REST-based verification in btc tests // even while REST only supports revTreeId func (btc *BlipTesterCollectionClient) GetDocVersion(docID string) DocVersion { @@ -865,7 +873,7 @@ func (btc *BlipTesterCollectionClient) GetDocVersion(docID string) DocVersion { ctx := base.DatabaseLogCtx(base.TestCtx(btc.parent.rt.TB), btc.parent.rt.GetDatabase().Name, nil) doc, err := collection.GetDocument(ctx, docID, db.DocUnmarshalSync) require.NoError(btc.parent.rt.TB, err) - if doc.HLV == nil { + if !btc.UseHLV() { return DocVersion{RevTreeID: doc.CurrentRev} } return DocVersion{RevTreeID: doc.CurrentRev, CV: db.Version{SourceID: doc.HLV.SourceID, Value: doc.HLV.Version}}