Skip to content

Commit

Permalink
CBG-3797 Attachment handling for HLV push replication (#6702)
Browse files Browse the repository at this point in the history
HLV clients don't consider revpos, and evaluate whether they need to request an attachment based on the existing set of attachments on the document.

SGW still needs to persist revpos into _attachments to support revtree clients.  For new attachments added by HLV client, revpos is set to the generation of SGW's computed revTreeID for the incoming revision.

Co-authored-by: Gregory Newman-Smith <gregory.newmansmith@couchbase.com>
  • Loading branch information
adamcfraser and gregns1 committed Aug 11, 2024
1 parent 731f624 commit 3a9d103
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 92 deletions.
64 changes: 36 additions & 28 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
}
var status ProposedRevStatus
var currentRev string
if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 {
if bh.useHLV() {
status, currentRev = bh.collection.CheckProposedVersion(bh.loggingCtx, docID, rev, parentRevID)
} else {
status, currentRev = bh.collection.CheckProposedRev(bh.loggingCtx, docID, rev, parentRevID)
Expand Down Expand Up @@ -977,7 +977,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
}
}()

if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 && bh.conflictResolver != nil {
if bh.useHLV() && bh.conflictResolver != nil {
return base.HTTPErrorf(http.StatusNotImplemented, "conflict resolver handling (ISGR) not yet implemented for v4 protocol")
}

Expand Down Expand Up @@ -1049,17 +1049,18 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
}

