Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4389: extract cv from known revs and store backup rev by cv #7237

Merged
merged 6 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions db/attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ func TestBackupOldRevisionWithAttachments(t *testing.T) {
require.NoError(t, base.JSONUnmarshal([]byte(rev1Data), &rev1Body))

collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
rev1ID, _, err := collection.Put(ctx, docID, rev1Body)
rev1ID, docRev1, err := collection.Put(ctx, docID, rev1Body)
require.NoError(t, err)
assert.Equal(t, "1-12ff9ce1dd501524378fe092ce9aee8f", rev1ID)

rev1OldBody, err := collection.getOldRevisionJSON(ctx, docID, rev1ID)
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID)
rev1OldBody, err := collection.getOldRevisionJSON(ctx, docID, base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString())))
if deltasEnabled && xattrsEnabled {
require.NoError(t, err)
assert.Contains(t, string(rev1OldBody), "hello.txt")
Expand All @@ -67,16 +68,18 @@ func TestBackupOldRevisionWithAttachments(t *testing.T) {
var rev2Body Body
rev2Data := `{"test": true, "updated": true, "_attachments": {"hello.txt": {"stub": true, "revpos": 1}}}`
require.NoError(t, base.JSONUnmarshal([]byte(rev2Data), &rev2Body))
_, rev2ID, err := collection.PutExistingRevWithBody(ctx, docID, rev2Body, []string{"2-abc", rev1ID}, true, ExistingVersionWithUpdateToHLV)
docRev2, _, err := collection.PutExistingRevWithBody(ctx, docID, rev2Body, []string{"2-abc", rev1ID}, true, ExistingVersionWithUpdateToHLV)
require.NoError(t, err)

// now in any case - we'll have rev 1 backed up
rev1OldBody, err = collection.getOldRevisionJSON(ctx, docID, rev1ID)
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID)
rev1OldBody, err = collection.getOldRevisionJSON(ctx, docID, base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString())))
require.NoError(t, err)
assert.Contains(t, string(rev1OldBody), "hello.txt")

