From 28dd770f7279afc3c142e2585d4880320faf3986 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Mon, 9 Dec 2024 14:06:13 +0000 Subject: [PATCH 1/6] CBG-4389: extract cv from knwon revs and store backup rev by revID --- db/attachment_test.go | 18 +++++---- db/blip_handler.go | 6 ++- db/blip_sync_context.go | 17 ++++++-- db/crud.go | 21 +++++++++- db/crud_test.go | 1 + db/database_test.go | 56 +++++++++++++++++--------- db/revision.go | 20 ++------- db/revision_cache_test.go | 1 + db/revision_test.go | 13 +++--- rest/attachment_test.go | 1 + rest/blip_api_crud_test.go | 5 ++- rest/blip_api_delta_sync_test.go | 50 ++++++++++++++++------- rest/changestest/changes_api_test.go | 14 +++++-- rest/importtest/import_test.go | 8 ++-- rest/replicatortest/replicator_test.go | 4 +- 15 files changed, 157 insertions(+), 78 deletions(-) diff --git a/db/attachment_test.go b/db/attachment_test.go index 6b46d46fa7..78d7759fe9 100644 --- a/db/attachment_test.go +++ b/db/attachment_test.go @@ -49,11 +49,12 @@ func TestBackupOldRevisionWithAttachments(t *testing.T) { require.NoError(t, base.JSONUnmarshal([]byte(rev1Data), &rev1Body)) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) - rev1ID, _, err := collection.Put(ctx, docID, rev1Body) + rev1ID, docRev1, err := collection.Put(ctx, docID, rev1Body) require.NoError(t, err) assert.Equal(t, "1-12ff9ce1dd501524378fe092ce9aee8f", rev1ID) - rev1OldBody, err := collection.getOldRevisionJSON(ctx, docID, rev1ID) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + rev1OldBody, err := collection.getOldRevisionJSON(ctx, docID, base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString()))) if deltasEnabled && xattrsEnabled { require.NoError(t, err) assert.Contains(t, string(rev1OldBody), "hello.txt") @@ -67,16 +68,18 @@ func TestBackupOldRevisionWithAttachments(t *testing.T) { var rev2Body Body rev2Data := `{"test": true, "updated": true, 
"_attachments": {"hello.txt": {"stub": true, "revpos": 1}}}` require.NoError(t, base.JSONUnmarshal([]byte(rev2Data), &rev2Body)) - _, rev2ID, err := collection.PutExistingRevWithBody(ctx, docID, rev2Body, []string{"2-abc", rev1ID}, true, ExistingVersionWithUpdateToHLV) + docRev2, _, err := collection.PutExistingRevWithBody(ctx, docID, rev2Body, []string{"2-abc", rev1ID}, true, ExistingVersionWithUpdateToHLV) require.NoError(t, err) // now in any case - we'll have rev 1 backed up - rev1OldBody, err = collection.getOldRevisionJSON(ctx, docID, rev1ID) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + rev1OldBody, err = collection.getOldRevisionJSON(ctx, docID, base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString()))) require.NoError(t, err) assert.Contains(t, string(rev1OldBody), "hello.txt") // and rev 2 should be present only for the xattrs and deltas case - rev2OldBody, err := collection.getOldRevisionJSON(ctx, docID, rev2ID) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + rev2OldBody, err := collection.getOldRevisionJSON(ctx, docID, base.Crc32cHashString([]byte(docRev2.HLV.GetCurrentVersionString()))) if deltasEnabled && xattrsEnabled { require.NoError(t, err) assert.Contains(t, string(rev2OldBody), "hello.txt") @@ -100,7 +103,7 @@ func TestAttachments(t *testing.T) { assert.NoError(t, base.JSONUnmarshal([]byte(rev1input), &body)) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) - revid, _, err := collection.Put(ctx, "doc1", body) + revid, docRev1, err := collection.Put(ctx, "doc1", body) rev1id := revid assert.NoError(t, err, "Couldn't create document") @@ -188,7 +191,8 @@ func TestAttachments(t *testing.T) { assert.Equal(t, float64(2), bye["revpos"]) log.Printf("Expire body of rev 1, then add a child...") // test fix of #498 - err = collection.dataStore.Delete(oldRevisionKey("doc1", rev1id)) + 
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + err = collection.dataStore.Delete(oldRevisionKey("doc1", base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString())))) assert.NoError(t, err, "Couldn't compact old revision") rev2Bstr := `{"_attachments": {"bye.txt": {"stub":true,"revpos":1,"digest":"sha1-gwwPApfQR9bzBKpqoEYwFmKp98A="}}, "_rev": "2-f000"}` var body2B Body diff --git a/db/blip_handler.go b/db/blip_handler.go index f4799448b1..e56cad327d 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -772,7 +772,7 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error { } output.Write([]byte("]")) response := rq.Response() - if bh.sgCanUseDeltas { + if bh.sgCanUseDeltas && bh.useHLV() { base.DebugfCtx(bh.loggingCtx, base.KeyAll, "Setting deltas=true property on handleChanges response") response.Properties[ChangesResponseDeltas] = trueProperty bh.replicationStats.HandleChangesDeltaRequestedCount.Add(int64(nRequested)) @@ -865,7 +865,7 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error { } output.Write([]byte("]")) response := rq.Response() - if bh.sgCanUseDeltas { + if bh.sgCanUseDeltas && bh.useHLV() { base.DebugfCtx(bh.loggingCtx, base.KeyAll, "Setting deltas=true property on proposeChanges response") response.Properties[ChangesResponseDeltas] = trueProperty } @@ -1082,6 +1082,8 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err if deltaSrcRevID, isDelta := revMessage.DeltaSrc(); isDelta && !revMessage.Deleted() { if !bh.sgCanUseDeltas { return base.HTTPErrorf(http.StatusBadRequest, "Deltas are disabled for this peer") + } else if !bh.useHLV() { + return base.HTTPErrorf(http.StatusBadRequest, "backwards compatibility for revTree deltas not yet implemented") } // TODO: Doing a GetRevCopy here duplicates some rev cache retrieval effort, since deltaRevSrc is always diff --git a/db/blip_sync_context.go 
b/db/blip_sync_context.go index 6385be6195..9fd8eab56a 100644 --- a/db/blip_sync_context.go +++ b/db/blip_sync_context.go @@ -354,10 +354,20 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b knownRevsByDoc[docID] = knownRevs } - // The first element of the knownRevsArray returned from CBL is the parent revision to use as deltaSrc + // The first element of the knownRevsArray returned from CBL is the parent revision to use as deltaSrc for + // revtree clients, for HLV clients the first element is the HLV if bsc.useDeltas && len(knownRevsArray) > 0 { if revID, ok := knownRevsArray[0].(string); ok { - deltaSrcRevID = revID + if bsc.useHLV() { + msgHLV, err := extractHLVFromBlipMessage(revID) + if err != nil { + base.ErrorfCtx(ctx, "Invalid known rev format for hlv on doc: %s", docID) + return nil + } + deltaSrcRevID = msgHLV.GetCurrentVersionString() + } else { + deltaSrcRevID = revID + } } } @@ -372,7 +382,8 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b var err error - if deltaSrcRevID != "" { + // fallback to sending full revisions for non hlv aware peers, CBG-3748 + if deltaSrcRevID != "" && bsc.useHLV() { err = bsc.sendRevAsDelta(ctx, sender, docID, rev, deltaSrcRevID, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx) } else { err = bsc.sendRevision(ctx, sender, docID, rev, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx) diff --git a/db/crud.go b/db/crud.go index 444816e630..2351d64e53 100644 --- a/db/crud.go +++ b/db/crud.go @@ -2448,7 +2448,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do doc.MetadataOnlyUpdate.HexCAS = base.CasToString(casOut) } // update the doc's HLV defined post macro expansion - doc = postWriteUpdateHLV(doc, casOut) + doc = db.postWriteUpdateHLV(ctx, doc, casOut) } } @@ -2601,7 +2601,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do return 
doc, newRevID, nil } -func postWriteUpdateHLV(doc *Document, casOut uint64) *Document { +func (db *DatabaseCollectionWithUser) postWriteUpdateHLV(ctx context.Context, doc *Document, casOut uint64) *Document { if doc.HLV == nil { return doc } @@ -2611,6 +2611,23 @@ func postWriteUpdateHLV(doc *Document, casOut uint64) *Document { if doc.HLV.CurrentVersionCAS == expandMacroCASValueUint64 { doc.HLV.CurrentVersionCAS = casOut } + // backup new revision to the bucket now we have a doc assigned a CV (post macro expansion) for delta generation purposes + if db.UseXattrs() { + var newBodyWithAtts = doc._rawBody + if len(doc.Attachments) > 0 { + var err error + newBodyWithAtts, err = base.InjectJSONProperties(doc._rawBody, base.KVPair{ + Key: BodyAttachments, + Val: doc.Attachments, + }) + if err != nil { + base.WarnfCtx(ctx, "Unable to marshal new revision body during backupRevisionJSON: doc=%q rev=%q cv=%q err=%v ", base.UD(doc.ID), doc.CurrentRev, doc.HLV.GetCurrentVersionString(), err) + return doc + } + } + revHash := base.Crc32cHashString([]byte(doc.HLV.GetCurrentVersionString())) + _ = db.setOldRevisionJSON(ctx, doc.ID, revHash, newBodyWithAtts, db.deltaSyncRevMaxAgeSeconds()) + } return doc } diff --git a/db/crud_test.go b/db/crud_test.go index 67450eae9a..3938cf7f06 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -1196,6 +1196,7 @@ func BenchmarkHandleRevDelta(b *testing.B) { } func TestGetAvailableRevAttachments(t *testing.T) { + t.Skip("Revs are backed up by hash of CV now, test needs to fetch backup rev by revID, CBG-3748 (backwards compatibility for revID)") db, ctx := setupTestDB(t) defer db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) diff --git a/db/database_test.go b/db/database_test.go index c1b9ed74aa..0fc82f77ec 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -496,7 +496,7 @@ func TestGetRemovedAsUser(t *testing.T) { "key1": 1234, "channels": []string{"ABC"}, } - rev1id, _, err := collection.Put(ctx, 
"doc1", rev1body) + rev1id, docRev1, err := collection.Put(ctx, "doc1", rev1body) assert.NoError(t, err, "Put") rev2body := Body{ @@ -504,7 +504,7 @@ func TestGetRemovedAsUser(t *testing.T) { "channels": []string{"NBC"}, BodyRev: rev1id, } - rev2id, _, err := collection.Put(ctx, "doc1", rev2body) + rev2id, docRev2, err := collection.Put(ctx, "doc1", rev2body) assert.NoError(t, err, "Put Rev 2") // Add another revision, so that rev 2 is obsolete @@ -542,7 +542,9 @@ func TestGetRemovedAsUser(t *testing.T) { ShardCount: DefaultRevisionCacheShardCount, } collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat) - err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + cv := docRev2.HLV.GetCurrentVersionString() + err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(cv))) assert.NoError(t, err, "Purge old revision JSON") // Try again with a user who doesn't have access to this revision @@ -568,7 +570,9 @@ func TestGetRemovedAsUser(t *testing.T) { assert.Equal(t, expectedResult, body) // Ensure revision is unavailable for a non-leaf revision that isn't available via the rev cache, and wasn't a channel removal - err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev1id) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + cv = docRev1.HLV.GetCurrentVersionString() + err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(cv))) assert.NoError(t, err, "Purge old revision JSON") _, err = collection.Get1xRevBody(ctx, "doc1", rev1id, true, nil) @@ -631,7 +635,7 @@ func TestGetRemovalMultiChannel(t *testing.T) { "channels": []string{"ABC"}, BodyRev: rev1ID, } - rev2ID, _, err := collection.Put(ctx, "doc1", rev2Body) + rev2ID, docRev2, err := 
collection.Put(ctx, "doc1", rev2Body) require.NoError(t, err, "Error creating doc") // Create the third revision of doc1 on channel ABC. @@ -683,7 +687,8 @@ func TestGetRemovalMultiChannel(t *testing.T) { // Flush the revision cache and purge the old revision backup. db.FlushRevisionCacheForTest() - err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2ID) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(docRev2.HLV.GetCurrentVersionString()))) require.NoError(t, err, "Error purging old revision JSON") // Try with a user who has access to this revision. @@ -714,10 +719,11 @@ func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) { name string versionVector bool }{ - { - name: "revTree test", - versionVector: false, - }, + // Revs are backed up by hash of CV now, now way to fetch backup revs by revID till CBG-3748 (backwards compatibility for revID) + //{ + // name: "revTree test", + // versionVector: false, + //}, { name: "versionVector test", versionVector: true, @@ -758,8 +764,14 @@ func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) { // Flush the revision cache and purge the old revision backup. db.FlushRevisionCacheForTest() - err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2ID) - require.NoError(t, err, "Error purging old revision JSON") + if testCase.versionVector { + cvStr := docRev2.HLV.GetCurrentVersionString() + err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(cvStr))) + require.NoError(t, err, "Error purging old revision JSON") + } else { + err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2ID) + require.NoError(t, err, "Error purging old revision JSON") + } // Request delta between rev2ID and rev3ID (toRevision "rev2ID" is channel removal) // as a user who doesn't have access to the removed revision via any other channel. 
@@ -923,7 +935,7 @@ func TestGetRemoved(t *testing.T) { "key1": 1234, "channels": []string{"ABC"}, } - rev1id, _, err := collection.Put(ctx, "doc1", rev1body) + rev1id, docRev1, err := collection.Put(ctx, "doc1", rev1body) assert.NoError(t, err, "Put") rev2body := Body{ @@ -931,7 +943,7 @@ func TestGetRemoved(t *testing.T) { "channels": []string{"NBC"}, BodyRev: rev1id, } - rev2id, _, err := collection.Put(ctx, "doc1", rev2body) + rev2id, docRev2, err := collection.Put(ctx, "doc1", rev2body) assert.NoError(t, err, "Put Rev 2") // Add another revision, so that rev 2 is obsolete @@ -969,7 +981,8 @@ func TestGetRemoved(t *testing.T) { ShardCount: DefaultRevisionCacheShardCount, } collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat) - err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(docRev2.HLV.GetCurrentVersionString()))) assert.NoError(t, err, "Purge old revision JSON") // Get the removal revision with its history; equivalent to GET with ?revs=true @@ -978,7 +991,8 @@ func TestGetRemoved(t *testing.T) { require.Nil(t, body) // Ensure revision is unavailable for a non-leaf revision that isn't available via the rev cache, and wasn't a channel removal - err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev1id) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString()))) assert.NoError(t, err, "Purge old revision JSON") _, err = collection.Get1xRevBody(ctx, "doc1", rev1id, true, nil) @@ -997,7 +1011,7 @@ func TestGetRemovedAndDeleted(t *testing.T) { "key1": 1234, "channels": 
[]string{"ABC"}, } - rev1id, _, err := collection.Put(ctx, "doc1", rev1body) + rev1id, docRev1, err := collection.Put(ctx, "doc1", rev1body) assert.NoError(t, err, "Put") rev2body := Body{ @@ -1005,7 +1019,7 @@ func TestGetRemovedAndDeleted(t *testing.T) { BodyDeleted: true, BodyRev: rev1id, } - rev2id, _, err := collection.Put(ctx, "doc1", rev2body) + rev2id, docRev2, err := collection.Put(ctx, "doc1", rev2body) assert.NoError(t, err, "Put Rev 2") // Add another revision, so that rev 2 is obsolete @@ -1043,7 +1057,8 @@ func TestGetRemovedAndDeleted(t *testing.T) { ShardCount: DefaultRevisionCacheShardCount, } collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStats) - err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(docRev2.HLV.GetCurrentVersionString()))) assert.NoError(t, err, "Purge old revision JSON") // Get the deleted doc with its history; equivalent to GET with ?revs=true @@ -1052,7 +1067,8 @@ func TestGetRemovedAndDeleted(t *testing.T) { require.Nil(t, body) // Ensure revision is unavailable for a non-leaf revision that isn't available via the rev cache, and wasn't a channel removal - err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev1id) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString()))) assert.NoError(t, err, "Purge old revision JSON") _, err = collection.Get1xRevBody(ctx, "doc1", rev1id, true, nil) diff --git a/db/revision.go b/db/revision.go index 86434b1a2c..9f1d0891bb 100644 --- a/db/revision.go +++ b/db/revision.go @@ -259,7 +259,8 @@ func (db 
*DatabaseCollectionWithUser) backupRevisionJSON(ctx context.Context, do // Without delta sync, store the old rev for in-flight replication purposes if !db.deltaSyncEnabled() || db.deltaSyncRevMaxAgeSeconds() == 0 { if len(oldBody) > 0 { - _ = db.setOldRevisionJSON(ctx, docId, oldRev, oldBody, db.oldRevExpirySeconds()) + oldRevHash := base.Crc32cHashString([]byte(oldRev)) + _ = db.setOldRevisionJSON(ctx, docId, oldRevHash, oldBody, db.oldRevExpirySeconds()) } return } @@ -268,20 +269,6 @@ func (db *DatabaseCollectionWithUser) backupRevisionJSON(ctx context.Context, do // Special handling for Xattrs so that SG still has revisions that were updated by an SDK write if db.UseXattrs() { - var newBodyWithAtts = newBody - if len(newAtts) > 0 { - var err error - newBodyWithAtts, err = base.InjectJSONProperties(newBody, base.KVPair{ - Key: BodyAttachments, - Val: newAtts, - }) - if err != nil { - base.WarnfCtx(ctx, "Unable to marshal new revision body during backupRevisionJSON: doc=%q rev=%q err=%v ", base.UD(docId), newRev, err) - return - } - } - _ = db.setOldRevisionJSON(ctx, docId, newRev, newBodyWithAtts, db.deltaSyncRevMaxAgeSeconds()) - // Refresh the expiry on the previous revision backup _ = db.refreshPreviousRevisionBackup(ctx, docId, oldRev, oldBody, db.deltaSyncRevMaxAgeSeconds()) return @@ -289,7 +276,8 @@ func (db *DatabaseCollectionWithUser) backupRevisionJSON(ctx context.Context, do // Non-xattr only need to store the previous revision, as all writes come through SG if len(oldBody) > 0 { - _ = db.setOldRevisionJSON(ctx, docId, oldRev, oldBody, db.deltaSyncRevMaxAgeSeconds()) + oldRevHash := base.Crc32cHashString([]byte(oldRev)) + _ = db.setOldRevisionJSON(ctx, docId, oldRevHash, oldBody, db.deltaSyncRevMaxAgeSeconds()) } return } diff --git a/db/revision_cache_test.go b/db/revision_cache_test.go index f53ac74c4c..18305247d5 100644 --- a/db/revision_cache_test.go +++ b/db/revision_cache_test.go @@ -548,6 +548,7 @@ func TestRevisionCacheInternalProperties(t 
*testing.T) { } func TestBypassRevisionCache(t *testing.T) { + t.Skip("Revs are backed up by hash of CV now, test needs to fetch backup rev by revID, CBG-3748 (backwards compatibility for revID)") base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll) db, ctx := setupTestDB(t) diff --git a/db/revision_test.go b/db/revision_test.go index 431a5d0981..6882482352 100644 --- a/db/revision_test.go +++ b/db/revision_test.go @@ -112,7 +112,7 @@ func TestBackupOldRevision(t *testing.T) { docID := t.Name() - rev1ID, _, err := collection.Put(ctx, docID, Body{"test": true}) + rev1ID, docRev1, err := collection.Put(ctx, docID, Body{"test": true}) require.NoError(t, err) // make sure we didn't accidentally store an empty old revision @@ -121,7 +121,8 @@ func TestBackupOldRevision(t *testing.T) { assert.Equal(t, "404 missing", err.Error()) // check for current rev backup in xattr+delta case (to support deltas by sdk imports) - _, err = collection.getOldRevisionJSON(base.TestCtx(t), docID, rev1ID) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + _, err = collection.getOldRevisionJSON(base.TestCtx(t), docID, base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString()))) if deltasEnabled && xattrsEnabled { require.NoError(t, err) } else { @@ -131,15 +132,17 @@ func TestBackupOldRevision(t *testing.T) { // create rev 2 and check backups for both revs rev2ID := "2-abc" - _, _, err = collection.PutExistingRevWithBody(ctx, docID, Body{"test": true, "updated": true}, []string{rev2ID, rev1ID}, true, ExistingVersionWithUpdateToHLV) + docRev2, _, err := collection.PutExistingRevWithBody(ctx, docID, Body{"test": true, "updated": true}, []string{rev2ID, rev1ID}, true, ExistingVersionWithUpdateToHLV) require.NoError(t, err) // now in all cases we'll have rev 1 backed up (for at least 5 minutes) - _, err = collection.getOldRevisionJSON(base.TestCtx(t), docID, rev1ID) + // Revs are backed up by hash of CV now, switch to 
fetch by this till CBG-3748 (backwards compatibility for revID) + _, err = collection.getOldRevisionJSON(base.TestCtx(t), docID, base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString()))) require.NoError(t, err) // check for current rev backup in xattr+delta case (to support deltas by sdk imports) - _, err = collection.getOldRevisionJSON(base.TestCtx(t), docID, rev2ID) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + _, err = collection.getOldRevisionJSON(base.TestCtx(t), docID, base.Crc32cHashString([]byte(docRev2.HLV.GetCurrentVersionString()))) if deltasEnabled && xattrsEnabled { require.NoError(t, err) } else { diff --git a/rest/attachment_test.go b/rest/attachment_test.go index b765793f03..138317b268 100644 --- a/rest/attachment_test.go +++ b/rest/attachment_test.go @@ -684,6 +684,7 @@ func TestBulkGetBadAttachmentReproIssue2528(t *testing.T) { } func TestConflictWithInvalidAttachment(t *testing.T) { + t.Skip("Revs are backed up by hash of CV now, test needs to fetch backup rev by revID, CBG-3748 (backwards compatibility for revID)") rt := NewRestTester(t, nil) defer rt.Close() diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 904b76b2e2..363d7f6406 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -1836,6 +1836,7 @@ func TestPutRevV4(t *testing.T) { // Actual: // - Same as Expected (this test is unable to repro SG #3281, but is being left in as a regression test) func TestGetRemovedDoc(t *testing.T) { + t.Skip("Revs are backed up by hash of CV now, test needs to fetch backup rev by revID, CBG-3748 (backwards compatibility for revID)") base.SetUpTestLogging(t, base.LevelInfo, base.KeyHTTP, base.KeySync, base.KeySyncMsg) rt := NewRestTester(t, &RestTesterConfig{SyncFn: channels.DocChannelsSyncFunction}) @@ -2036,7 +2037,9 @@ func TestSendReplacementRevision(t *testing.T) { updatedVersion <- rt.UpdateDoc(docID, version1, 
fmt.Sprintf(`{"foo":"buzz","channels":["%s"]}`, test.replacementRevChannel)) // also purge revision backup and flush cache to ensure request for rev 1-... cannot be fulfilled - err := collection.PurgeOldRevisionJSON(ctx, docID, version1.RevTreeID) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + cvHash := base.Crc32cHashString([]byte(version1.CV.String())) + err := collection.PurgeOldRevisionJSON(ctx, docID, cvHash) require.NoError(t, err) rt.GetDatabase().FlushRevisionCacheForTest() } diff --git a/rest/blip_api_delta_sync_test.go b/rest/blip_api_delta_sync_test.go index 721641c5ce..f2bce3309d 100644 --- a/rest/blip_api_delta_sync_test.go +++ b/rest/blip_api_delta_sync_test.go @@ -223,8 +223,9 @@ func TestBlipDeltaSyncNewAttachmentPull(t *testing.T) { // Check EE is delta, and CE is full-body replication // msg, ok = client.pullReplication.WaitForMessage(5) msg = btcRunner.WaitForBlipRevMessage(client.id, doc1ID, version2) - - if base.IsEnterpriseEdition() { + // Delta sync only works for Version vectors, CBG-3748 (backwards compatibility for revID) + sgCanUseDeltas := base.IsEnterpriseEdition() && client.UseHLV() + if sgCanUseDeltas { // Check the request was sent with the correct deltaSrc property client.AssertDeltaSrcProperty(t, msg, version) // Check the request body was the actual delta @@ -238,7 +239,10 @@ func TestBlipDeltaSyncNewAttachmentPull(t *testing.T) { msgBody, err := msg.Body() assert.NoError(t, err) assert.NotEqual(t, `{"_attachments":[{"hello.txt":{"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=","length":11,"revpos":2,"stub":true}}]}`, string(msgBody)) - assert.Contains(t, string(msgBody), `"_attachments":{"hello.txt":{"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=","length":11,"revpos":2,"stub":true}}`) + assert.Contains(t, string(msgBody), `"stub":true`) + assert.Contains(t, string(msgBody), `"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="`) + assert.Contains(t, string(msgBody), 
`"revpos":2`) + assert.Contains(t, string(msgBody), `"length":11`) assert.Contains(t, string(msgBody), `"greetings":[{"hello":"world!"},{"hi":"alice"}]`) } @@ -306,7 +310,9 @@ func TestBlipDeltaSyncPull(t *testing.T) { msg := btcRunner.WaitForBlipRevMessage(client.id, docID, version2) // Check EE is delta, and CE is full-body replication - if base.IsEnterpriseEdition() { + // Delta sync only works for Version vectors, CBG-3748 (backwards compatibility for revID) + sgCanUseDeltas := base.IsEnterpriseEdition() && client.UseHLV() + if sgCanUseDeltas { // Check the request was sent with the correct deltaSrc property client.AssertDeltaSrcProperty(t, msg, version) // Check the request body was the actual delta @@ -351,6 +357,7 @@ func TestBlipDeltaSyncPullResend(t *testing.T) { GuestEnabled: true, } btcRunner := NewBlipTesterClientRunner(t) + btcRunner.SkipSubtest[RevtreeSubtestName] = true // delta sync not implemented for rev tree replication, CBG-3748 btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, &rtConfig) @@ -546,8 +553,9 @@ func TestBlipDeltaSyncPullTombstoned(t *testing.T) { deltasRequestedEnd = rt.GetDatabase().DbStats.DeltaSync().DeltasRequested.Value() deltasSentEnd = rt.GetDatabase().DbStats.DeltaSync().DeltasSent.Value() } - - if sgUseDeltas { + // delta sync not implemented for rev tree replication, CBG-3748 + sgCanUseDelta := base.IsEnterpriseEdition() && client.UseHLV() + if sgCanUseDelta { assert.Equal(t, deltaCacheHitsStart, deltaCacheHitsEnd) assert.Equal(t, deltaCacheMissesStart+1, deltaCacheMissesEnd) assert.Equal(t, deltasRequestedStart+1, deltasRequestedEnd) @@ -686,7 +694,9 @@ func TestBlipDeltaSyncPullTombstonedStarChan(t *testing.T) { deltasSentEnd = rt.GetDatabase().DbStats.DeltaSync().DeltasSent.Value() } - if sgUseDeltas { + // delta sync not implemented for rev tree replication, CBG-3748 + sgCanUseDelta := base.IsEnterpriseEdition() && client1.UseHLV() + if sgCanUseDelta { assert.Equal(t, 
deltaCacheHitsStart+1, deltaCacheHitsEnd) assert.Equal(t, deltaCacheMissesStart+1, deltaCacheMissesEnd) assert.Equal(t, deltasRequestedStart+2, deltasRequestedEnd) @@ -732,6 +742,7 @@ func TestBlipDeltaSyncPullRevCache(t *testing.T) { defer client.Close() client.ClientDeltas = true + sgCanUseDeltas := base.IsEnterpriseEdition() && client.UseHLV() btcRunner.StartPull(client.id) // create doc1 rev 1-0335a345b6ffed05707ccc4cbc1b67f4 @@ -758,11 +769,16 @@ func TestBlipDeltaSyncPullRevCache(t *testing.T) { // Check EE is delta // Check the request was sent with the correct deltaSrc property - client.AssertDeltaSrcProperty(t, msg, version1) + // delta sync not implemented for rev tree replication, CBG-3748 + if sgCanUseDeltas { + client.AssertDeltaSrcProperty(t, msg, version1) + } else { + assert.Equal(t, "", msg.Properties[db.RevMessageDeltaSrc]) + } // Check the request body was the actual delta msgBody, err := msg.Body() assert.NoError(t, err) - if sgUseDeltas { + if sgCanUseDeltas { assert.Equal(t, `{"greetings":{"2-":[{"howdy":"bob"}]}}`, string(msgBody)) } else { assert.Equal(t, `{"greetings":[{"hello":"world!"},{"hi":"alice"},{"howdy":"bob"}]}`, string(msgBody)) @@ -777,11 +793,15 @@ func TestBlipDeltaSyncPullRevCache(t *testing.T) { msg2 := btcRunner.WaitForBlipRevMessage(client2.id, docID, version2) // Check the request was sent with the correct deltaSrc property - client2.AssertDeltaSrcProperty(t, msg2, version1) + if sgCanUseDeltas { + client2.AssertDeltaSrcProperty(t, msg2, version1) + } else { + assert.Equal(t, "", msg2.Properties[db.RevMessageDeltaSrc]) + } // Check the request body was the actual delta msgBody2, err := msg2.Body() assert.NoError(t, err) - if sgUseDeltas { + if sgCanUseDeltas { assert.Equal(t, `{"greetings":{"2-":[{"howdy":"bob"}]}}`, string(msgBody2)) } else { assert.Equal(t, `{"greetings":[{"hello":"world!"},{"hi":"alice"},{"howdy":"bob"}]}`, string(msgBody2)) @@ -790,7 +810,8 @@ func TestBlipDeltaSyncPullRevCache(t *testing.T) { 
updatedDeltaCacheHits := rt.GetDatabase().DbStats.DeltaSync().DeltaCacheHit.Value() updatedDeltaCacheMisses := rt.GetDatabase().DbStats.DeltaSync().DeltaCacheMiss.Value() - if sgUseDeltas { + // delta sync not implemented for rev tree replication, CBG-3748 + if sgCanUseDeltas { assert.Equal(t, deltaCacheHits+1, updatedDeltaCacheHits) assert.Equal(t, deltaCacheMisses, updatedDeltaCacheMisses) } else { @@ -827,6 +848,7 @@ func TestBlipDeltaSyncPush(t *testing.T) { client := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts) defer client.Close() client.ClientDeltas = true + sgCanUseDeltas := base.IsEnterpriseEdition() && client.UseHLV() btcRunner.StartPull(client.id) @@ -842,7 +864,7 @@ func TestBlipDeltaSyncPush(t *testing.T) { // Check EE is delta, and CE is full-body replication msg := client.waitForReplicationMessage(collection, 2) - if base.IsEnterpriseEdition() && sgUseDeltas { + if base.IsEnterpriseEdition() && sgCanUseDeltas { // Check the request was sent with the correct deltaSrc property client.AssertDeltaSrcProperty(t, msg, version) // Check the request body was the actual delta @@ -887,7 +909,7 @@ func TestBlipDeltaSyncPush(t *testing.T) { _, err = btcRunner.PushRev(client.id, docID, deletedVersion, []byte(`{"undelete":true}`)) - if base.IsEnterpriseEdition() && sgUseDeltas { + if base.IsEnterpriseEdition() && sgCanUseDeltas { // Now make the client push up a delta that has the parent of the tombstone. // This is not a valid scenario, and is actively prevented on the CBL side. 
assert.Error(t, err) diff --git a/rest/changestest/changes_api_test.go b/rest/changestest/changes_api_test.go index 4359b66203..7969cca748 100644 --- a/rest/changestest/changes_api_test.go +++ b/rest/changestest/changes_api_test.go @@ -1782,7 +1782,7 @@ func TestChangesIncludeDocs(t *testing.T) { testDB := rt.GetDatabase() testDB.RevsLimit = 3 defer rt.Close() - collection, _ := rt.GetSingleTestDatabaseCollection() + collection, ctx := rt.GetSingleTestDatabaseCollection() rt.CreateUser("user1", []string{"alpha", "beta"}) @@ -1889,9 +1889,15 @@ func TestChangesIncludeDocs(t *testing.T) { // Flush the rev cache, and issue changes again to ensure successful handling for rev cache misses rt.GetDatabase().FlushRevisionCacheForTest() // Also nuke temporary revision backup of doc_pruned. Validates that the body for the pruned revision is generated correctly when no longer resident in the rev cache - data := collection.GetCollectionDatastore() - assert.NoError(t, data.Delete(base.RevPrefix+"doc_pruned:34:2-5afcb73bd3eb50615470e3ba54b80f00")) - + resp := rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/doc_pruned?show_cv=true", "") + var r struct { + CV *string `json:"_cv"` + } + require.NoError(rt.TB(), base.JSONUnmarshal(resp.Body.Bytes(), &r)) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + cvHash := base.Crc32cHashString([]byte(*r.CV)) + err = collection.PurgeOldRevisionJSON(ctx, "doc_pruned", cvHash) + require.NoError(t, err) postFlushChanges := rt.GetChanges("/{{.keyspace}}/_changes?include_docs=true", "user1") assert.Equal(t, len(expectedResults), len(postFlushChanges.Results)) diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go index cd2f5b9f79..9d8e417199 100644 --- a/rest/importtest/import_test.go +++ b/rest/importtest/import_test.go @@ -179,15 +179,17 @@ func TestXattrImportOldDocRevHistory(t *testing.T) { // 1. 
Create revision with history docID := t.Name() version := rt.PutDocDirectly(docID, rest.JsonToMap(t, `{"val":-1}`)) - revID := version.RevTreeID + cv := version.CV.String() collection, ctx := rt.GetSingleTestDatabaseCollectionWithUser() for i := 0; i < 10; i++ { version = rt.UpdateDocDirectly(docID, version, rest.JsonToMap(t, fmt.Sprintf(`{"val":%d}`, i))) // Purge old revision JSON to simulate expiry, and to verify import doesn't attempt multiple retrievals - purgeErr := collection.PurgeOldRevisionJSON(ctx, docID, revID) + // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) + cvHash := base.Crc32cHashString([]byte(cv)) + purgeErr := collection.PurgeOldRevisionJSON(ctx, docID, cvHash) require.NoError(t, purgeErr) - revID = version.RevTreeID + cv = version.CV.String() } // 2. Modify doc via SDK diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index 5ad562c202..aec89a9747 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -7553,6 +7553,7 @@ func TestReplicatorIgnoreRemovalBodies(t *testing.T) { }) defer activeRT.Close() activeCtx := activeRT.Context() + collection, _ := activeRT.GetSingleTestDatabaseCollection() docID := t.Name() // Create the docs // @@ -7569,7 +7570,8 @@ func TestReplicatorIgnoreRemovalBodies(t *testing.T) { require.NoError(t, activeRT.WaitForVersion(docID, version3)) activeRT.GetDatabase().FlushRevisionCacheForTest() - err := activeRT.GetSingleDataStore().Delete(fmt.Sprintf("_sync:rev:%s:%d:%s", t.Name(), len(version2.RevTreeID), version2.RevTreeID)) + cvHash := base.Crc32cHashString([]byte(version2.CV.String())) + err := collection.PurgeOldRevisionJSON(activeCtx, docID, cvHash) require.NoError(t, err) // Set-up replicator // passiveDBURL, err := url.Parse(srv.URL + "/db") From 3be948e11d4c3a0757c9ccd1d7b72b64a4426a64 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Mon, 9 Dec 2024 
14:11:27 +0000 Subject: [PATCH 2/6] update comments --- db/blip_handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/db/blip_handler.go b/db/blip_handler.go index e56cad327d..67dcd00219 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -772,6 +772,7 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error { } output.Write([]byte("]")) response := rq.Response() + // Disable delta sync for protocol versions < 4, CBG-3748 (backwards compatibility for revID delta sync) if bh.sgCanUseDeltas && bh.useHLV() { base.DebugfCtx(bh.loggingCtx, base.KeyAll, "Setting deltas=true property on handleChanges response") response.Properties[ChangesResponseDeltas] = trueProperty @@ -865,6 +866,7 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error { } output.Write([]byte("]")) response := rq.Response() + // Disable delta sync for protocol versions < 4, CBG-3748 (backwards compatibility for revID delta sync) if bh.sgCanUseDeltas && bh.useHLV() { base.DebugfCtx(bh.loggingCtx, base.KeyAll, "Setting deltas=true property on proposeChanges response") response.Properties[ChangesResponseDeltas] = trueProperty @@ -1083,6 +1085,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err if !bh.sgCanUseDeltas { return base.HTTPErrorf(http.StatusBadRequest, "Deltas are disabled for this peer") } else if !bh.useHLV() { + // Disable delta sync for protocol versions < 4, CBG-3748 (backwards compatibility for revID delta sync) return base.HTTPErrorf(http.StatusBadRequest, "backwards compatibility for revTree deltas not yet implemented") } From e706d88f509558fbe3768fb4deb12d6eecee0359 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Mon, 9 Dec 2024 16:04:55 +0000 Subject: [PATCH 3/6] fix backup revs --- db/crud.go | 16 +++++++--------- db/revision.go | 6 +++--- rest/changestest/changes_api_test.go | 17 +++++++++-------- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/db/crud.go b/db/crud.go index 
2351d64e53..8a44c555ad 100644 --- a/db/crud.go +++ b/db/crud.go @@ -879,19 +879,13 @@ func (db *DatabaseCollectionWithUser) getAvailableRevAttachments(ctx context.Con // Moves a revision's ancestor's body out of the document object and into a separate db doc. func (db *DatabaseCollectionWithUser) backupAncestorRevs(ctx context.Context, doc *Document, newDoc *Document) { - newBodyBytes, err := newDoc.BodyBytes(ctx) - if err != nil { - base.WarnfCtx(ctx, "Error getting body bytes when backing up ancestor revs") - return - } // Find an ancestor that still has JSON in the document: var json []byte ancestorRevId := newDoc.RevID for { if ancestorRevId = doc.History.getParent(ancestorRevId); ancestorRevId == "" { - // No ancestors with JSON found. Check if we need to back up current rev for delta sync, then return - db.backupRevisionJSON(ctx, doc.ID, newDoc.RevID, "", newBodyBytes, nil, doc.Attachments) + // No ancestors with JSON found. Return early return } else if json = doc.getRevisionBodyJSON(ctx, ancestorRevId, db.RevisionBodyLoader); json != nil { break @@ -899,7 +893,7 @@ func (db *DatabaseCollectionWithUser) backupAncestorRevs(ctx context.Context, do } // Back up the revision JSON as a separate doc in the bucket: - db.backupRevisionJSON(ctx, doc.ID, newDoc.RevID, ancestorRevId, newBodyBytes, json, doc.Attachments) + db.backupRevisionJSON(ctx, doc.ID, doc.HLV.GetCurrentVersionString(), json) // Nil out the ancestor rev's body in the document struct: if ancestorRevId == doc.CurrentRev { @@ -2612,7 +2606,11 @@ func (db *DatabaseCollectionWithUser) postWriteUpdateHLV(ctx context.Context, do doc.HLV.CurrentVersionCAS = casOut } // backup new revision to the bucket now we have a doc assigned a CV (post macro expansion) for delta generation purposes - if db.UseXattrs() { + backupRev := db.deltaSyncEnabled() + if backupRev { + backupRev = db.deltaSyncRevMaxAgeSeconds() != 0 + } + if db.UseXattrs() && backupRev { var newBodyWithAtts = doc._rawBody if len(doc.Attachments) 
> 0 { var err error diff --git a/db/revision.go b/db/revision.go index 9f1d0891bb..4064cfe2b6 100644 --- a/db/revision.go +++ b/db/revision.go @@ -254,7 +254,7 @@ func (c *DatabaseCollection) getOldRevisionJSON(ctx context.Context, docid strin // - new revision stored (as duplicate), with expiry rev_max_age_seconds // delta=true && shared_bucket_access=false // - old revision stored, with expiry rev_max_age_seconds -func (db *DatabaseCollectionWithUser) backupRevisionJSON(ctx context.Context, docId, newRev, oldRev string, newBody, oldBody []byte, newAtts AttachmentsMeta) { +func (db *DatabaseCollectionWithUser) backupRevisionJSON(ctx context.Context, docId, oldRev string, oldBody []byte) { // Without delta sync, store the old rev for in-flight replication purposes if !db.deltaSyncEnabled() || db.deltaSyncRevMaxAgeSeconds() == 0 { @@ -270,7 +270,8 @@ func (db *DatabaseCollectionWithUser) backupRevisionJSON(ctx context.Context, do // Special handling for Xattrs so that SG still has revisions that were updated by an SDK write if db.UseXattrs() { // Refresh the expiry on the previous revision backup - _ = db.refreshPreviousRevisionBackup(ctx, docId, oldRev, oldBody, db.deltaSyncRevMaxAgeSeconds()) + oldRevHash := base.Crc32cHashString([]byte(oldRev)) + _ = db.refreshPreviousRevisionBackup(ctx, docId, oldRevHash, oldBody, db.deltaSyncRevMaxAgeSeconds()) return } @@ -279,7 +280,6 @@ func (db *DatabaseCollectionWithUser) backupRevisionJSON(ctx context.Context, do oldRevHash := base.Crc32cHashString([]byte(oldRev)) _ = db.setOldRevisionJSON(ctx, docId, oldRevHash, oldBody, db.deltaSyncRevMaxAgeSeconds()) } - return } func (db *DatabaseCollectionWithUser) setOldRevisionJSON(ctx context.Context, docid string, rev string, body []byte, expiry uint32) error { diff --git a/rest/changestest/changes_api_test.go b/rest/changestest/changes_api_test.go index 7969cca748..e2a0b19898 100644 --- a/rest/changestest/changes_api_test.go +++ b/rest/changestest/changes_api_test.go @@ -1824,9 
+1824,15 @@ func TestChangesIncludeDocs(t *testing.T) { assert.NoError(t, err, "Error updating doc") // Generate more revs than revs_limit (3) revid = prunedRevId + var cvs []string for i := 0; i < 5; i++ { - revid, err = updateTestDoc(rt, "doc_pruned", revid, `{"type": "pruned", "channels":["gamma"]}`) - assert.NoError(t, err, "Error updating doc") + body := db.Body{ + "type": "pruned", + "channels": []string{"gamma"}, + } + docVersion := rt.UpdateDocDirectly("doc_pruned", db.DocVersion{RevTreeID: revid}, body) + revid = docVersion.RevTreeID + cvs = append(cvs, docVersion.CV.String()) } // Doc w/ attachment @@ -1889,13 +1895,8 @@ func TestChangesIncludeDocs(t *testing.T) { // Flush the rev cache, and issue changes again to ensure successful handling for rev cache misses rt.GetDatabase().FlushRevisionCacheForTest() // Also nuke temporary revision backup of doc_pruned. Validates that the body for the pruned revision is generated correctly when no longer resident in the rev cache - resp := rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/doc_pruned?show_cv=true", "") - var r struct { - CV *string `json:"_cv"` - } - require.NoError(rt.TB(), base.JSONUnmarshal(resp.Body.Bytes(), &r)) // Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID) - cvHash := base.Crc32cHashString([]byte(*r.CV)) + cvHash := base.Crc32cHashString([]byte(cvs[0])) err = collection.PurgeOldRevisionJSON(ctx, "doc_pruned", cvHash) require.NoError(t, err) postFlushChanges := rt.GetChanges("/{{.keyspace}}/_changes?include_docs=true", "user1") From dee04a867b6b29dbdb291de9af0ee957b812d7bb Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Mon, 9 Dec 2024 16:10:34 +0000 Subject: [PATCH 4/6] further tidy up --- db/crud.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/db/crud.go b/db/crud.go index 8a44c555ad..09aa12beab 100644 --- a/db/crud.go +++ b/db/crud.go @@ -2606,10 +2606,7 @@ func (db 
*DatabaseCollectionWithUser) postWriteUpdateHLV(ctx context.Context, do doc.HLV.CurrentVersionCAS = casOut } // backup new revision to the bucket now we have a doc assigned a CV (post macro expansion) for delta generation purposes - backupRev := db.deltaSyncEnabled() - if backupRev { - backupRev = db.deltaSyncRevMaxAgeSeconds() != 0 - } + backupRev := db.deltaSyncEnabled() && db.deltaSyncRevMaxAgeSeconds() != 0 if db.UseXattrs() && backupRev { var newBodyWithAtts = doc._rawBody if len(doc.Attachments) > 0 { From e7ab93c4e36a659070da306b88bf58f3d5ebfa81 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Tue, 10 Dec 2024 12:45:12 +0000 Subject: [PATCH 5/6] updated to address comments and fix flaking tests --- db/blip_handler.go | 3 --- db/blip_sync_context.go | 16 ++++++++++++---- db/revision_cache_interface.go | 3 ++- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/db/blip_handler.go b/db/blip_handler.go index 67dcd00219..379a5926f9 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -1084,9 +1084,6 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err if deltaSrcRevID, isDelta := revMessage.DeltaSrc(); isDelta && !revMessage.Deleted() { if !bh.sgCanUseDeltas { return base.HTTPErrorf(http.StatusBadRequest, "Deltas are disabled for this peer") - } else if !bh.useHLV() { - // Disable delta sync for protocol versions < 4, CBG-3748 (backwards compatibility for revID delta sync) - return base.HTTPErrorf(http.StatusBadRequest, "backwards compatibility for revTree deltas not yet implemented") } // TODO: Doing a GetRevCopy here duplicates some rev cache retrieval effort, since deltaRevSrc is always diff --git a/db/blip_sync_context.go b/db/blip_sync_context.go index 9fd8eab56a..c16165773c 100644 --- a/db/blip_sync_context.go +++ b/db/blip_sync_context.go @@ -355,16 +355,17 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b } // The first element of the knownRevsArray returned from 
CBL is the parent revision to use as deltaSrc for - // revtree clients, for HLV clients the first element is the HLV + // revtree clients. For HLV clients, use the cv as deltaSrc if bsc.useDeltas && len(knownRevsArray) > 0 { if revID, ok := knownRevsArray[0].(string); ok { if bsc.useHLV() { msgHLV, err := extractHLVFromBlipMessage(revID) if err != nil { - base.ErrorfCtx(ctx, "Invalid known rev format for hlv on doc: %s", docID) - return nil + base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", docID) + deltaSrcRevID = "" // will force falling back to full body replication below + } else { + deltaSrcRevID = msgHLV.GetCurrentVersionString() } - deltaSrcRevID = msgHLV.GetCurrentVersionString() } else { deltaSrcRevID = revID } @@ -373,6 +374,13 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b for _, rev := range knownRevsArray { if revID, ok := rev.(string); ok { + msgHLV, err := extractHLVFromBlipMessage(revID) + if err == nil { + // extract cv as string + revID = msgHLV.GetCurrentVersionString() + } + // we can assume here that if we fail to parse hlv, we have received a rev id in known revs. 
If we don't fail to parse hlv + // then we have extracted cv from it and can assign the cv string to known revs here knownRevs[revID] = true } else { base.ErrorfCtx(ctx, "Invalid response to 'changes' message") diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index c43f952c04..46bf3679a9 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -453,6 +453,7 @@ func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCache base.ErrorfCtx(ctx, "pending CBG-3814 support of channel removal for CV: %v", err) } + deleted = doc.Deleted channels = doc.SyncData.getCurrentChannels() revid = doc.CurrentRev hlv = doc.HLV @@ -462,7 +463,7 @@ func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCache func (c *DatabaseCollection) getCurrentVersion(ctx context.Context, doc *Document, cv Version) (bodyBytes []byte, attachments AttachmentsMeta, err error) { if err = doc.HasCurrentVersion(ctx, cv); err != nil { - bodyBytes, err = c.getOldRevisionJSON(ctx, doc.ID, doc.CurrentRev) + bodyBytes, err = c.getOldRevisionJSON(ctx, doc.ID, cv.String()) if err != nil || bodyBytes == nil { return nil, nil, err } From 735237bd9985488ef797eef1e206c2413d9e1f66 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Tue, 10 Dec 2024 12:48:46 +0000 Subject: [PATCH 6/6] fix incorrect fetch format by cv in getCurrentVersion --- db/revision_cache_interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index 46bf3679a9..6a71b2ed8e 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -463,7 +463,7 @@ func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCache func (c *DatabaseCollection) getCurrentVersion(ctx context.Context, doc *Document, cv Version) (bodyBytes []byte, attachments AttachmentsMeta, err error) { if err = doc.HasCurrentVersion(ctx, cv); err != nil { - 
bodyBytes, err = c.getOldRevisionJSON(ctx, doc.ID, cv.String()) + bodyBytes, err = c.getOldRevisionJSON(ctx, doc.ID, base.Crc32cHashString([]byte(cv.String()))) if err != nil || bodyBytes == nil { return nil, nil, err }