var history []string
historyStr := rq.Properties[RevMessageHistory]
var incomingHLV HybridLogicalVector
// Build history/HLV
if bh.activeCBMobileSubprotocol < CBMobileReplicationV4 {
if !bh.useHLV() {
newDoc.RevID = rev
history = []string{rev}
if historyStr := rq.Properties[RevMessageHistory]; historyStr != "" {
if historyStr != "" {
history = append(history, strings.Split(historyStr, ",")...)
}
} else {
versionVectorStr := rev
if historyStr := rq.Properties[RevMessageHistory]; historyStr != "" {
if historyStr != "" {
versionVectorStr += ";" + historyStr
}
incomingHLV, err = extractHLVFromBlipMessage(versionVectorStr)
Expand Down Expand Up @@ -1089,7 +1090,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// due to no-conflict write restriction, but we still need to enforce security here to prevent leaking data about previous
// revisions to malicious actors (in the scenario where that user has write but not read access).
var deltaSrcRev DocumentRevision
if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 {
if bh.useHLV() {
cv := Version{}
cv.SourceID, cv.Value = incomingHLV.GetCurrentVersion()
deltaSrcRev, err = bh.collection.GetCV(bh.loggingCtx, docID, &cv)
Expand Down Expand Up @@ -1161,31 +1162,32 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err

var rawBucketDoc *sgbucket.BucketDocument

// Pull out attachments
// Attachment processing
if injectedAttachmentsForDelta || bytes.Contains(bodyBytes, []byte(BodyAttachments)) {
// temporarily error here if V4
if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 {
return base.HTTPErrorf(http.StatusNotImplemented, "attachment handling not yet supported for v4 protocol")
}

body := newDoc.Body(bh.loggingCtx)

var currentBucketDoc *Document

// Look at attachments with revpos > the last common ancestor's
minRevpos := 1
if len(history) > 0 {
currentDoc, rawDoc, err := bh.collection.GetDocumentWithRaw(bh.loggingCtx, docID, DocUnmarshalSync)
// If we're able to obtain current doc data then we should use the common ancestor generation++ for min revpos
// as we will already have any attachments on the common ancestor so don't need to ask for them.
// Otherwise we'll have to go as far back as we can in the doc history and choose the last entry in there.
if err == nil {
commonAncestor := currentDoc.History.findAncestorFromSet(currentDoc.CurrentRev, history)
minRevpos, _ = ParseRevID(bh.loggingCtx, commonAncestor)
minRevpos++
rawBucketDoc = rawDoc
currentBucketDoc = currentDoc
} else {
minRevpos, _ = ParseRevID(bh.loggingCtx, history[len(history)-1])
minRevpos := 0
if historyStr != "" {
// fetch current bucket doc. Treats error as not found
currentBucketDoc, rawBucketDoc, _ = bh.collection.GetDocumentWithRaw(bh.loggingCtx, docID, DocUnmarshalSync)

// For revtree clients, can use revPos as an optimization. HLV always compares incoming
// attachments with current attachments on the document
if !bh.useHLV() {
// Look at attachments with revpos > the last common ancestor's
// If we're able to obtain current doc data then we should use the common ancestor generation++ for min revpos
// as we will already have any attachments on the common ancestor so don't need to ask for them.
// Otherwise we'll have to go as far back as we can in the doc history and choose the last entry in there.
if currentBucketDoc != nil {
commonAncestor := currentBucketDoc.History.findAncestorFromSet(currentBucketDoc.CurrentRev, history)
minRevpos, _ = ParseRevID(bh.loggingCtx, commonAncestor)
minRevpos++
} else {
minRevpos, _ = ParseRevID(bh.loggingCtx, history[len(history)-1])
}
}
}

Expand All @@ -1203,7 +1205,9 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
if !ok {
// If we don't have this attachment already, ensure incoming revpos is greater than minRevPos, otherwise
// update to ensure it's fetched and uploaded
bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, rev)
if minRevpos > 0 {
bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, rev)
}
continue
}

Expand Down Expand Up @@ -1274,7 +1278,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// If the doc is a tombstone we want to allow conflicts when running SGR2
// bh.conflictResolver != nil represents an active SGR2 and BLIPClientTypeSGR2 represents a passive SGR2
forceAllowConflictingTombstone := newDoc.Deleted && (bh.conflictResolver != nil || bh.clientType == BLIPClientTypeSGR2)
if bh.activeCBMobileSubprotocol >= CBMobileReplicationV4 {
if bh.useHLV() {
_, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc)
} else if bh.conflictResolver != nil {
_, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV)
Expand Down Expand Up @@ -1616,3 +1620,7 @@ func allowedAttachmentKey(docID, digest string, activeCBMobileSubprotocol CBMobi
func (bh *blipHandler) logEndpointEntry(profile, endpoint string) {
base.InfofCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s %s", bh.serialNumber, profile, endpoint)
}

func (bh *blipHandler) useHLV() bool {
return bh.activeCBMobileSubprotocol >= CBMobileReplicationV4
}
5 changes: 2 additions & 3 deletions db/revision_cache_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,8 @@ func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBa
// nolint:staticcheck
func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, cv Version) (bodyBytes []byte, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, hlv *HybridLogicalVector, err error) {
if bodyBytes, attachments, err = backingStore.getCurrentVersion(ctx, doc); err != nil {
// TODO: pending CBG-3213 support of channel removal for CV
// we need implementation of IsChannelRemoval for CV here.
base.ErrorfCtx(ctx, "pending CBG-3213 support of channel removal for CV: %v", err)
// TODO: CBG-3814 - pending support of channel removal for CV
base.ErrorfCtx(ctx, "pending CBG-3814 support of channel removal for CV: %v", err)
}

if err = doc.HasCurrentVersion(ctx, cv); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ func (c *DatabaseCollection) RequireCurrentVersion(t *testing.T, key string, sou
}

// GetDocumentCurrentVersion fetches the document by key and returns the current version
func (c *DatabaseCollection) GetDocumentCurrentVersion(t *testing.T, key string) (source string, version string) {
func (c *DatabaseCollection) GetDocumentCurrentVersion(t testing.TB, key string) (source string, version string) {
ctx := base.TestCtx(t)
doc, err := c.GetDocument(ctx, key, DocUnmarshalSync)
require.NoError(t, err)
Expand Down
19 changes: 9 additions & 10 deletions rest/attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2258,7 +2258,6 @@ func TestUpdateExistingAttachment(t *testing.T) {
}

btcRunner := NewBlipTesterClientRunner(t)
btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol (CBG-3797)
const (
doc1ID = "doc1"
doc2ID = "doc2"
Expand All @@ -2272,8 +2271,8 @@ func TestUpdateExistingAttachment(t *testing.T) {
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts)
defer btc.Close()

doc1Version := rt.PutDoc(doc1ID, `{}`)
doc2Version := rt.PutDoc(doc2ID, `{}`)
doc1Version := btc.PutDoc(doc1ID, `{}`)
doc2Version := btc.PutDoc(doc2ID, `{}`)

rt.WaitForPendingChanges()
btcRunner.StartOneshotPull(btc.id)
Expand Down Expand Up @@ -2320,7 +2319,6 @@ func TestPushUnknownAttachmentAsStub(t *testing.T) {
}
const doc1ID = "doc1"
btcRunner := NewBlipTesterClientRunner(t)
btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol (CBG-3797)

btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {
rt := NewRestTester(t, rtConfig)
Expand All @@ -2330,7 +2328,7 @@ func TestPushUnknownAttachmentAsStub(t *testing.T) {
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &opts)
defer btc.Close()
// Add doc1 and doc2
doc1Version := btc.rt.PutDoc(doc1ID, `{}`)
doc1Version := btc.PutDoc(doc1ID, `{}`)

btc.rt.WaitForPendingChanges()
btcRunner.StartOneshotPull(btc.id)
Expand Down Expand Up @@ -2368,7 +2366,6 @@ func TestMinRevPosWorkToAvoidUnnecessaryProveAttachment(t *testing.T) {
}

btcRunner := NewBlipTesterClientRunner(t)
btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol (CBG-3797)
const docID = "doc"

btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {
Expand All @@ -2380,6 +2377,7 @@ func TestMinRevPosWorkToAvoidUnnecessaryProveAttachment(t *testing.T) {
defer btc.Close()
// Push an initial rev with attachment data
initialVersion := btc.rt.PutDoc(docID, `{"_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}}`)
btc.rt.WaitForPendingChanges()

// Replicate data to client and ensure doc arrives
btc.rt.WaitForPendingChanges()
Expand All @@ -2389,7 +2387,7 @@ func TestMinRevPosWorkToAvoidUnnecessaryProveAttachment(t *testing.T) {
// Push a revision with a bunch of history simulating doc updated on mobile device
// Note this references revpos 1 and therefore SGW has it - Shouldn't need proveAttachment
proveAttachmentBefore := btc.pushReplication.replicationStats.ProveAttachment.Value()
revid, err := btcRunner.PushRevWithHistory(btc.id, docID, initialVersion.RevTreeID, []byte(`{"_attachments": {"hello.txt": {"revpos":1,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`), 25, 5)
revid, err := btcRunner.PushRevWithHistory(btc.id, docID, initialVersion.GetRev(btc.UseHLV()), []byte(`{"_attachments": {"hello.txt": {"revpos":1,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`), 25, 5)
assert.NoError(t, err)
proveAttachmentAfter := btc.pushReplication.replicationStats.ProveAttachment.Value()
assert.Equal(t, proveAttachmentBefore, proveAttachmentAfter)
Expand All @@ -2408,7 +2406,6 @@ func TestAttachmentWithErroneousRevPos(t *testing.T) {
}

btcRunner := NewBlipTesterClientRunner(t)
btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol (CBG-3797)

btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {
rt := NewRestTester(t, rtConfig)
Expand All @@ -2419,7 +2416,9 @@ func TestAttachmentWithErroneousRevPos(t *testing.T) {
defer btc.Close()
// Create rev 1 with the hello.txt attachment
const docID = "doc"

version := btc.rt.PutDoc(docID, `{"val": "val", "_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}}`)
btc.rt.WaitForPendingChanges()

// Pull rev and attachment down to client
btc.rt.WaitForPendingChanges()
Expand All @@ -2433,7 +2432,7 @@ func TestAttachmentWithErroneousRevPos(t *testing.T) {
btcRunner.AttachmentsLock(btc.id).Unlock()

// Put doc with an erroneous revpos 1 but with a different digest, referring to the above attachment
_, err := btcRunner.PushRevWithHistory(btc.id, docID, version.RevTreeID, []byte(`{"_attachments": {"hello.txt": {"revpos":1,"stub":true,"length": 19,"digest":"sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc="}}}`), 1, 0)
_, err = btcRunner.PushRevWithHistory(btc.id, docID, version.GetRev(btc.UseHLV()), []byte(`{"_attachments": {"hello.txt": {"revpos":1,"stub":true,"length": 19,"digest":"sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc="}}}`), 1, 0)
require.NoError(t, err)

// Ensure message and attachment is pushed up
Expand Down Expand Up @@ -2587,7 +2586,6 @@ func TestCBLRevposHandling(t *testing.T) {
}

btcRunner := NewBlipTesterClientRunner(t)
btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol (CBG-3797)
const (
doc1ID = "doc1"
doc2ID = "doc2"
Expand All @@ -2603,6 +2601,7 @@ func TestCBLRevposHandling(t *testing.T) {

doc1Version := btc.rt.PutDoc(doc1ID, `{}`)
doc2Version := btc.rt.PutDoc(doc2ID, `{}`)
btc.rt.WaitForPendingChanges()

btc.rt.WaitForPendingChanges()
btcRunner.StartOneshotPull(btc.id)
Expand Down
Loading

0 comments on commit 3a9d103

Please sign in to comment.