// and rev 2 should be present only for the xattrs and deltas case
rev2OldBody, err := collection.getOldRevisionJSON(ctx, docID, rev2ID)
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID)
rev2OldBody, err := collection.getOldRevisionJSON(ctx, docID, base.Crc32cHashString([]byte(docRev2.HLV.GetCurrentVersionString())))
if deltasEnabled && xattrsEnabled {
require.NoError(t, err)
assert.Contains(t, string(rev2OldBody), "hello.txt")
Expand All @@ -100,7 +103,7 @@ func TestAttachments(t *testing.T) {
assert.NoError(t, base.JSONUnmarshal([]byte(rev1input), &body))

collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
revid, _, err := collection.Put(ctx, "doc1", body)
revid, docRev1, err := collection.Put(ctx, "doc1", body)
rev1id := revid
assert.NoError(t, err, "Couldn't create document")

Expand Down Expand Up @@ -188,7 +191,8 @@ func TestAttachments(t *testing.T) {
assert.Equal(t, float64(2), bye["revpos"])

log.Printf("Expire body of rev 1, then add a child...") // test fix of #498
err = collection.dataStore.Delete(oldRevisionKey("doc1", rev1id))
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID)
err = collection.dataStore.Delete(oldRevisionKey("doc1", base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString()))))
assert.NoError(t, err, "Couldn't compact old revision")
rev2Bstr := `{"_attachments": {"bye.txt": {"stub":true,"revpos":1,"digest":"sha1-gwwPApfQR9bzBKpqoEYwFmKp98A="}}, "_rev": "2-f000"}`
var body2B Body
Expand Down
6 changes: 4 additions & 2 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,8 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {
}
output.Write([]byte("]"))
response := rq.Response()
if bh.sgCanUseDeltas {
// Disable delta sync for protocol versions < 4, CBG-3748 (backwards compatibility for revID delta sync)
if bh.sgCanUseDeltas && bh.useHLV() {
base.DebugfCtx(bh.loggingCtx, base.KeyAll, "Setting deltas=true property on handleChanges response")
response.Properties[ChangesResponseDeltas] = trueProperty
bh.replicationStats.HandleChangesDeltaRequestedCount.Add(int64(nRequested))
Expand Down Expand Up @@ -865,7 +866,8 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
}
output.Write([]byte("]"))
response := rq.Response()
if bh.sgCanUseDeltas {
// Disable delta sync for protocol versions < 4, CBG-3748 (backwards compatibility for revID delta sync)
if bh.sgCanUseDeltas && bh.useHLV() {
base.DebugfCtx(bh.loggingCtx, base.KeyAll, "Setting deltas=true property on proposeChanges response")
response.Properties[ChangesResponseDeltas] = trueProperty
}
Expand Down
25 changes: 22 additions & 3 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,15 +354,33 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
knownRevsByDoc[docID] = knownRevs
}

// The first element of the knownRevsArray returned from CBL is the parent revision to use as deltaSrc
// The first element of the knownRevsArray returned from CBL is the parent revision to use as deltaSrc for
// revtree clients. For HLV clients, use the cv as deltaSrc
if bsc.useDeltas && len(knownRevsArray) > 0 {
if revID, ok := knownRevsArray[0].(string); ok {
deltaSrcRevID = revID
if bsc.useHLV() {
msgHLV, err := extractHLVFromBlipMessage(revID)
if err != nil {
base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", docID)
deltaSrcRevID = "" // will force falling back to full body replication below
} else {
deltaSrcRevID = msgHLV.GetCurrentVersionString()
}
} else {
deltaSrcRevID = revID
}
}
}

for _, rev := range knownRevsArray {
if revID, ok := rev.(string); ok {
msgHLV, err := extractHLVFromBlipMessage(revID)
if err == nil {
// extract cv as string
revID = msgHLV.GetCurrentVersionString()
}
// If parsing as an HLV fails, we assume the entry is a plain revID and record it unchanged. If parsing succeeds,
// revID now holds the cv string extracted from the HLV, and that cv is what gets recorded in known revs.
knownRevs[revID] = true
} else {
base.ErrorfCtx(ctx, "Invalid response to 'changes' message")
Expand All @@ -372,7 +390,8 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b

var err error

if deltaSrcRevID != "" {
// fallback to sending full revisions for non hlv aware peers, CBG-3748
if deltaSrcRevID != "" && bsc.useHLV() {
err = bsc.sendRevAsDelta(ctx, sender, docID, rev, deltaSrcRevID, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx)
} else {
err = bsc.sendRevision(ctx, sender, docID, rev, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx)
Expand Down
32 changes: 22 additions & 10 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,27 +879,21 @@ func (db *DatabaseCollectionWithUser) getAvailableRevAttachments(ctx context.Con

// Moves a revision's ancestor's body out of the document object and into a separate db doc.
func (db *DatabaseCollectionWithUser) backupAncestorRevs(ctx context.Context, doc *Document, newDoc *Document) {
newBodyBytes, err := newDoc.BodyBytes(ctx)
if err != nil {
base.WarnfCtx(ctx, "Error getting body bytes when backing up ancestor revs")
return
}

// Find an ancestor that still has JSON in the document:
var json []byte
ancestorRevId := newDoc.RevID
for {
if ancestorRevId = doc.History.getParent(ancestorRevId); ancestorRevId == "" {
// No ancestors with JSON found. Check if we need to back up current rev for delta sync, then return
db.backupRevisionJSON(ctx, doc.ID, newDoc.RevID, "", newBodyBytes, nil, doc.Attachments)
// No ancestors with JSON found. Return early
adamcfraser marked this conversation as resolved.
Show resolved Hide resolved
return
} else if json = doc.getRevisionBodyJSON(ctx, ancestorRevId, db.RevisionBodyLoader); json != nil {
break
}
}

// Back up the revision JSON as a separate doc in the bucket:
db.backupRevisionJSON(ctx, doc.ID, newDoc.RevID, ancestorRevId, newBodyBytes, json, doc.Attachments)
db.backupRevisionJSON(ctx, doc.ID, doc.HLV.GetCurrentVersionString(), json)

// Nil out the ancestor rev's body in the document struct:
if ancestorRevId == doc.CurrentRev {
Expand Down Expand Up @@ -2448,7 +2442,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
doc.MetadataOnlyUpdate.HexCAS = base.CasToString(casOut)
}
// update the doc's HLV defined post macro expansion
doc = postWriteUpdateHLV(doc, casOut)
doc = db.postWriteUpdateHLV(ctx, doc, casOut)
}
}

Expand Down Expand Up @@ -2601,7 +2595,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
return doc, newRevID, nil
}

func postWriteUpdateHLV(doc *Document, casOut uint64) *Document {
func (db *DatabaseCollectionWithUser) postWriteUpdateHLV(ctx context.Context, doc *Document, casOut uint64) *Document {
if doc.HLV == nil {
return doc
}
Expand All @@ -2611,6 +2605,24 @@ func postWriteUpdateHLV(doc *Document, casOut uint64) *Document {
if doc.HLV.CurrentVersionCAS == expandMacroCASValueUint64 {
doc.HLV.CurrentVersionCAS = casOut
}
// backup new revision to the bucket now we have a doc assigned a CV (post macro expansion) for delta generation purposes
backupRev := db.deltaSyncEnabled() && db.deltaSyncRevMaxAgeSeconds() != 0
if db.UseXattrs() && backupRev {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the UseXattrs() check is unnecessary here (we don't support HLV with non-xattrs)

var newBodyWithAtts = doc._rawBody
if len(doc.Attachments) > 0 {
var err error
newBodyWithAtts, err = base.InjectJSONProperties(doc._rawBody, base.KVPair{
Key: BodyAttachments,
Val: doc.Attachments,
})
if err != nil {
base.WarnfCtx(ctx, "Unable to marshal new revision body during backupRevisionJSON: doc=%q rev=%q cv=%q err=%v ", base.UD(doc.ID), doc.CurrentRev, doc.HLV.GetCurrentVersionString(), err)
return doc
}
}
revHash := base.Crc32cHashString([]byte(doc.HLV.GetCurrentVersionString()))
_ = db.setOldRevisionJSON(ctx, doc.ID, revHash, newBodyWithAtts, db.deltaSyncRevMaxAgeSeconds())
}
return doc
}

Expand Down
1 change: 1 addition & 0 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,7 @@ func BenchmarkHandleRevDelta(b *testing.B) {
}

func TestGetAvailableRevAttachments(t *testing.T) {
t.Skip("Revs are backed up by hash of CV now, test needs to fetch backup rev by revID, CBG-3748 (backwards compatibility for revID)")
db, ctx := setupTestDB(t)
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
Expand Down
56 changes: 36 additions & 20 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,15 +496,15 @@ func TestGetRemovedAsUser(t *testing.T) {
"key1": 1234,
"channels": []string{"ABC"},
}
rev1id, _, err := collection.Put(ctx, "doc1", rev1body)
rev1id, docRev1, err := collection.Put(ctx, "doc1", rev1body)
assert.NoError(t, err, "Put")

rev2body := Body{
"key1": 1234,
"channels": []string{"NBC"},
BodyRev: rev1id,
}
rev2id, _, err := collection.Put(ctx, "doc1", rev2body)
rev2id, docRev2, err := collection.Put(ctx, "doc1", rev2body)
assert.NoError(t, err, "Put Rev 2")

// Add another revision, so that rev 2 is obsolete
Expand Down Expand Up @@ -542,7 +542,9 @@ func TestGetRemovedAsUser(t *testing.T) {
ShardCount: DefaultRevisionCacheShardCount,
}
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id)
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID)
cv := docRev2.HLV.GetCurrentVersionString()
err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(cv)))
assert.NoError(t, err, "Purge old revision JSON")

// Try again with a user who doesn't have access to this revision
Expand All @@ -568,7 +570,9 @@ func TestGetRemovedAsUser(t *testing.T) {
assert.Equal(t, expectedResult, body)

// Ensure revision is unavailable for a non-leaf revision that isn't available via the rev cache, and wasn't a channel removal
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev1id)
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID)
cv = docRev1.HLV.GetCurrentVersionString()
err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(cv)))
assert.NoError(t, err, "Purge old revision JSON")

_, err = collection.Get1xRevBody(ctx, "doc1", rev1id, true, nil)
Expand Down Expand Up @@ -631,7 +635,7 @@ func TestGetRemovalMultiChannel(t *testing.T) {
"channels": []string{"ABC"},
BodyRev: rev1ID,
}
rev2ID, _, err := collection.Put(ctx, "doc1", rev2Body)
rev2ID, docRev2, err := collection.Put(ctx, "doc1", rev2Body)
require.NoError(t, err, "Error creating doc")

// Create the third revision of doc1 on channel ABC.
Expand Down Expand Up @@ -683,7 +687,8 @@ func TestGetRemovalMultiChannel(t *testing.T) {

// Flush the revision cache and purge the old revision backup.
db.FlushRevisionCacheForTest()
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2ID)
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(docRev2.HLV.GetCurrentVersionString())))
require.NoError(t, err, "Error purging old revision JSON")

