Skip to content

Commit

Permalink
CBG-3255 Replication protocol support for HLV - push replication (#6700)
Browse files Browse the repository at this point in the history
* CBG-3255 Push replication support for HLV

Adds push replication support for HLV clients. Delta sync and attachments are not yet supported (pending CBG-3736, CBG-3797).

On proposeChanges, checks whether the incoming CV and parent version represent a new document, known version, valid update, or conflict.  Uses the same handling as revTreeID (conflict if parent version isn’t the server’s current version), with the additional non-conflict case where the incoming CV and server CV share the same source and the incoming CV is a newer version.

For the incoming rev, detects conflict based on the incoming cv (based on the implicit hierarchy in an HLV, where cv > pv > mv).

Includes some test helpers to support writing tests with simplified versions (e.g. 1@abc) while still asserting for encoded source and version.

* Test fixes

* Fixes/cleanup based on PR review

---------

Co-authored-by: Gregory Newman-Smith <gregory.newmansmith@couchbase.com>
  • Loading branch information
2 people authored and bbrks committed Apr 16, 2024
1 parent 6e33350 commit 89cb9a3
Show file tree
Hide file tree
Showing 16 changed files with 896 additions and 171 deletions.
15 changes: 15 additions & 0 deletions base/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,10 @@ func HexCasToUint64(cas string) uint64 {
return binary.LittleEndian.Uint64(casBytes[0:8])
}

// CasToString returns the given CAS value as a string, using the byte
// encoding produced by Uint64CASToLittleEndianHex.
func CasToString(cas uint64) string {
	encoded := Uint64CASToLittleEndianHex(cas)
	return string(encoded)
}

func Uint64CASToLittleEndianHex(cas uint64) []byte {
littleEndian := make([]byte, 8)
binary.LittleEndian.PutUint64(littleEndian, cas)
Expand All @@ -1014,6 +1018,17 @@ func Uint64CASToLittleEndianHex(cas uint64) []byte {
return encodedArray
}

// StringDecimalToLittleEndianHex parses value as a base-10 unsigned 64-bit
// integer and returns its little endian hex encoding as a string.
//
// The encoding itself is delegated to Uint64CASToLittleEndianHex, so the exact
// output format (prefix, width) is whatever that helper produces.
//
// An error is returned, together with an empty string, when value is not a
// valid decimal uint64.
func StringDecimalToLittleEndianHex(value string) (string, error) {
	parsed, parseErr := strconv.ParseUint(value, 10, 64)
	if parseErr != nil {
		return "", parseErr
	}
	return string(Uint64CASToLittleEndianHex(parsed)), nil
}

func Crc32cHash(input []byte) uint32 {
// crc32.MakeTable already ensures singleton table creation, so shouldn't need to cache.
table := crc32.MakeTable(crc32.Castagnoli)
Expand Down
84 changes: 62 additions & 22 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,12 +801,18 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {

for i, change := range changeList {
docID := change[0].(string)
revID := change[1].(string)
rev := change[1].(string) // rev can represent a RevTree ID or HLV current version
parentRevID := ""
if len(change) > 2 {
parentRevID = change[2].(string)
}
status, currentRev := bh.collection.CheckProposedRev(bh.loggingCtx, docID, revID, parentRevID)
var status ProposedRevStatus
var currentRev string
if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 {
status, currentRev = bh.collection.CheckProposedVersion(bh.loggingCtx, docID, rev, parentRevID)
} else {
status, currentRev = bh.collection.CheckProposedRev(bh.loggingCtx, docID, rev, parentRevID)
}
if status == ProposedRev_OK_IsNew {
// Remember that the doc doesn't exist locally, in order to optimize the upcoming Put:
bh.collectionCtx.notePendingInsertion(docID)
Expand Down Expand Up @@ -948,6 +954,10 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
}
}()

if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 && bh.conflictResolver != nil {
return base.HTTPErrorf(http.StatusNotImplemented, "conflict resolver handling (ISGR) not yet implemented for v4 protocol")
}

// throttle concurrent revs
if cap(bh.inFlightRevsThrottle) > 0 {
select {
Expand All @@ -966,13 +976,13 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err

// Doc metadata comes from the BLIP message metadata, not magic document properties:
docID, found := revMessage.ID()
revID, rfound := revMessage.Rev()
rev, rfound := revMessage.Rev()
if !found || !rfound {
return base.HTTPErrorf(http.StatusBadRequest, "Missing docID or revID")
return base.HTTPErrorf(http.StatusBadRequest, "Missing docID or rev")
}

if bh.readOnly {
return base.HTTPErrorf(http.StatusForbidden, "Replication context is read-only, docID: %s, revID:%s", docID, revID)
return base.HTTPErrorf(http.StatusForbidden, "Replication context is read-only, docID: %s, rev:%s", docID, rev)
}

base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s %s", bh.serialNumber, rq.Profile(), revMessage.String())
Expand All @@ -992,7 +1002,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
return err
}
if removed, ok := body[BodyRemoved].(bool); ok && removed {
base.InfofCtx(bh.loggingCtx, base.KeySync, "Purging doc %v - removed at rev %v", base.UD(docID), revID)
base.InfofCtx(bh.loggingCtx, base.KeySync, "Purging doc %v - removed at rev %v", base.UD(docID), rev)
if err := bh.collection.Purge(bh.loggingCtx, docID); err != nil {
return err
}
Expand All @@ -1004,17 +1014,39 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqStr, err)
} else {
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: rev})
}
}
return nil
}
}

