CBG-3719: convert in memory format of HLV to match XDCR/CBL format (#6655)

* CBG-3719: change in memory format of hlv to match XDCR/CBL format

* updates after rebase

* updates to fix missed type swap in channels package

* update to add encoded bucket UUID to db context; this allows us to avoid the overhead associated with encoding the bucketUUID each time an HLV is updated

* updates after rebase

* updates based off review
gregns1 authored and adamcfraser committed Aug 11, 2024
1 parent e061428 commit 0943429
Showing 27 changed files with 332 additions and 349 deletions.
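The commit message describes switching the in-memory HLV fields from uint64 CAS values to the encoded string form used by XDCR/CBL, via helpers such as base.Uint64CASToLittleEndianHex and base.HexCasToUint64. As a rough, standalone sketch of what that encoding likely looks like (assuming the "0x"-prefixed little-endian hex string produced by KV's CAS macro expansion; the helper names below are illustrative, not the repository's):

package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// casToLittleEndianHex renders an 8-byte CAS as a "0x"-prefixed hex string in
// little-endian byte order, the assumed form used by XDCR/CBL and by KV's CAS
// macro expansion. Illustrative stand-in for base.Uint64CASToLittleEndianHex.
func casToLittleEndianHex(cas uint64) string {
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, cas)
	return "0x" + hex.EncodeToString(buf)
}

// littleEndianHexToCas is the inverse, an illustrative stand-in for base.HexCasToUint64.
func littleEndianHexToCas(s string) (uint64, error) {
	buf, err := hex.DecodeString(s[2:]) // strip the "0x" prefix
	if err != nil {
		return 0, err
	}
	return binary.LittleEndian.Uint64(buf), nil
}

func main() {
	cas := uint64(0x175b4c5d8e210000)
	encoded := casToLittleEndianHex(cas)
	decoded, _ := littleEndianHexToCas(encoded)
	fmt.Println(encoded)        // 0x0000218e5d4c5b17
	fmt.Println(decoded == cas) // true: the encoding round-trips
}

Since the macro-expanded CAS in sync metadata is already stored in this string form, this appears to be why the tests in this diff can now compare syncData.Cas directly against HLV fields instead of converting through base.HexCasToUint64 first.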
4 changes: 2 additions & 2 deletions channels/log_entry.go
@@ -44,12 +44,12 @@ type LogEntry struct {
IsPrincipal bool // Whether the log-entry is a tracking entry for a principal doc
CollectionID uint32 // Collection ID
SourceID string // SourceID allocated to the doc's Current Version on the HLV
Version uint64 // Version allocated to the doc's Current Version on the HLV
Version string // Version allocated to the doc's Current Version on the HLV
}

func (l LogEntry) String() string {
return fmt.Sprintf(
"seq: %d docid: %s revid: %s collectionID: %d source: %s version: %d",
"seq: %d docid: %s revid: %s collectionID: %d source: %s version: %s",
l.Sequence,
l.DocID,
l.RevID,
3 changes: 1 addition & 2 deletions db/blip_sync_context.go
@@ -635,8 +635,7 @@ func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sende
if vrsErr != nil {
return vrsErr
}
// replace with GetCV pending merge of CBG-3212
docRev, originalErr = handleChangesResponseCollection.revisionCache.GetWithCV(ctx, docID, &version, RevCacheOmitDelta)
docRev, originalErr = handleChangesResponseCollection.GetCV(bsc.loggingCtx, docID, &version)
}

// set if we find an alternative revision to send in the event the originally requested rev is unavailable
4 changes: 2 additions & 2 deletions db/change_cache.go
@@ -130,7 +130,7 @@ func (entry *LogEntry) SetRevAndVersion(rv channels.RevAndVersion) {
entry.RevID = rv.RevTreeID
if rv.CurrentSource != "" {
entry.SourceID = rv.CurrentSource
entry.Version = base.HexCasToUint64(rv.CurrentVersion)
entry.Version = rv.CurrentVersion
}
}

@@ -494,7 +494,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
change.DocID = docID
change.RevID = atRev.RevTreeID
change.SourceID = atRev.CurrentSource
change.Version = base.HexCasToUint64(atRev.CurrentVersion)
change.Version = atRev.CurrentVersion
change.Channels = channelRemovals
}

2 changes: 1 addition & 1 deletion db/change_cache_test.go
@@ -81,7 +81,7 @@ func testLogEntryWithCV(seq uint64, docid string, revid string, channelNames []s
TimeReceived: time.Now(),
CollectionID: collectionID,
SourceID: sourceID,
Version: version,
Version: string(base.Uint64CASToLittleEndianHex(version)),
}
channelMap := make(channels.ChannelMap)
for _, channelName := range channelNames {
2 changes: 1 addition & 1 deletion db/changes.go
@@ -489,7 +489,7 @@ func makeChangeEntry(logEntry *LogEntry, seqID SequenceID, channel channels.ID)
// populate CurrentVersion entry if log entry has sourceID and Version populated
// This allows current version to be nil in event of CV not being populated on log entry
// allowing omitempty to work as expected
if logEntry.SourceID != "" && logEntry.Version != 0 {
if logEntry.SourceID != "" && logEntry.Version != "" {
change.CurrentVersion = &Version{SourceID: logEntry.SourceID, Value: logEntry.Version}
}
if logEntry.Flags&channels.Removed != 0 {
10 changes: 5 additions & 5 deletions db/changes_test.go
@@ -291,7 +291,7 @@ func TestCVPopulationOnChangeEntry(t *testing.T) {
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
collectionID := collection.GetCollectionID()
bucketUUID := db.BucketUUID
bucketUUID := db.EncodedBucketUUID

collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

@@ -314,9 +314,10 @@ func TestCVPopulationOnChangeEntry(t *testing.T) {
changes, err := collection.GetChanges(ctx, base.SetOf("A"), getChangesOptionsWithZeroSeq(t))
require.NoError(t, err)

encodedCAS := string(base.Uint64CASToLittleEndianHex(doc.Cas))
assert.Equal(t, doc.ID, changes[0].ID)
assert.Equal(t, bucketUUID, changes[0].CurrentVersion.SourceID)
assert.Equal(t, doc.Cas, changes[0].CurrentVersion.Value)
assert.Equal(t, encodedCAS, changes[0].CurrentVersion.Value)
}

func TestDocDeletionFromChannelCoalesced(t *testing.T) {
@@ -563,7 +564,7 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) {
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
collectionID := collection.GetCollectionID()
bucketUUID := db.BucketUUID
bucketUUID := db.EncodedBucketUUID
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

// Make channel active
@@ -578,7 +579,6 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) {

syncData, err := collection.GetDocSyncData(ctx, "doc1")
require.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)

// get entry of above doc from channel cache
entries, err := db.channelCache.GetChanges(ctx, channels.NewID("ABC", collectionID), getChangesOptionsWithZeroSeq(t))
@@ -587,7 +587,7 @@

// assert that the source and version has been populated with the channel cache entry for the doc
assert.Equal(t, "doc1", entries[0].DocID)
assert.Equal(t, uintCAS, entries[0].Version)
assert.Equal(t, syncData.Cas, entries[0].Version)
assert.Equal(t, bucketUUID, entries[0].SourceID)
assert.Equal(t, syncData.HLV.SourceID, entries[0].SourceID)
assert.Equal(t, syncData.HLV.Version, entries[0].Version)
2 changes: 1 addition & 1 deletion db/changes_view.go
@@ -69,7 +69,7 @@ func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIter

if queryRow.RemovalRev != nil {
entry.RevID = queryRow.RemovalRev.RevTreeID
entry.Version = base.HexCasToUint64(queryRow.RemovalRev.CurrentVersion)
entry.Version = queryRow.RemovalRev.CurrentVersion
entry.SourceID = queryRow.RemovalRev.CurrentSource
if queryRow.RemovalDel {
entry.SetDeleted()
3 changes: 2 additions & 1 deletion db/channel_cache_single_test.go
@@ -961,7 +961,8 @@ func verifyCVEntries(entries []*LogEntry, cvs []cvValues) bool {
if entries[index].SourceID != cv.source {
return false
}
if entries[index].Version != cv.version {
encdedVrs := string(base.Uint64CASToLittleEndianHex(cv.version))
if entries[index].Version != encdedVrs {
return false
}
}
26 changes: 15 additions & 11 deletions db/crud.go
@@ -381,7 +381,7 @@ func (db *DatabaseCollectionWithUser) documentRevisionForRequest(ctx context.Con
return revision, nil
}

func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, cv *Version, includeBody bool) (revision DocumentRevision, err error) {
func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, cv *Version) (revision DocumentRevision, err error) {
if cv != nil {
revision, err = db.revisionCache.GetWithCV(ctx, docid, cv, RevCacheOmitDelta)
} else {
@@ -885,37 +885,38 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU
case ExistingVersion:
// preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space
d.HLV.ImportCAS = "" // remove importCAS for non-imports to save space
case Import:
if d.HLV.CurrentVersionCAS == d.Cas {
encodedCAS := string(base.Uint64CASToLittleEndianHex(d.Cas))
if d.HLV.CurrentVersionCAS == encodedCAS {
// if cvCAS = document CAS, the HLV has already been updated for this mutation by another HLV-aware peer.
// Set ImportCAS to the previous document CAS, but don't otherwise modify HLV
d.HLV.ImportCAS = d.Cas
d.HLV.ImportCAS = encodedCAS
} else {
// Otherwise this is an SDK mutation made by the local cluster that should be added to HLV.
newVVEntry := Version{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.SourceID = db.dbCtx.EncodedBucketUUID
newVVEntry.Value = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
}
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = d.Cas
d.HLV.ImportCAS = encodedCAS
}

case NewVersion, ExistingVersionWithUpdateToHLV:
// add a new entry to the version vector
newVVEntry := Version{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.SourceID = db.dbCtx.EncodedBucketUUID
newVVEntry.Value = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
}
// update the cvCAS on the SGWrite event too
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space
d.HLV.ImportCAS = "" // remove importCAS for non-imports to save space
}
return d, nil
}
@@ -1118,7 +1119,9 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
return nil, nil, false, nil, addNewerVersionsErr
}
} else {
if !docHLV.IsInConflict(*doc.HLV) {
incomingDecodedHLV := docHLV.ToDecodedHybridLogicalVector()
localDecodedHLV := doc.HLV.ToDecodedHybridLogicalVector()
if !incomingDecodedHLV.IsInConflict(localDecodedHLV) {
// update hlv for all newer incoming source version pairs
addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV)
if addNewerVersionsErr != nil {
@@ -2458,11 +2461,12 @@ func postWriteUpdateHLV(doc *Document, casOut uint64) *Document {
if doc.HLV == nil {
return doc
}
encodedCAS := string(base.Uint64CASToLittleEndianHex(casOut))
if doc.HLV.Version == hlvExpandMacroCASValue {
doc.HLV.Version = casOut
doc.HLV.Version = encodedCAS
}
if doc.HLV.CurrentVersionCAS == hlvExpandMacroCASValue {
doc.HLV.CurrentVersionCAS = casOut
doc.HLV.CurrentVersionCAS = encodedCAS
}
return doc
}
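One detail in the crud.go hunks above is worth calling out: before IsInConflict is evaluated, both the incoming and local HLVs are converted via ToDecodedHybridLogicalVector. A plausible reason is that the encoded little-endian hex strings do not order lexicographically the way the underlying CAS values order numerically, so dominance and conflict comparisons have to run on decoded uint64s. A minimal sketch of the pitfall, using the same illustrative encoding helpers as the sketch near the top of this page (not the repository's):

package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// encodeCAS renders a CAS as a "0x"-prefixed little-endian hex string (illustrative).
func encodeCAS(cas uint64) string {
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, cas)
	return "0x" + hex.EncodeToString(buf)
}

// decodeCAS reverses encodeCAS; error handling elided for brevity.
func decodeCAS(s string) uint64 {
	buf, _ := hex.DecodeString(s[2:])
	return binary.LittleEndian.Uint64(buf)
}

func main() {
	older := uint64(0x01000000000000ff)
	newer := uint64(0x0200000000000001)

	encOlder, encNewer := encodeCAS(older), encodeCAS(newer)

	// Comparing the encoded strings lexicographically gets the order wrong,
	// because the least-significant byte is printed first.
	fmt.Println(encOlder > encNewer) // true, even though "older" is numerically smaller
	// Decoding first restores the correct numeric ordering.
	fmt.Println(decodeCAS(encOlder) < decodeCAS(encNewer)) // true
}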
43 changes: 20 additions & 23 deletions db/crud_test.go
@@ -1778,7 +1778,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

bucketUUID := db.BucketUUID
bucketUUID := db.EncodedBucketUUID
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

// create a new doc
@@ -1791,10 +1791,9 @@ func TestPutExistingCurrentVersion(t *testing.T) {
// assert on HLV on that above PUT
syncData, err := collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
assert.Equal(t, syncData.Cas, syncData.HLV.Version)
assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS)

// store the cas version allocated to the above doc creation for creation of incoming HLV later in test
originalDocVersion := syncData.HLV.Version
@@ -1809,17 +1808,18 @@ func TestPutExistingCurrentVersion(t *testing.T) {
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
docUpdateVersion := syncData.HLV.Version
docUpdateVersionInt := base.HexCasToUint64(docUpdateVersion)

// construct a mock doc update coming over a replicator
body = Body{"key1": "value2"}
newDoc := createTestDocument(key, "", body, false, 0)

// construct a HLV that simulates a doc update happening on a client
// this means moving the current source version pair to PV and adding new sourceID and version pair to CV
pv := make(map[string]uint64)
pv := make(map[string]string)
pv[bucketUUID] = originalDocVersion
// create a version larger than the allocated version above
incomingVersion := docUpdateVersion + 10
incomingVersion := string(base.Uint64CASToLittleEndianHex(docUpdateVersionInt + 10))
incomingHLV := HybridLogicalVector{
SourceID: "test",
Version: incomingVersion,
@@ -1843,11 +1843,10 @@ func TestPutExistingCurrentVersion(t *testing.T) {
// PV should contain the old CV pair
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
uintCAS = base.HexCasToUint64(syncData.Cas)

assert.Equal(t, "test", syncData.HLV.SourceID)
assert.Equal(t, incomingVersion, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS)
// update the pv map so we can assert we have correct pv map in HLV
pv[bucketUUID] = docUpdateVersion
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
@@ -1865,7 +1864,7 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

bucketUUID := db.BucketUUID
bucketUUID := db.EncodedBucketUUID
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

// create a new doc
@@ -1878,17 +1877,16 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
// assert on the HLV values after the above creation of the doc
syncData, err := collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
assert.Equal(t, syncData.Cas, syncData.HLV.Version)
assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS)

// create a new doc update to simulate a doc update arriving over replicator from, client
body = Body{"key1": "value2"}
newDoc := createTestDocument(key, "", body, false, 0)
incomingHLV := HybridLogicalVector{
SourceID: "test",
Version: 1234,
Version: string(base.Uint64CASToLittleEndianHex(1234)),
}

// grab the raw doc from the bucket to pass into the PutExistingCurrentVersion function
@@ -1906,8 +1904,8 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
assert.Equal(t, syncData.Cas, syncData.HLV.Version)
assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS)
}

// TestPutExistingCurrentVersionWithNoExistingDoc:
@@ -1927,10 +1925,10 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {

// construct a HLV that simulates a doc update happening on a client
// this means moving the current source version pair to PV and adding new sourceID and version pair to CV
pv := make(map[string]uint64)
pv[bucketUUID] = 2
pv := make(map[string]string)
pv[bucketUUID] = string(base.Uint64CASToLittleEndianHex(uint64(2)))
// create a version larger than the allocated version above
incomingVersion := uint64(2 + 10)
incomingVersion := string(base.Uint64CASToLittleEndianHex(uint64(2 + 10)))
incomingHLV := HybridLogicalVector{
SourceID: "test",
Version: incomingVersion,
@@ -1950,10 +1948,9 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
// PV should contain the old CV pair
syncData, err := collection.GetDocSyncData(ctx, "doc2")
assert.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)
assert.Equal(t, "test", syncData.HLV.SourceID)
assert.Equal(t, incomingVersion, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS)
// update the pv map so we can assert we have correct pv map in HLV
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev)
@@ -2006,7 +2003,7 @@ func TestGetCVWithDocResidentInCache(t *testing.T) {
vrs := doc.HLV.Version
src := doc.HLV.SourceID
sv := &Version{Value: vrs, SourceID: src}
revision, err := collection.GetCV(ctx, docID, sv, true)
revision, err := collection.GetCV(ctx, docID, sv)
require.NoError(t, err)
if testCase.access {
assert.Equal(t, rev, revision.RevID)
@@ -2065,7 +2062,7 @@ func TestGetByCVForDocNotResidentInCache(t *testing.T) {
vrs := doc.HLV.Version
src := doc.HLV.SourceID
sv := &Version{Value: vrs, SourceID: src}
revision, err := collection.GetCV(ctx, doc1ID, sv, true)
revision, err := collection.GetCV(ctx, doc1ID, sv)
require.NoError(t, err)

// assert the fetched doc is the first doc we added and assert that we did in fact get cache miss
@@ -2119,7 +2116,7 @@ func TestGetCVActivePathway(t *testing.T) {
revBody := Body{"channels": testCase.docChannels}
rev, doc, err := collection.Put(ctx, docID, revBody)
require.NoError(t, err)
revision, err := collection.GetCV(ctx, docID, nil, true)
revision, err := collection.GetCV(ctx, docID, nil)

if testCase.access == true {
require.NoError(t, err)
3 changes: 3 additions & 0 deletions db/database.go
@@ -10,6 +10,7 @@ package db

import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
@@ -102,6 +103,7 @@ type DatabaseContext struct {
Bucket base.Bucket // Storage
BucketSpec base.BucketSpec // The BucketSpec
BucketUUID string // The bucket UUID for the bucket the database is created against
EncodedBucketUUID string // The bucket UUID for the bucket the database is created against but encoded in base64
BucketLock sync.RWMutex // Control Access to the underlying bucket object
mutationListener changeListener // Caching feed listener
ImportListener *importListener // Import feed listener
@@ -424,6 +426,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,
MetadataStore: metadataStore,
Bucket: bucket,
BucketUUID: bucketUUID,
EncodedBucketUUID: base64.StdEncoding.EncodeToString([]byte(bucketUUID)),
StartTime: time.Now(),
autoImport: autoImport,
Options: options,
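The database.go change above caches a base64-encoded copy of the bucket UUID on the DatabaseContext, so the encoding is paid once at database start-up rather than on every HLV update (where it becomes the SourceID of new version entries, as seen in the crud.go hunks). A minimal sketch of that pattern, with illustrative names rather than the repository's:

package main

import (
	"encoding/base64"
	"fmt"
)

// hlvSourceContext stands in for the per-database context: the encoded source ID
// is derived once at construction and reused for every write, keeping the base64
// encoding off the hot path of each HLV update.
type hlvSourceContext struct {
	bucketUUID        string
	encodedBucketUUID string
}

func newHLVSourceContext(bucketUUID string) *hlvSourceContext {
	return &hlvSourceContext{
		bucketUUID:        bucketUUID,
		encodedBucketUUID: base64.StdEncoding.EncodeToString([]byte(bucketUUID)),
	}
}

// sourceForNewVersion returns the SourceID to stamp onto a new HLV version entry.
func (c *hlvSourceContext) sourceForNewVersion() string {
	return c.encodedBucketUUID
}

func main() {
	ctx := newHLVSourceContext("6a9e2d0b0c3f4e1a9d2c7f5b8c1a4e3d")
	fmt.Println(ctx.sourceForNewVersion())
}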
(Diffs for the remaining changed files are not rendered here.)