// Try with a user who has access to this revision.
Expand Down Expand Up @@ -714,10 +719,11 @@ func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
name string
versionVector bool
}{
{
name: "revTree test",
versionVector: false,
},
// Revs are backed up by hash of CV now, so there is no way to fetch backup revs by revID until CBG-3748 (backwards compatibility for revID)
//{
// name: "revTree test",
// versionVector: false,
//},
{
name: "versionVector test",
versionVector: true,
Expand Down Expand Up @@ -758,8 +764,14 @@ func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {

// Flush the revision cache and purge the old revision backup.
db.FlushRevisionCacheForTest()
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2ID)
require.NoError(t, err, "Error purging old revision JSON")
if testCase.versionVector {
cvStr := docRev2.HLV.GetCurrentVersionString()
err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(cvStr)))
require.NoError(t, err, "Error purging old revision JSON")
} else {
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2ID)
require.NoError(t, err, "Error purging old revision JSON")
}

// Request delta between rev2ID and rev3ID (toRevision "rev2ID" is channel removal)
// as a user who doesn't have access to the removed revision via any other channel.
Expand Down Expand Up @@ -923,15 +935,15 @@ func TestGetRemoved(t *testing.T) {
"key1": 1234,
"channels": []string{"ABC"},
}
rev1id, _, err := collection.Put(ctx, "doc1", rev1body)
rev1id, docRev1, err := collection.Put(ctx, "doc1", rev1body)
assert.NoError(t, err, "Put")