newDoc := &Document{
ID: docID,
RevID: revID,
ID: docID,
}

var history []string
var incomingHLV HybridLogicalVector
// Build history/HLV
if bh.activeCBMobileSubprotocol < CBMobileReplicationV4 {
newDoc.RevID = rev
history = []string{rev}
if historyStr := rq.Properties[RevMessageHistory]; historyStr != "" {
history = append(history, strings.Split(historyStr, ",")...)
}
} else {
versionVectorStr := rev
if historyStr := rq.Properties[RevMessageHistory]; historyStr != "" {
versionVectorStr += ";" + historyStr
}
incomingHLV, err = extractHLVFromBlipMessage(versionVectorStr)
if err != nil {
base.InfofCtx(bh.loggingCtx, base.KeySync, "Error parsing hlv while processing rev for doc %v. HLV:%v Error: %v", base.UD(docID), versionVectorStr, err)
return base.HTTPErrorf(http.StatusUnprocessableEntity, "error extracting hlv from blip message")
}
newDoc.HLV = &incomingHLV
}

newDoc.UpdateBodyBytes(bodyBytes)

injectedAttachmentsForDelta := false
Expand All @@ -1033,7 +1065,14 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// while retrieving deltaSrcRevID. Couchbase Lite replication guarantees client has access to deltaSrcRevID,
// due to no-conflict write restriction, but we still need to enforce security here to prevent leaking data about previous
// revisions to malicious actors (in the scenario where that user has write but not read access).
deltaSrcRev, err := bh.collection.GetRev(bh.loggingCtx, docID, deltaSrcRevID, false, nil)
var deltaSrcRev DocumentRevision
if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 {
cv := Version{}
cv.SourceID, cv.Value = incomingHLV.GetCurrentVersion()
deltaSrcRev, err = bh.collection.GetCV(bh.loggingCtx, docID, &cv, RevCacheOmitBody)
} else {
deltaSrcRev, err = bh.collection.GetRev(bh.loggingCtx, docID, deltaSrcRevID, false, nil)
}
if err != nil {
return base.HTTPErrorf(http.StatusUnprocessableEntity, "Can't fetch doc %s for deltaSrc=%s %v", base.UD(docID), deltaSrcRevID, err)
}
Expand All @@ -1059,7 +1098,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// err should only ever be a FleeceDeltaError here - but to be defensive, handle other errors too (e.g. somehow reaching this code in a CE build)
if err != nil {
// Something went wrong in the diffing library. We want to know about this!
base.WarnfCtx(bh.loggingCtx, "Error patching deltaSrc %s with %s for doc %s with delta - err: %v", deltaSrcRevID, revID, base.UD(docID), err)
base.WarnfCtx(bh.loggingCtx, "Error patching deltaSrc %s with %s for doc %s with delta - err: %v", deltaSrcRevID, rev, base.UD(docID), err)
return base.HTTPErrorf(http.StatusUnprocessableEntity, "Error patching deltaSrc with delta: %s", err)
}

Expand Down Expand Up @@ -1097,15 +1136,14 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
}
}

history := []string{revID}
if historyStr := rq.Properties[RevMessageHistory]; historyStr != "" {
history = append(history, strings.Split(historyStr, ",")...)
}

var rawBucketDoc *sgbucket.BucketDocument

// Pull out attachments
if injectedAttachmentsForDelta || bytes.Contains(bodyBytes, []byte(BodyAttachments)) {
// temporarily error here if V4
if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 {
return base.HTTPErrorf(http.StatusNotImplemented, "attachment handling not yet supported for v4 protocol")
}
body := newDoc.Body(bh.loggingCtx)

var currentBucketDoc *Document
Expand Down Expand Up @@ -1142,7 +1180,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
if !ok {
// If we don't have this attachment already, ensure incoming revpos is greater than minRevPos, otherwise
// update to ensure it's fetched and uploaded
bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, revID)
bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, rev)
continue
}

