Skip to content

Commit

Permalink
CBG-3719: convert in memory format of HLV to match XDCR/CBL format (#…
Browse files Browse the repository at this point in the history
…6655)

* CBG-3719: change in memory format of hlv to match XDCR/CBL format

* updates after rebase

* updates to fix missed type swap in channels package

* update to add encoded bucket UUID to db contect, this allows us to avoid overhead associated with encoding bucketUUID each time a HLV is updated

* updates after rebase

* updates based off review
  • Loading branch information
gregns1 authored and adamcfraser committed May 10, 2024
1 parent c83eb88 commit ceafa08
Show file tree
Hide file tree
Showing 25 changed files with 315 additions and 341 deletions.
4 changes: 2 additions & 2 deletions channels/log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ type LogEntry struct {
IsPrincipal bool // Whether the log-entry is a tracking entry for a principal doc
CollectionID uint32 // Collection ID
SourceID string // SourceID allocated to the doc's Current Version on the HLV
Version uint64 // Version allocated to the doc's Current Version on the HLV
Version string // Version allocated to the doc's Current Version on the HLV
}

func (l LogEntry) String() string {
return fmt.Sprintf(
"seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d source: %s version: %d",
"seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d source: %s version: %s",
l.Sequence,
l.DocID,
l.RevID,
Expand Down
3 changes: 1 addition & 2 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,7 @@ func (bsc *BlipSyncContext) sendRevision(sender *blip.Sender, docID, rev string,
if vrsErr != nil {
return vrsErr
}
// replace with GetCV pending merge of CBG-3212
docRev, err = handleChangesResponseCollection.revisionCache.GetWithCV(bsc.loggingCtx, docID, &version, RevCacheOmitBody, RevCacheOmitDelta)
docRev, err = handleChangesResponseCollection.GetCV(bsc.loggingCtx, docID, &version, RevCacheOmitBody)
}
if base.IsDocNotFoundError(err) {
return bsc.sendNoRev(sender, docID, rev, collectionIdx, seq, err)
Expand Down
4 changes: 2 additions & 2 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (entry *LogEntry) SetRevAndVersion(rv channels.RevAndVersion) {
entry.RevID = rv.RevTreeID
if rv.CurrentSource != "" {
entry.SourceID = rv.CurrentSource
entry.Version = base.HexCasToUint64(rv.CurrentVersion)
entry.Version = rv.CurrentVersion
}
}

Expand Down Expand Up @@ -492,7 +492,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
change.DocID = docID
change.RevID = atRev.RevTreeID
change.SourceID = atRev.CurrentSource
change.Version = base.HexCasToUint64(atRev.CurrentVersion)
change.Version = atRev.CurrentVersion
change.Channels = channelRemovals
}

Expand Down
2 changes: 1 addition & 1 deletion db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func testLogEntryWithCV(seq uint64, docid string, revid string, channelNames []s
TimeReceived: time.Now(),
CollectionID: collectionID,
SourceID: sourceID,
Version: version,
Version: string(base.Uint64CASToLittleEndianHex(version)),
}
channelMap := make(channels.ChannelMap)
for _, channelName := range channelNames {
Expand Down
2 changes: 1 addition & 1 deletion db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func makeChangeEntry(logEntry *LogEntry, seqID SequenceID, channel channels.ID)
// populate CurrentVersion entry if log entry has sourceID and Version populated
// This allows current version to be nil in event of CV not being populated on log entry
// allowing omitempty to work as expected
if logEntry.SourceID != "" && logEntry.Version != 0 {
if logEntry.SourceID != "" && logEntry.Version != "" {
change.CurrentVersion = &Version{SourceID: logEntry.SourceID, Value: logEntry.Version}
}
if logEntry.Flags&channels.Removed != 0 {
Expand Down
10 changes: 5 additions & 5 deletions db/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func TestCVPopulationOnChangeEntry(t *testing.T) {
defer db.Close(ctx)
collection := GetSingleDatabaseCollectionWithUser(t, db)
collectionID := collection.GetCollectionID()
bucketUUID := db.BucketUUID
bucketUUID := db.EncodedBucketUUID

collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

Expand All @@ -318,9 +318,10 @@ func TestCVPopulationOnChangeEntry(t *testing.T) {
changes, err := collection.GetChanges(ctx, base.SetOf("A"), getChangesOptionsWithZeroSeq(t))
require.NoError(t, err)

encodedCAS := string(base.Uint64CASToLittleEndianHex(doc.Cas))
assert.Equal(t, doc.ID, changes[0].ID)
assert.Equal(t, bucketUUID, changes[0].CurrentVersion.SourceID)
assert.Equal(t, doc.Cas, changes[0].CurrentVersion.Value)
assert.Equal(t, encodedCAS, changes[0].CurrentVersion.Value)
}

func TestDocDeletionFromChannelCoalesced(t *testing.T) {
Expand Down Expand Up @@ -567,7 +568,7 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) {
defer db.Close(ctx)
collection := GetSingleDatabaseCollectionWithUser(t, db)
collectionID := collection.GetCollectionID()
bucketUUID := db.BucketUUID
bucketUUID := db.EncodedBucketUUID
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

// Make channel active
Expand All @@ -582,7 +583,6 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) {

syncData, err := collection.GetDocSyncData(ctx, "doc1")
require.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)

// get entry of above doc from channel cache
entries, err := db.channelCache.GetChanges(ctx, channels.NewID("ABC", collectionID), getChangesOptionsWithZeroSeq(t))
Expand All @@ -591,7 +591,7 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) {

// assert that the source and version has been populated with the channel cache entry for the doc
assert.Equal(t, "doc1", entries[0].DocID)
assert.Equal(t, uintCAS, entries[0].Version)
assert.Equal(t, syncData.Cas, entries[0].Version)
assert.Equal(t, bucketUUID, entries[0].SourceID)
assert.Equal(t, syncData.HLV.SourceID, entries[0].SourceID)
assert.Equal(t, syncData.HLV.Version, entries[0].Version)
Expand Down
2 changes: 1 addition & 1 deletion db/changes_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIter

if queryRow.RemovalRev != nil {
entry.RevID = queryRow.RemovalRev.RevTreeID
entry.Version = base.HexCasToUint64(queryRow.RemovalRev.CurrentVersion)
entry.Version = queryRow.RemovalRev.CurrentVersion
entry.SourceID = queryRow.RemovalRev.CurrentSource
if queryRow.RemovalDel {
entry.SetDeleted()
Expand Down
3 changes: 2 additions & 1 deletion db/channel_cache_single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,8 @@ func verifyCVEntries(entries []*LogEntry, cvs []cvValues) bool {
if entries[index].SourceID != cv.source {
return false
}
if entries[index].Version != cv.version {
encdedVrs := string(base.Uint64CASToLittleEndianHex(cv.version))
if entries[index].Version != encdedVrs {
return false
}
}
Expand Down
24 changes: 14 additions & 10 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,37 +887,38 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU
case ExistingVersion:
// preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space
d.HLV.ImportCAS = "" // remove importCAS for non-imports to save space
case Import:
if d.HLV.CurrentVersionCAS == d.Cas {
encodedCAS := string(base.Uint64CASToLittleEndianHex(d.Cas))
if d.HLV.CurrentVersionCAS == encodedCAS {
// if cvCAS = document CAS, the HLV has already been updated for this mutation by another HLV-aware peer.
// Set ImportCAS to the previous document CAS, but don't otherwise modify HLV
d.HLV.ImportCAS = d.Cas
d.HLV.ImportCAS = encodedCAS
} else {
// Otherwise this is an SDK mutation made by the local cluster that should be added to HLV.
newVVEntry := Version{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.SourceID = db.dbCtx.EncodedBucketUUID
newVVEntry.Value = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
}
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = d.Cas
d.HLV.ImportCAS = encodedCAS
}

case NewVersion, ExistingVersionWithUpdateToHLV:
// add a new entry to the version vector
newVVEntry := Version{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.SourceID = db.dbCtx.EncodedBucketUUID
newVVEntry.Value = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
}
// update the cvCAS on the SGWrite event too
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space
d.HLV.ImportCAS = "" // remove importCAS for non-imports to save space
}
return d, nil
}
Expand Down Expand Up @@ -1120,7 +1121,9 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
return nil, nil, false, nil, addNewerVersionsErr
}
} else {
if !docHLV.IsInConflict(*doc.HLV) {
incomingDecodedHLV := docHLV.ToDecodedHybridLogicalVector()
localDecodedHLV := doc.HLV.ToDecodedHybridLogicalVector()
if !incomingDecodedHLV.IsInConflict(localDecodedHLV) {
// update hlv for all newer incoming source version pairs
addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV)
if addNewerVersionsErr != nil {
Expand Down Expand Up @@ -2396,11 +2399,12 @@ func postWriteUpdateHLV(doc *Document, casOut uint64) *Document {
if doc.HLV == nil {
return doc
}
encodedCAS := string(base.Uint64CASToLittleEndianHex(casOut))
if doc.HLV.Version == hlvExpandMacroCASValue {
doc.HLV.Version = casOut
doc.HLV.Version = encodedCAS
}
if doc.HLV.CurrentVersionCAS == hlvExpandMacroCASValue {
doc.HLV.CurrentVersionCAS = casOut
doc.HLV.CurrentVersionCAS = encodedCAS
}
return doc
}
Expand Down
37 changes: 17 additions & 20 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

bucketUUID := db.BucketUUID
bucketUUID := db.EncodedBucketUUID
collection := GetSingleDatabaseCollectionWithUser(t, db)

// create a new doc
Expand All @@ -1706,10 +1706,9 @@ func TestPutExistingCurrentVersion(t *testing.T) {
// assert on HLV on that above PUT
syncData, err := collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
assert.Equal(t, syncData.Cas, syncData.HLV.Version)
assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS)

// store the cas version allocated to the above doc creation for creation of incoming HLV later in test
originalDocVersion := syncData.HLV.Version
Expand All @@ -1724,17 +1723,18 @@ func TestPutExistingCurrentVersion(t *testing.T) {
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
docUpdateVersion := syncData.HLV.Version
docUpdateVersionInt := base.HexCasToUint64(docUpdateVersion)

// construct a mock doc update coming over a replicator
body = Body{"key1": "value2"}
newDoc := createTestDocument(key, "", body, false, 0)

// construct a HLV that simulates a doc update happening on a client
// this means moving the current source version pair to PV and adding new sourceID and version pair to CV
pv := make(map[string]uint64)
pv := make(map[string]string)
pv[bucketUUID] = originalDocVersion
// create a version larger than the allocated version above
incomingVersion := docUpdateVersion + 10
incomingVersion := string(base.Uint64CASToLittleEndianHex(docUpdateVersionInt + 10))
incomingHLV := HybridLogicalVector{
SourceID: "test",
Version: incomingVersion,
Expand All @@ -1758,11 +1758,10 @@ func TestPutExistingCurrentVersion(t *testing.T) {
// PV should contain the old CV pair
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
uintCAS = base.HexCasToUint64(syncData.Cas)

assert.Equal(t, "test", syncData.HLV.SourceID)
assert.Equal(t, incomingVersion, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS)
// update the pv map so we can assert we have correct pv map in HLV
pv[bucketUUID] = docUpdateVersion
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
Expand All @@ -1780,7 +1779,7 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

bucketUUID := db.BucketUUID
bucketUUID := db.EncodedBucketUUID
collection := GetSingleDatabaseCollectionWithUser(t, db)

// create a new doc
Expand All @@ -1793,17 +1792,16 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
// assert on the HLV values after the above creation of the doc
syncData, err := collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
assert.Equal(t, syncData.Cas, syncData.HLV.Version)
assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS)

// create a new doc update to simulate a doc update arriving over replicator from, client
body = Body{"key1": "value2"}
newDoc := createTestDocument(key, "", body, false, 0)
incomingHLV := HybridLogicalVector{
SourceID: "test",
Version: 1234,
Version: string(base.Uint64CASToLittleEndianHex(1234)),
}

// grab the raw doc from the bucket to pass into the PutExistingCurrentVersion function
Expand All @@ -1821,8 +1819,8 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
assert.Equal(t, syncData.Cas, syncData.HLV.Version)
assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS)
}

// TestPutExistingCurrentVersionWithNoExistingDoc:
Expand All @@ -1842,10 +1840,10 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {

// construct a HLV that simulates a doc update happening on a client
// this means moving the current source version pair to PV and adding new sourceID and version pair to CV
pv := make(map[string]uint64)
pv[bucketUUID] = 2
pv := make(map[string]string)
pv[bucketUUID] = string(base.Uint64CASToLittleEndianHex(uint64(2)))
// create a version larger than the allocated version above
incomingVersion := uint64(2 + 10)
incomingVersion := string(base.Uint64CASToLittleEndianHex(uint64(2 + 10)))
incomingHLV := HybridLogicalVector{
SourceID: "test",
Version: incomingVersion,
Expand All @@ -1865,10 +1863,9 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
// PV should contain the old CV pair
syncData, err := collection.GetDocSyncData(ctx, "doc2")
assert.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)
assert.Equal(t, "test", syncData.HLV.SourceID)
assert.Equal(t, incomingVersion, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS)
// update the pv map so we can assert we have correct pv map in HLV
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev)
Expand Down
3 changes: 3 additions & 0 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package db

import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -98,6 +99,7 @@ type DatabaseContext struct {
Bucket base.Bucket // Storage
BucketSpec base.BucketSpec // The BucketSpec
BucketUUID string // The bucket UUID for the bucket the database is created against
EncodedBucketUUID string // The bucket UUID for the bucket the database is created against but encoded in base64
BucketLock sync.RWMutex // Control Access to the underlying bucket object
mutationListener changeListener // Caching feed listener
ImportListener *importListener // Import feed listener
Expand Down Expand Up @@ -421,6 +423,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,
MetadataStore: metadataStore,
Bucket: bucket,
BucketUUID: bucketUUID,
EncodedBucketUUID: base64.StdEncoding.EncodeToString([]byte(bucketUUID)),
StartTime: time.Now(),
autoImport: autoImport,
Options: options,
Expand Down
Loading

0 comments on commit ceafa08

Please sign in to comment.