rev2body := Body{
"key1": 1234,
"channels": []string{"NBC"},
BodyRev: rev1id,
}
rev2id, _, err := collection.Put(ctx, "doc1", rev2body)
rev2id, docRev2, err := collection.Put(ctx, "doc1", rev2body)
assert.NoError(t, err, "Put Rev 2")

// Add another revision, so that rev 2 is obsolete
Expand Down Expand Up @@ -969,7 +981,8 @@ func TestGetRemoved(t *testing.T) {
ShardCount: DefaultRevisionCacheShardCount,
}
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id)
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(docRev2.HLV.GetCurrentVersionString())))
assert.NoError(t, err, "Purge old revision JSON")

// Get the removal revision with its history; equivalent to GET with ?revs=true
Expand All @@ -978,7 +991,8 @@ func TestGetRemoved(t *testing.T) {
require.Nil(t, body)

// Ensure revision is unavailable for a non-leaf revision that isn't available via the rev cache, and wasn't a channel removal
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev1id)
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString())))
assert.NoError(t, err, "Purge old revision JSON")

_, err = collection.Get1xRevBody(ctx, "doc1", rev1id, true, nil)
Expand All @@ -997,15 +1011,15 @@ func TestGetRemovedAndDeleted(t *testing.T) {
"key1": 1234,
"channels": []string{"ABC"},
}
rev1id, _, err := collection.Put(ctx, "doc1", rev1body)
rev1id, docRev1, err := collection.Put(ctx, "doc1", rev1body)
assert.NoError(t, err, "Put")

rev2body := Body{
"key1": 1234,
BodyDeleted: true,
BodyRev: rev1id,
}
rev2id, _, err := collection.Put(ctx, "doc1", rev2body)
rev2id, docRev2, err := collection.Put(ctx, "doc1", rev2body)
assert.NoError(t, err, "Put Rev 2")

// Add another revision, so that rev 2 is obsolete
Expand Down Expand Up @@ -1043,7 +1057,8 @@ func TestGetRemovedAndDeleted(t *testing.T) {
ShardCount: DefaultRevisionCacheShardCount,
}
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStats)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id)
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(docRev2.HLV.GetCurrentVersionString())))
assert.NoError(t, err, "Purge old revision JSON")

// Get the deleted doc with its history; equivalent to GET with ?revs=true
Expand All @@ -1052,7 +1067,8 @@ func TestGetRemovedAndDeleted(t *testing.T) {
require.Nil(t, body)

// Ensure revision is unavailable for a non-leaf revision that isn't available via the rev cache, and wasn't a channel removal
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev1id)
// Revs are backed up by hash of CV now, switch to fetch by this till CBG-3748 (backwards compatibility for revID)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", base.Crc32cHashString([]byte(docRev1.HLV.GetCurrentVersionString())))
assert.NoError(t, err, "Purge old revision JSON")

_, err = collection.Get1xRevBody(ctx, "doc1", rev1id, true, nil)
Expand Down
Loading
Loading