Expand Down Expand Up @@ -1182,15 +1220,15 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// digest is different we need to override the revpos and set it to the current revision to ensure
// the attachment is requested and stored
if int(incomingAttachmentRevpos) <= minRevpos && currentAttachmentDigest != incomingAttachmentDigest {
bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, revID)
bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, rev)
}
}

body[BodyAttachments] = bodyAtts
}

if err := bh.downloadOrVerifyAttachments(rq.Sender, body, minRevpos, docID, currentDigests); err != nil {
base.ErrorfCtx(bh.loggingCtx, "Error during downloadOrVerifyAttachments for doc %s/%s: %v", base.UD(docID), revID, err)
base.ErrorfCtx(bh.loggingCtx, "Error during downloadOrVerifyAttachments for doc %s/%s: %v", base.UD(docID), rev, err)
return err
}

Expand All @@ -1200,7 +1238,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
}

if rawBucketDoc == nil && bh.collectionCtx.checkPendingInsertion(docID) {
// At the time we handled the `propseChanges` request, there was no doc with this docID
// At the time we handled the `proposeChanges` request, there was no doc with this docID
// in the bucket. As an optimization, tell PutExistingRev to assume the doc still doesn't
// exist and bypass getting it from the bucket during the save. If we're wrong, the save
// will fail with a CAS mismatch and the retry will fetch the existing doc.
Expand All @@ -1213,7 +1251,9 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// If the doc is a tombstone we want to allow conflicts when running SGR2
// bh.conflictResolver != nil represents an active SGR2 and BLIPClientTypeSGR2 represents a passive SGR2
forceAllowConflictingTombstone := newDoc.Deleted && (bh.conflictResolver != nil || bh.clientType == BLIPClientTypeSGR2)
if bh.conflictResolver != nil {
if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 {
_, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc)
} else if bh.conflictResolver != nil {
_, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV)
} else {
_, _, err = bh.collection.PutExistingRev(bh.loggingCtx, newDoc, history, revNoConflicts, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV)
Expand All @@ -1228,7 +1268,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqProperty, err)
} else {
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: rev})
}
}

Expand Down
81 changes: 71 additions & 10 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
return newRevID, doc, err
}

func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, docHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) {
func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) {
var matchRev string
if existingDoc != nil {
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs[base.SyncXattrName], existingDoc.Xattrs[db.userXattrKey()], existingDoc.Cas, DocUnmarshalRev)
Expand Down Expand Up @@ -1109,20 +1109,28 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
}
}

// set up revTreeID for backward compatibility
previousRevTreeID := doc.CurrentRev
prevGeneration, _ := ParseRevID(ctx, previousRevTreeID)
newGeneration := prevGeneration + 1

// Conflict check here
// if doc has no HLV defined this is a new doc we haven't seen before, skip conflict check
if doc.HLV == nil {
doc.HLV = &HybridLogicalVector{}
addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV)
newHLV := NewHybridLogicalVector()
doc.HLV = &newHLV
addNewerVersionsErr := doc.HLV.AddNewerVersions(newDocHLV)
if addNewerVersionsErr != nil {
return nil, nil, false, nil, addNewerVersionsErr
}
} else {
incomingDecodedHLV := docHLV.ToDecodedHybridLogicalVector()
localDecodedHLV := doc.HLV.ToDecodedHybridLogicalVector()
if !incomingDecodedHLV.IsInConflict(localDecodedHLV) {
if doc.HLV.isDominating(newDocHLV) {
base.DebugfCtx(ctx, base.KeyCRUD, "PutExistingCurrentVersion(%q): No new versions to add", base.UD(newDoc.ID))
return nil, nil, false, nil, base.ErrUpdateCancel // No new revisions to add
}
if newDocHLV.isDominating(*doc.HLV) {
// update hlv for all newer incoming source version pairs
addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV)
addNewerVersionsErr := doc.HLV.AddNewerVersions(newDocHLV)
if addNewerVersionsErr != nil {
return nil, nil, false, nil, addNewerVersionsErr
}
Expand All @@ -1132,9 +1140,13 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
return nil, nil, false, nil, base.HTTPErrorf(http.StatusConflict, "Document revision conflict")
}
}
// populate merge versions
if newDocHLV.MergeVersions != nil {
doc.HLV.MergeVersions = newDocHLV.MergeVersions
}

// Process the attachments, replacing bodies with digests.
newAttachments, err := db.storeAttachments(ctx, doc, newDoc.DocAttachments, generation, matchRev, nil)
newAttachments, err := db.storeAttachments(ctx, doc, newDoc.DocAttachments, newGeneration, previousRevTreeID, nil)
if err != nil {
return nil, nil, false, nil, err
}
Expand All @@ -1145,9 +1157,9 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
if err != nil {
return nil, nil, false, nil, err
}
newRev := CreateRevIDWithBytes(generation, matchRev, encoding)
newRev := CreateRevIDWithBytes(newGeneration, previousRevTreeID, encoding)

