Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4331: legacy rev handling for version 4 replication protocol #7239

Merged
merged 5 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,8 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
defer func() {
bh.replicationStats.HandleChangesTime.Add(time.Since(startTime).Nanoseconds())
}()
changesContainLegacyRevs := false // keep track if proposed changes have legacy revs for delta sync purposes
versionVectorProtocol := bh.useHLV()

for i, change := range changeList {
docID := change[0].(string)
Expand All @@ -832,9 +834,16 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
}
var status ProposedRevStatus
var currentRev string
if bh.useHLV() {

changeIsVector := false
if versionVectorProtocol {
// only check if rev is vector in VV replication mode
changeIsVector = strings.Contains(rev, "@")
}
if versionVectorProtocol && changeIsVector {
status, currentRev = bh.collection.CheckProposedVersion(bh.loggingCtx, docID, rev, parentRevID)
} else {
changesContainLegacyRevs = true
status, currentRev = bh.collection.CheckProposedRev(bh.loggingCtx, docID, rev, parentRevID)
}
if status == ProposedRev_OK_IsNew {
Expand Down Expand Up @@ -866,8 +875,8 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
}
output.Write([]byte("]"))
response := rq.Response()
// Disable delta sync for protocol versions < 4, CBG-3748 (backwards compatibility for revID delta sync)
if bh.sgCanUseDeltas && bh.useHLV() {
// Disable delta sync for protocol versions < 4 or changes batches that have legacy revs in them, CBG-3748 (backwards compatibility for revID delta sync)
if bh.sgCanUseDeltas && bh.useHLV() && !changesContainLegacyRevs {
base.DebugfCtx(bh.loggingCtx, base.KeyAll, "Setting deltas=true property on proposeChanges response")
response.Properties[ChangesResponseDeltas] = trueProperty
}
Expand All @@ -887,13 +896,13 @@ func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sen
} else if base.IsFleeceDeltaError(err) {
// Something went wrong in the diffing library. We want to know about this!
base.WarnfCtx(ctx, "Falling back to full body replication. Error generating delta from %s to %s for key %s - err: %v", deltaSrcRevID, revID, base.UD(docID), err)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
} else if err == base.ErrDeltaSourceIsTombstone {
base.TracefCtx(ctx, base.KeySync, "Falling back to full body replication. Delta source %s is tombstone. Unable to generate delta to %s for key %s", deltaSrcRevID, revID, base.UD(docID))
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
} else if err != nil {
base.DebugfCtx(ctx, base.KeySync, "Falling back to full body replication. Couldn't get delta from %s to %s for key %s - err: %v", deltaSrcRevID, revID, base.UD(docID), err)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
}

if redactedRev != nil {
Expand All @@ -909,12 +918,12 @@ func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sen

if revDelta == nil {
base.DebugfCtx(ctx, base.KeySync, "Falling back to full body replication. Couldn't get delta from %s to %s for key %s", deltaSrcRevID, revID, base.UD(docID))
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
}

resendFullRevisionFunc := func() error {
base.InfofCtx(ctx, base.KeySync, "Resending revision as full body. Peer couldn't process delta %s from %s to %s for key %s", base.UD(revDelta.DeltaBytes), deltaSrcRevID, revID, base.UD(docID))
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
}

base.TracefCtx(ctx, base.KeySync, "docID: %s - delta: %v", base.UD(docID), base.UD(string(revDelta.DeltaBytes)))
Expand Down Expand Up @@ -1059,7 +1068,8 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
historyStr := rq.Properties[RevMessageHistory]
var incomingHLV *HybridLogicalVector
// Build history/HLV
if !bh.useHLV() {
changeIsVector := strings.Contains(rev, "@")
if !bh.useHLV() || !changeIsVector {
newDoc.RevID = rev
history = []string{rev}
if historyStr != "" {
Expand Down Expand Up @@ -1287,7 +1297,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// If the doc is a tombstone we want to allow conflicts when running SGR2
// bh.conflictResolver != nil represents an active SGR2 and BLIPClientTypeSGR2 represents a passive SGR2
forceAllowConflictingTombstone := newDoc.Deleted && (bh.conflictResolver != nil || bh.clientType == BLIPClientTypeSGR2)
if bh.useHLV() {
if bh.useHLV() && changeIsVector {
_, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc)
} else if bh.conflictResolver != nil {
_, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV)
Expand Down
27 changes: 20 additions & 7 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,15 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
if err != nil {
return err
}
versionVectorProtocol := bsc.useHLV()

for i, knownRevsArrayInterface := range answer {
seq := changeArray[i][0].(SequenceID)
docID := changeArray[i][1].(string)
rev := changeArray[i][2].(string)

if knownRevsArray, ok := knownRevsArrayInterface.([]interface{}); ok {
legacyRev := false
deltaSrcRevID := ""
knownRevs := knownRevsByDoc[docID]
if knownRevs == nil {
Expand All @@ -358,10 +360,10 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
// revtree clients. For HLV clients, use the cv as deltaSrc
if bsc.useDeltas && len(knownRevsArray) > 0 {
if revID, ok := knownRevsArray[0].(string); ok {
if bsc.useHLV() {
if versionVectorProtocol {
msgHLV, err := extractHLVFromBlipMessage(revID)
if err != nil {
base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", docID)
base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", base.UD(docID))
deltaSrcRevID = "" // will force falling back to full body replication below
} else {
deltaSrcRevID = msgHLV.GetCurrentVersionString()
Expand All @@ -375,7 +377,12 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
for _, rev := range knownRevsArray {
if revID, ok := rev.(string); ok {
msgHLV, err := extractHLVFromBlipMessage(revID)
if err == nil {
if err != nil {
// assume we have received legacy rev if we cannot parse hlv from known revs, and we are in vv replication
if versionVectorProtocol {
legacyRev = true
}
} else {
// extract cv as string
revID = msgHLV.GetCurrentVersionString()
}
Expand All @@ -394,7 +401,7 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
if deltaSrcRevID != "" && bsc.useHLV() {
err = bsc.sendRevAsDelta(ctx, sender, docID, rev, deltaSrcRevID, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx)
} else {
err = bsc.sendRevision(ctx, sender, docID, rev, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx)
err = bsc.sendRevision(ctx, sender, docID, rev, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx, legacyRev)
}
if err != nil {
return err
Expand Down Expand Up @@ -652,11 +659,11 @@ func (bsc *BlipSyncContext) sendNoRev(sender *blip.Sender, docID, revID string,
}

// Pushes a revision body to the client
func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sender, docID, revID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int) error {
func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sender, docID, revID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int, legacyRev bool) error {

var originalErr error
var docRev DocumentRevision
if bsc.activeCBMobileSubprotocol <= CBMobileReplicationV3 {
if !bsc.useHLV() {
docRev, originalErr = handleChangesResponseCollection.GetRev(ctx, docID, revID, true, nil)
} else {
// extract CV string rev representation
Expand Down Expand Up @@ -743,13 +750,19 @@ func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sende
bsc.replicationStats.SendReplacementRevCount.Add(1)
}
var history []string
if bsc.activeCBMobileSubprotocol <= CBMobileReplicationV3 {
if !bsc.useHLV() {
history = toHistory(docRev.History, knownRevs, maxHistory)
} else {
if docRev.hlvHistory != "" {
history = append(history, docRev.hlvHistory)
}
}
if legacyRev {
// append current revID and rest of rev tree after hlv history
revTreeHistory := toHistory(docRev.History, knownRevs, maxHistory)
history = append(history, docRev.RevID)
history = append(history, revTreeHistory...)
}

properties := blipRevMessageProperties(history, docRev.Deleted, seq, replacedRevID)
if base.LogDebugEnabled(ctx, base.KeySync) {
Expand Down
Loading
Loading