if err := doc.History.addRevision(newDoc.ID, RevInfo{ID: newRev, Parent: matchRev, Deleted: newDoc.Deleted}); err != nil {
if err := doc.History.addRevision(newDoc.ID, RevInfo{ID: newRev, Parent: previousRevTreeID, Deleted: newDoc.Deleted}); err != nil {
base.InfofCtx(ctx, base.KeyCRUD, "Failed to add revision ID: %s, for doc: %s, error: %v", newRev, base.UD(newDoc.ID), err)
return nil, nil, false, nil, base.ErrRevTreeAddRevFailure
}
Expand Down Expand Up @@ -2277,6 +2289,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
}
}

// ErrUpdateCancel is returned when the incoming revision is already known
if err == base.ErrUpdateCancel {
return nil, "", nil
} else if err != nil {
Expand Down Expand Up @@ -2888,6 +2901,54 @@ func (db *DatabaseCollectionWithUser) CheckProposedRev(ctx context.Context, doci
}
}

// CheckProposedVersion checks whether a proposed version of the document docid
// can be added without conflict.
//
// proposedVersionStr is the proposed (incoming) version in string form.
// previousVersionStr is the version the proposal is based on, also in string
// form; it may be empty, in which case the zero Version is compared against
// the server's current version.
//
// The returned status is one of:
//   - ProposedRev_Error: either version string could not be parsed, or the
//     document lookup failed with an error other than doc-not-found /
//     base.ErrXattrNotFound.
//   - ProposedRev_OK_IsNew: the document was not found on the server.
//   - ProposedRev_OK: the previous version equals the server's current
//     version, or the proposed version shares its source with the server's
//     current version and has a greater value.
//   - ProposedRev_Exists: the server's HLV already dominates the proposed
//     version.
//   - ProposedRev_Conflict: none of the above.
//
// currentVersion is only populated for ProposedRev_Conflict, where it holds
// the string form of the server's current version.
func (db *DatabaseCollectionWithUser) CheckProposedVersion(ctx context.Context, docid, proposedVersionStr string, previousVersionStr string) (status ProposedRevStatus, currentVersion string) {

	proposedVersion, err := ParseVersion(proposedVersionStr)
	if err != nil {
		base.WarnfCtx(ctx, "Couldn't parse proposed version for doc %q / %q: %v", base.UD(docid), proposedVersionStr, err)
		return ProposedRev_Error, ""
	}

	var previousVersion Version
	if previousVersionStr != "" {
		previousVersion, err = ParseVersion(previousVersionStr)
		if err != nil {
			base.WarnfCtx(ctx, "Couldn't parse previous version for doc %q / %q: %v", base.UD(docid), previousVersionStr, err)
			return ProposedRev_Error, ""
		}
	}

	// Check the lookup error before touching the returned sync data, so a failed
	// fetch is never inspected.
	doc, err := db.GetDocSyncDataNoImport(ctx, docid, DocUnmarshalNoHistory)
	if err != nil {
		if !base.IsDocNotFoundError(err) && err != base.ErrXattrNotFound {
			base.WarnfCtx(ctx, "CheckProposedVersion(%q) --> %T %v", base.UD(docid), err, err)
			return ProposedRev_Error, ""
		}
		// New document not found on server
		return ProposedRev_OK_IsNew, ""
	}

	// A document without an HLV leaves localDocCV as the zero Version.
	localDocCV := Version{}
	if doc.HLV != nil {
		localDocCV.SourceID, localDocCV.Value = doc.HLV.GetCurrentVersion()
	}

	switch {
	case localDocCV == previousVersion:
		// Non-conflicting update, client's previous version is server's CV
		return ProposedRev_OK, ""
	case doc.HLV != nil && doc.HLV.DominatesSource(proposedVersion):
		// SGW already has this version. The nil guard matters: a document with no
		// HLV cannot dominate anything, and must not be dereferenced here.
		return ProposedRev_Exists, ""
	case localDocCV.SourceID == proposedVersion.SourceID && localDocCV.Value < proposedVersion.Value:
		// previousVersion didn't match, but proposed version and server CV have matching source, and proposed version is newer
		return ProposedRev_OK, ""
	default:
		// Conflict, return the current cv. This may be a false positive conflict if the client has replicated
		// the server cv via a different peer. Client is responsible for performing this check based on the
		// returned localDocCV
		return ProposedRev_Conflict, localDocCV.String()
	}
}

const (
xattrMacroCas = "cas" // SyncData.Cas
xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c
Expand Down
Loading

0 comments on commit 89cb9a3

Please sign in to comment.