From eedbd4a5c46180e75f632202b2463602d7dc92f8 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith <109068393+gregns1@users.noreply.github.com> Date: Thu, 1 Feb 2024 14:53:49 +0000 Subject: [PATCH] CBG-3719: convert in memory format of HLV to match XDCR/CBL format (#6655) * CBG-3719: change in memory format of hlv to match XDCR/CBL format * updates after rebase * updates to fix missed type swap in channels package * update to add encoded bucket UUID to db contect, this allows us to avoid overhead associated with encoding bucketUUID each time a HLV is updated * updates after rebase * updates based off review --- channels/log_entry.go | 4 +- db/blip_sync_context.go | 3 +- db/change_cache.go | 4 +- db/change_cache_test.go | 2 +- db/changes.go | 2 +- db/changes_test.go | 10 +- db/changes_view.go | 2 +- db/channel_cache_single_test.go | 3 +- db/crud.go | 24 ++- db/crud_test.go | 37 ++-- db/database.go | 3 + db/database_test.go | 15 +- db/document.go | 2 +- db/document_test.go | 33 +-- db/hybrid_logical_vector.go | 274 ++++++++++++------------- db/hybrid_logical_vector_test.go | 118 +++++------ db/revision_cache_interface.go | 2 +- db/revision_cache_test.go | 46 +++-- db/util_testing.go | 6 +- db/utilities_hlv_testing.go | 7 +- rest/api_test.go | 31 ++- rest/blip_api_crud_test.go | 6 +- rest/changes_test.go | 4 +- rest/changestest/changes_api_test.go | 2 +- rest/replicatortest/replicator_test.go | 16 +- 25 files changed, 315 insertions(+), 341 deletions(-) diff --git a/channels/log_entry.go b/channels/log_entry.go index dbf68a3aa1..2bf772d79c 100644 --- a/channels/log_entry.go +++ b/channels/log_entry.go @@ -57,12 +57,12 @@ type LogEntry struct { IsPrincipal bool // Whether the log-entry is a tracking entry for a principal doc CollectionID uint32 // Collection ID SourceID string // SourceID allocated to the doc's Current Version on the HLV - Version uint64 // Version allocated to the doc's Current Version on the HLV + Version string // Version allocated to the doc's Current Version on the HLV } func (l LogEntry) String() string { return fmt.Sprintf( - "seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d source: %s version: %d", + "seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d source: %s version: %s", l.Sequence, l.DocID, l.RevID, diff --git a/db/blip_sync_context.go b/db/blip_sync_context.go index 36e1aa6c06..ec1146885e 100644 --- a/db/blip_sync_context.go +++ b/db/blip_sync_context.go @@ -618,8 +618,7 @@ func (bsc *BlipSyncContext) sendRevision(sender *blip.Sender, docID, rev string, if vrsErr != nil { return vrsErr } - // replace with GetCV pending merge of CBG-3212 - docRev, err = handleChangesResponseCollection.revisionCache.GetWithCV(bsc.loggingCtx, docID, &version, RevCacheOmitBody, RevCacheOmitDelta) + docRev, err = handleChangesResponseCollection.GetCV(bsc.loggingCtx, docID, &version, RevCacheOmitBody) } if base.IsDocNotFoundError(err) { return bsc.sendNoRev(sender, docID, rev, collectionIdx, seq, err) diff --git a/db/change_cache.go b/db/change_cache.go index 074aa963e1..57e04bf755 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -124,7 +124,7 @@ func (entry *LogEntry) SetRevAndVersion(rv channels.RevAndVersion) { entry.RevID = rv.RevTreeID if rv.CurrentSource != "" { entry.SourceID = rv.CurrentSource - entry.Version = base.HexCasToUint64(rv.CurrentVersion) + entry.Version = rv.CurrentVersion } } @@ -495,7 +495,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) { change.DocID = docID change.RevID = atRev.RevTreeID change.SourceID = atRev.CurrentSource - change.Version = base.HexCasToUint64(atRev.CurrentVersion) + change.Version = atRev.CurrentVersion change.Channels = channelRemovals } diff --git a/db/change_cache_test.go b/db/change_cache_test.go index 12b7ec3529..f898dcc66b 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -82,7 +82,7 @@ func testLogEntryWithCV(seq uint64, docid string, revid string, channelNames []s TimeReceived: time.Now(), CollectionID: collectionID, SourceID: sourceID, - Version: version, + Version: string(base.Uint64CASToLittleEndianHex(version)), } channelMap := make(channels.ChannelMap) for _, channelName := range channelNames { diff --git a/db/changes.go b/db/changes.go index 04e651862c..da27ccf1c5 100644 --- a/db/changes.go +++ b/db/changes.go @@ -485,7 +485,7 @@ func makeChangeEntry(logEntry *LogEntry, seqID SequenceID, channel channels.ID) // populate CurrentVersion entry if log entry has sourceID and Version populated // This allows current version to be nil in event of CV not being populated on log entry // allowing omitempty to work as expected - if logEntry.SourceID != "" && logEntry.Version != 0 { + if logEntry.SourceID != "" && logEntry.Version != "" { change.CurrentVersion = &Version{SourceID: logEntry.SourceID, Value: logEntry.Version} } if logEntry.Flags&channels.Removed != 0 { diff --git a/db/changes_test.go b/db/changes_test.go index 7c2cf4ccc3..ea7800b5d3 100644 --- a/db/changes_test.go +++ b/db/changes_test.go @@ -295,7 +295,7 @@ func TestCVPopulationOnChangeEntry(t *testing.T) { defer db.Close(ctx) collection := GetSingleDatabaseCollectionWithUser(t, db) collectionID := collection.GetCollectionID() - bucketUUID := db.BucketUUID + bucketUUID := db.EncodedBucketUUID collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout) @@ -318,9 +318,10 @@ func TestCVPopulationOnChangeEntry(t *testing.T) { changes, err := collection.GetChanges(ctx, base.SetOf("A"), getChangesOptionsWithZeroSeq(t)) require.NoError(t, err) + encodedCAS := string(base.Uint64CASToLittleEndianHex(doc.Cas)) assert.Equal(t, doc.ID, changes[0].ID) assert.Equal(t, bucketUUID, changes[0].CurrentVersion.SourceID) - assert.Equal(t, doc.Cas, changes[0].CurrentVersion.Value) + assert.Equal(t, encodedCAS, changes[0].CurrentVersion.Value) } func TestDocDeletionFromChannelCoalesced(t *testing.T) { @@ -567,7 +568,7 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) { defer db.Close(ctx) collection := GetSingleDatabaseCollectionWithUser(t, db) collectionID := collection.GetCollectionID() - bucketUUID := db.BucketUUID + bucketUUID := db.EncodedBucketUUID collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout) // Make channel active @@ -582,7 +583,6 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) { syncData, err := collection.GetDocSyncData(ctx, "doc1") require.NoError(t, err) - uintCAS := base.HexCasToUint64(syncData.Cas) // get entry of above doc from channel cache entries, err := db.channelCache.GetChanges(ctx, channels.NewID("ABC", collectionID), getChangesOptionsWithZeroSeq(t)) @@ -591,7 +591,7 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) { // assert that the source and version has been populated with the channel cache entry for the doc assert.Equal(t, "doc1", entries[0].DocID) - assert.Equal(t, uintCAS, entries[0].Version) + assert.Equal(t, syncData.Cas, entries[0].Version) assert.Equal(t, bucketUUID, entries[0].SourceID) assert.Equal(t, syncData.HLV.SourceID, entries[0].SourceID) assert.Equal(t, syncData.HLV.Version, entries[0].Version) diff --git a/db/changes_view.go b/db/changes_view.go index 4d4e36f03f..8ff138d002 100644 --- a/db/changes_view.go +++ b/db/changes_view.go @@ -69,7 +69,7 @@ func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIter if queryRow.RemovalRev != nil { entry.RevID = queryRow.RemovalRev.RevTreeID - entry.Version = base.HexCasToUint64(queryRow.RemovalRev.CurrentVersion) + entry.Version = queryRow.RemovalRev.CurrentVersion entry.SourceID = queryRow.RemovalRev.CurrentSource if queryRow.RemovalDel { entry.SetDeleted() diff --git a/db/channel_cache_single_test.go b/db/channel_cache_single_test.go index d0431ab4c9..084f0412cc 100644 --- a/db/channel_cache_single_test.go +++ b/db/channel_cache_single_test.go @@ -961,7 +961,8 @@ func verifyCVEntries(entries []*LogEntry, cvs []cvValues) bool { if entries[index].SourceID != cv.source { return false } - if entries[index].Version != cv.version { + encdedVrs := string(base.Uint64CASToLittleEndianHex(cv.version)) + if entries[index].Version != encdedVrs { return false } } diff --git a/db/crud.go b/db/crud.go index d38e6d515d..7a45186802 100644 --- a/db/crud.go +++ b/db/crud.go @@ -906,29 +906,30 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU case ExistingVersion: // preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue - d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space + d.HLV.ImportCAS = "" // remove importCAS for non-imports to save space case Import: - if d.HLV.CurrentVersionCAS == d.Cas { + encodedCAS := string(base.Uint64CASToLittleEndianHex(d.Cas)) + if d.HLV.CurrentVersionCAS == encodedCAS { // if cvCAS = document CAS, the HLV has already been updated for this mutation by another HLV-aware peer. // Set ImportCAS to the previous document CAS, but don't otherwise modify HLV - d.HLV.ImportCAS = d.Cas + d.HLV.ImportCAS = encodedCAS } else { // Otherwise this is an SDK mutation made by the local cluster that should be added to HLV. newVVEntry := Version{} - newVVEntry.SourceID = db.dbCtx.BucketUUID + newVVEntry.SourceID = db.dbCtx.EncodedBucketUUID newVVEntry.Value = hlvExpandMacroCASValue err := d.SyncData.HLV.AddVersion(newVVEntry) if err != nil { return nil, err } d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue - d.HLV.ImportCAS = d.Cas + d.HLV.ImportCAS = encodedCAS } case NewVersion, ExistingVersionWithUpdateToHLV: // add a new entry to the version vector newVVEntry := Version{} - newVVEntry.SourceID = db.dbCtx.BucketUUID + newVVEntry.SourceID = db.dbCtx.EncodedBucketUUID newVVEntry.Value = hlvExpandMacroCASValue err := d.SyncData.HLV.AddVersion(newVVEntry) if err != nil { @@ -936,7 +937,7 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU } // update the cvCAS on the SGWrite event too d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue - d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space + d.HLV.ImportCAS = "" // remove importCAS for non-imports to save space } return d, nil } @@ -1137,7 +1138,9 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont return nil, nil, false, nil, addNewerVersionsErr } } else { - if !docHLV.IsInConflict(*doc.HLV) { + incomingDecodedHLV := docHLV.ToDecodedHybridLogicalVector() + localDecodedHLV := doc.HLV.ToDecodedHybridLogicalVector() + if !incomingDecodedHLV.IsInConflict(localDecodedHLV) { // update hlv for all newer incoming source version pairs addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV) if addNewerVersionsErr != nil { @@ -2314,11 +2317,12 @@ func postWriteUpdateHLV(doc *Document, casOut uint64) *Document { if doc.HLV == nil { return doc } + encodedCAS := string(base.Uint64CASToLittleEndianHex(casOut)) if doc.HLV.Version == hlvExpandMacroCASValue { - doc.HLV.Version = casOut + doc.HLV.Version = encodedCAS } if doc.HLV.CurrentVersionCAS == hlvExpandMacroCASValue { - doc.HLV.CurrentVersionCAS = casOut + doc.HLV.CurrentVersionCAS = encodedCAS } return doc } diff --git a/db/crud_test.go b/db/crud_test.go index b6b1f51764..97e6ebc391 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -1688,7 +1688,7 @@ func TestPutExistingCurrentVersion(t *testing.T) { db, ctx := setupTestDB(t) defer db.Close(ctx) - bucketUUID := db.BucketUUID + bucketUUID := db.EncodedBucketUUID collection := GetSingleDatabaseCollectionWithUser(t, db) // create a new doc @@ -1701,10 +1701,9 @@ func TestPutExistingCurrentVersion(t *testing.T) { // assert on HLV on that above PUT syncData, err := collection.GetDocSyncData(ctx, "doc1") assert.NoError(t, err) - uintCAS := base.HexCasToUint64(syncData.Cas) assert.Equal(t, bucketUUID, syncData.HLV.SourceID) - assert.Equal(t, uintCAS, syncData.HLV.Version) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.Version) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) // store the cas version allocated to the above doc creation for creation of incoming HLV later in test originalDocVersion := syncData.HLV.Version @@ -1719,6 +1718,7 @@ func TestPutExistingCurrentVersion(t *testing.T) { syncData, err = collection.GetDocSyncData(ctx, "doc1") assert.NoError(t, err) docUpdateVersion := syncData.HLV.Version + docUpdateVersionInt := base.HexCasToUint64(docUpdateVersion) // construct a mock doc update coming over a replicator body = Body{"key1": "value2"} @@ -1726,10 +1726,10 @@ func TestPutExistingCurrentVersion(t *testing.T) { // construct a HLV that simulates a doc update happening on a client // this means moving the current source version pair to PV and adding new sourceID and version pair to CV - pv := make(map[string]uint64) + pv := make(map[string]string) pv[bucketUUID] = originalDocVersion // create a version larger than the allocated version above - incomingVersion := docUpdateVersion + 10 + incomingVersion := string(base.Uint64CASToLittleEndianHex(docUpdateVersionInt + 10)) incomingHLV := HybridLogicalVector{ SourceID: "test", Version: incomingVersion, @@ -1753,11 +1753,10 @@ func TestPutExistingCurrentVersion(t *testing.T) { // PV should contain the old CV pair syncData, err = collection.GetDocSyncData(ctx, "doc1") assert.NoError(t, err) - uintCAS = base.HexCasToUint64(syncData.Cas) assert.Equal(t, "test", syncData.HLV.SourceID) assert.Equal(t, incomingVersion, syncData.HLV.Version) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) // update the pv map so we can assert we have correct pv map in HLV pv[bucketUUID] = docUpdateVersion assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv)) @@ -1775,7 +1774,7 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) { db, ctx := setupTestDB(t) defer db.Close(ctx) - bucketUUID := db.BucketUUID + bucketUUID := db.EncodedBucketUUID collection := GetSingleDatabaseCollectionWithUser(t, db) // create a new doc @@ -1788,17 +1787,16 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) { // assert on the HLV values after the above creation of the doc syncData, err := collection.GetDocSyncData(ctx, "doc1") assert.NoError(t, err) - uintCAS := base.HexCasToUint64(syncData.Cas) assert.Equal(t, bucketUUID, syncData.HLV.SourceID) - assert.Equal(t, uintCAS, syncData.HLV.Version) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.Version) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) // create a new doc update to simulate a doc update arriving over replicator from, client body = Body{"key1": "value2"} newDoc := createTestDocument(key, "", body, false, 0) incomingHLV := HybridLogicalVector{ SourceID: "test", - Version: 1234, + Version: string(base.Uint64CASToLittleEndianHex(1234)), } // grab the raw doc from the bucket to pass into the PutExistingCurrentVersion function @@ -1816,8 +1814,8 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) { syncData, err = collection.GetDocSyncData(ctx, "doc1") assert.NoError(t, err) assert.Equal(t, bucketUUID, syncData.HLV.SourceID) - assert.Equal(t, uintCAS, syncData.HLV.Version) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.Version) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) } // TestPutExistingCurrentVersionWithNoExistingDoc: @@ -1837,10 +1835,10 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) { // construct a HLV that simulates a doc update happening on a client // this means moving the current source version pair to PV and adding new sourceID and version pair to CV - pv := make(map[string]uint64) - pv[bucketUUID] = 2 + pv := make(map[string]string) + pv[bucketUUID] = string(base.Uint64CASToLittleEndianHex(uint64(2))) // create a version larger than the allocated version above - incomingVersion := uint64(2 + 10) + incomingVersion := string(base.Uint64CASToLittleEndianHex(uint64(2 + 10))) incomingHLV := HybridLogicalVector{ SourceID: "test", Version: incomingVersion, @@ -1860,10 +1858,9 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) { // PV should contain the old CV pair syncData, err := collection.GetDocSyncData(ctx, "doc2") assert.NoError(t, err) - uintCAS := base.HexCasToUint64(syncData.Cas) assert.Equal(t, "test", syncData.HLV.SourceID) assert.Equal(t, incomingVersion, syncData.HLV.Version) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) // update the pv map so we can assert we have correct pv map in HLV assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv)) assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev) diff --git a/db/database.go b/db/database.go index be69d5c8a8..23302abedf 100644 --- a/db/database.go +++ b/db/database.go @@ -10,6 +10,7 @@ package db import ( "context" + "encoding/base64" "errors" "fmt" "net/http" @@ -98,6 +99,7 @@ type DatabaseContext struct { Bucket base.Bucket // Storage BucketSpec base.BucketSpec // The BucketSpec BucketUUID string // The bucket UUID for the bucket the database is created against + EncodedBucketUUID string // The bucket UUID for the bucket the database is created against but encoded in base64 BucketLock sync.RWMutex // Control Access to the underlying bucket object mutationListener changeListener // Caching feed listener ImportListener *importListener // Import feed listener @@ -420,6 +422,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket, MetadataStore: metadataStore, Bucket: bucket, BucketUUID: bucketUUID, + EncodedBucketUUID: base64.StdEncoding.EncodeToString([]byte(bucketUUID)), StartTime: time.Now(), autoImport: autoImport, Options: options, diff --git a/db/database_test.go b/db/database_test.go index f8e12256fa..e3c42c72ea 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -1054,7 +1054,7 @@ func TestConflicts(t *testing.T) { db, ctx := setupTestDB(t) defer db.Close(ctx) collection := GetSingleDatabaseCollectionWithUser(t, db) - bucketUUID := db.BucketUUID + bucketUUID := db.EncodedBucketUUID collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout) @@ -1126,8 +1126,7 @@ func TestConflicts(t *testing.T) { Conflicts: true, ChangesCtx: base.TestCtx(t), } - fetchedDoc, _, err := collection.GetDocWithXattr(ctx, "doc", DocUnmarshalCAS) - require.NoError(t, err) + source, version := collection.GetDocumentCurrentVersion(t, "doc") changes, err := collection.GetChanges(ctx, channels.BaseSetOf(t, "all"), options) assert.NoError(t, err, "Couldn't GetChanges") @@ -1138,7 +1137,7 @@ func TestConflicts(t *testing.T) { Changes: []ChangeRev{{"rev": "2-b"}, {"rev": "2-a"}}, branched: true, collectionID: collectionID, - CurrentVersion: &Version{SourceID: bucketUUID, Value: fetchedDoc.Cas}, + CurrentVersion: &Version{SourceID: source, Value: version}, }, changes[0], ) @@ -1174,7 +1173,7 @@ func TestConflicts(t *testing.T) { Changes: []ChangeRev{{"rev": "2-a"}, {"rev": rev3}}, branched: true, collectionID: collectionID, - CurrentVersion: &Version{SourceID: bucketUUID, Value: doc.Cas}, + CurrentVersion: &Version{SourceID: bucketUUID, Value: string(base.Uint64CASToLittleEndianHex(doc.Cas))}, }, changes[0]) } @@ -1467,7 +1466,7 @@ func TestSyncFnOnPush(t *testing.T) { require.NoError(t, err) assert.Equal(t, channels.ChannelMap{ "clibup": nil, - "public": &channels.ChannelRemoval{Seq: 2, Rev: channels.RevAndVersion{RevTreeID: "4-four", CurrentSource: newDoc.HLV.SourceID, CurrentVersion: string(base.Uint64CASToLittleEndianHex(newDoc.HLV.Version))}}, + "public": &channels.ChannelRemoval{Seq: 2, Rev: channels.RevAndVersion{RevTreeID: "4-four", CurrentSource: newDoc.HLV.SourceID, CurrentVersion: newDoc.HLV.Version}}, }, doc.Channels) assert.Equal(t, base.SetOf("clibup"), doc.History["4-four"].Channels) @@ -1904,7 +1903,7 @@ func TestChannelQuery(t *testing.T) { log.Printf("removedDocEntry Version: %v", removedDocEntry.Version) require.Equal(t, testCase.expectedRev.RevTreeID, removedDocEntry.RevID) require.Equal(t, testCase.expectedRev.CurrentSource, removedDocEntry.SourceID) - require.Equal(t, base.HexCasToUint64(testCase.expectedRev.CurrentVersion), removedDocEntry.Version) + require.Equal(t, testCase.expectedRev.CurrentVersion, removedDocEntry.Version) }) } @@ -1983,7 +1982,7 @@ func TestChannelQueryRevocation(t *testing.T) { log.Printf("removedDocEntry Version: %v", removedDocEntry.Version) require.Equal(t, testCase.expectedRev.RevTreeID, removedDocEntry.RevID) require.Equal(t, testCase.expectedRev.CurrentSource, removedDocEntry.SourceID) - require.Equal(t, base.HexCasToUint64(testCase.expectedRev.CurrentVersion), removedDocEntry.Version) + require.Equal(t, testCase.expectedRev.CurrentVersion, removedDocEntry.Version) }) } diff --git a/db/document.go b/db/document.go index a3efdd81a0..71fd1d03c9 100644 --- a/db/document.go +++ b/db/document.go @@ -1297,7 +1297,7 @@ func (s *SyncData) GetRevAndVersion() (rav channels.RevAndVersion) { rav.RevTreeID = s.CurrentRev if s.HLV != nil { rav.CurrentSource = s.HLV.SourceID - rav.CurrentVersion = string(base.Uint64CASToLittleEndianHex(s.HLV.Version)) + rav.CurrentVersion = s.HLV.Version } return rav } diff --git a/db/document_test.go b/db/document_test.go index f28b750397..52820e1389 100644 --- a/db/document_test.go +++ b/db/document_test.go @@ -251,11 +251,11 @@ const doc_meta_with_vv = `{ }` func TestParseVersionVectorSyncData(t *testing.T) { - mv := make(map[string]uint64) - pv := make(map[string]uint64) - mv["s_LhRPsa7CpjEvP5zeXTXEBA"] = 1628620455147864000 - mv["s_NqiIe0LekFPLeX4JvTO6Iw"] = 1628620455139868700 - pv["s_YZvBpEaztom9z5V/hDoeIw"] = 1628620455135215600 + mv := make(map[string]string) + pv := make(map[string]string) + mv["s_LhRPsa7CpjEvP5zeXTXEBA"] = "c0ff05d7ac059a16" + mv["s_NqiIe0LekFPLeX4JvTO6Iw"] = "1c008cd6ac059a16" + pv["s_YZvBpEaztom9z5V/hDoeIw"] = "f0ff44d6ac059a16" ctx := base.TestCtx(t) @@ -263,9 +263,10 @@ func TestParseVersionVectorSyncData(t *testing.T) { doc, err := unmarshalDocumentWithXattr(ctx, "doc_1k", nil, doc_meta, nil, 1, DocUnmarshalVV) require.NoError(t, err) + strCAS := string(base.Uint64CASToLittleEndianHex(123456)) // assert on doc version vector values - assert.Equal(t, uint64(123456), doc.SyncData.HLV.CurrentVersionCAS) - assert.Equal(t, uint64(123456), doc.SyncData.HLV.Version) + assert.Equal(t, strCAS, doc.SyncData.HLV.CurrentVersionCAS) + assert.Equal(t, strCAS, doc.SyncData.HLV.Version) assert.Equal(t, "cb06dc003846116d9b66d2ab23887a96", doc.SyncData.HLV.SourceID) assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) @@ -274,8 +275,8 @@ func TestParseVersionVectorSyncData(t *testing.T) { require.NoError(t, err) // assert on doc version vector values - assert.Equal(t, uint64(123456), doc.SyncData.HLV.CurrentVersionCAS) - assert.Equal(t, uint64(123456), doc.SyncData.HLV.Version) + assert.Equal(t, strCAS, doc.SyncData.HLV.CurrentVersionCAS) + assert.Equal(t, strCAS, doc.SyncData.HLV.Version) assert.Equal(t, "cb06dc003846116d9b66d2ab23887a96", doc.SyncData.HLV.SourceID) assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) @@ -284,8 +285,8 @@ func TestParseVersionVectorSyncData(t *testing.T) { require.NoError(t, err) // assert on doc version vector values - assert.Equal(t, uint64(123456), doc.SyncData.HLV.CurrentVersionCAS) - assert.Equal(t, uint64(123456), doc.SyncData.HLV.Version) + assert.Equal(t, strCAS, doc.SyncData.HLV.CurrentVersionCAS) + assert.Equal(t, strCAS, doc.SyncData.HLV.Version) assert.Equal(t, "cb06dc003846116d9b66d2ab23887a96", doc.SyncData.HLV.SourceID) assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) @@ -299,31 +300,31 @@ func TestRevAndVersion(t *testing.T) { testName string revTreeID string source string - version uint64 + version string }{ { testName: "rev_and_version", revTreeID: "1-abc", source: "source1", - version: 1, + version: "1", }, { testName: "both_empty", revTreeID: "", source: "", - version: 0, + version: "0", }, { testName: "revTreeID_only", revTreeID: "1-abc", source: "", - version: 0, + version: "0", }, { testName: "currentVersion_only", revTreeID: "", source: "source1", - version: 1, + version: "1", }, } diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 682d88a073..90d0c572c2 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -11,7 +11,6 @@ package db import ( "encoding/base64" "fmt" - "math" "strings" sgbucket "github.com/couchbase/sg-bucket" @@ -19,27 +18,52 @@ import ( ) // hlvExpandMacroCASValue causes the field to be populated by CAS value by macro expansion -const hlvExpandMacroCASValue = math.MaxUint64 +const hlvExpandMacroCASValue = "expand" -// HybridLogicalVector (HLV) is a type that represents a vector of Hybrid Logical Clocks. -type HybridLogicalVector struct { - CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS at the time of replication +// HybridLogicalVectorInterface is an interface to contain methods that will operate on both a decoded HLV and encoded HLV +type HybridLogicalVectorInterface interface { + GetVersion(sourceID string) (uint64, bool) +} + +var _ HybridLogicalVectorInterface = &HybridLogicalVector{} +var _ HybridLogicalVectorInterface = &DecodedHybridLogicalVector{} + +// DecodedHybridLogicalVector (HLV) is a type that represents a decoded vector of Hybrid Logical Clocks. +type DecodedHybridLogicalVector struct { + CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS in uint64 type at the time of replication ImportCAS uint64 // Set when an import modifies the document CAS but preserves the HLV (import of a version replicated by XDCR) - SourceID string // source bucket uuid of where this entry originated from - Version uint64 // current cas of the current version on the version vector + SourceID string // source bucket uuid in (base64 encoded format) of where this entry originated from + Version uint64 // current cas in uint64 format of the current version on the version vector MergeVersions map[string]uint64 // map of merge versions for fast efficient lookup PreviousVersions map[string]uint64 // map of previous versions for fast efficient lookup } // Version is representative of a single entry in a HybridLogicalVector. type Version struct { + // SourceID is an ID representing the source of the value (e.g. Couchbase Lite ID) + SourceID string `json:"source_id"` + // Value is a Hybrid Logical Clock value (In Couchbase Server, CAS is a HLC) + Value string `json:"version"` +} + +// DecodedVersion is a sourceID and version pair in string/uint64 format for use in conflict detection +type DecodedVersion struct { // SourceID is an ID representing the source of the value (e.g. Couchbase Lite ID) SourceID string `json:"source_id"` // Value is a Hybrid Logical Clock value (In Couchbase Server, CAS is a HLC) Value uint64 `json:"version"` } -func CreateVersion(source string, version uint64) Version { +// CreateDecodedVersion creates a sourceID and version pair in string/uint64 format +func CreateDecodedVersion(source string, version uint64) DecodedVersion { + return DecodedVersion{ + SourceID: source, + Value: version, + } +} + +// CreateVersion creates an encoded sourceID and version pair +func CreateVersion(source, version string) Version { return Version{ SourceID: source, Value: version, @@ -51,22 +75,23 @@ func CreateVersionFromString(versionString string) (version Version, err error) if !found { return version, fmt.Errorf("Malformed version string %s, delimiter not found", versionString) } - sourceBytes, err := base64.StdEncoding.DecodeString(sourceBase64) - if err != nil { - return version, fmt.Errorf("Unable to decode sourceID for version %s: %w", versionString, err) - } - version.SourceID = string(sourceBytes) - version.Value = base.HexCasToUint64(timestampString) + version.SourceID = sourceBase64 + version.Value = timestampString return version, nil } // String returns a Couchbase Lite-compatible string representation of the version. -func (v Version) String() string { +func (v DecodedVersion) String() string { timestamp := string(base.Uint64CASToLittleEndianHex(v.Value)) source := base64.StdEncoding.EncodeToString([]byte(v.SourceID)) return timestamp + "@" + source } +// String returns a version/sourceID pair in CBL string format +func (v Version) String() string { + return v.Value + "@" + v.SourceID +} + // ExtractCurrentVersionFromHLV will take the current version form the HLV struct and return it in the Version struct func (hlv *HybridLogicalVector) ExtractCurrentVersionFromHLV() *Version { src, vrs := hlv.GetCurrentVersion() @@ -76,25 +101,25 @@ func (hlv *HybridLogicalVector) ExtractCurrentVersionFromHLV() *Version { // PersistedHybridLogicalVector is the marshalled format of HybridLogicalVector. // This representation needs to be kept in sync with XDCR. -type PersistedHybridLogicalVector struct { - CurrentVersionCAS string `json:"cvCas,omitempty"` - ImportCAS string `json:"importCAS,omitempty"` - SourceID string `json:"src"` - Version string `json:"vrs"` - MergeVersions map[string]string `json:"mv,omitempty"` - PreviousVersions map[string]string `json:"pv,omitempty"` +type HybridLogicalVector struct { + CurrentVersionCAS string `json:"cvCas,omitempty"` // current version cas (or cvCAS) stores the current CAS in little endian hex format at the time of replication + ImportCAS string `json:"importCAS,omitempty"` // Set when an import modifies the document CAS but preserves the HLV (import of a version replicated by XDCR) + SourceID string `json:"src"` // source bucket uuid in (base64 encoded format) of where this entry originated from + Version string `json:"vrs"` // current cas in little endian hex format of the current version on the version vector + MergeVersions map[string]string `json:"mv,omitempty"` // map of merge versions for fast efficient lookup + PreviousVersions map[string]string `json:"pv,omitempty"` // map of previous versions for fast efficient lookup } // NewHybridLogicalVector returns an initialised HybridLogicalVector. func NewHybridLogicalVector() HybridLogicalVector { return HybridLogicalVector{ - PreviousVersions: make(map[string]uint64), - MergeVersions: make(map[string]uint64), + PreviousVersions: make(map[string]string), + MergeVersions: make(map[string]string), } } // GetCurrentVersion returns the current version from the HLV in memory. -func (hlv *HybridLogicalVector) GetCurrentVersion() (string, uint64) { +func (hlv *HybridLogicalVector) GetCurrentVersion() (string, string) { return hlv.SourceID, hlv.Version } @@ -111,7 +136,7 @@ func (hlv *HybridLogicalVector) GetCurrentVersionString() string { } // IsInConflict tests to see if in memory HLV is conflicting with another HLV -func (hlv *HybridLogicalVector) IsInConflict(otherVector HybridLogicalVector) bool { +func (hlv *DecodedHybridLogicalVector) IsInConflict(otherVector DecodedHybridLogicalVector) bool { // test if either HLV(A) or HLV(B) are dominating over each other. If so they are not in conflict if hlv.isDominating(otherVector) || otherVector.isDominating(*hlv) { return false @@ -122,8 +147,13 @@ func (hlv *HybridLogicalVector) IsInConflict(otherVector HybridLogicalVector) bo // AddVersion adds newVersion to the in memory representation of the HLV. func (hlv *HybridLogicalVector) AddVersion(newVersion Version) error { - if newVersion.Value < hlv.Version { - return fmt.Errorf("attempting to add new version vector entry with a CAS that is less than the current version CAS value. Current cas: %d new cas %d", hlv.Version, newVersion.Value) + var newVersionCAS uint64 + hlvVersionCAS := base.HexCasToUint64(hlv.Version) + if newVersion.Value != hlvExpandMacroCASValue { + newVersionCAS = base.HexCasToUint64(newVersion.Value) + if newVersionCAS < hlvVersionCAS { + return fmt.Errorf("attempting to add new version vector entry with a CAS that is less than the current version CAS value. Current cas: %s new cas %s", hlv.Version, newVersion.Value) + } } // check if this is the first time we're adding a source - version pair if hlv.SourceID == "" { @@ -138,16 +168,17 @@ func (hlv *HybridLogicalVector) AddVersion(newVersion Version) error { } // if we get here this is a new version from a different sourceID thus need to move current sourceID to previous versions and update current version if hlv.PreviousVersions == nil { - hlv.PreviousVersions = make(map[string]uint64) + hlv.PreviousVersions = make(map[string]string) } // we need to check if source ID already exists in PV, if so we need to ensure we are only updating with the // sourceID-version pair if incoming version is greater than version already there if currPVVersion, ok := hlv.PreviousVersions[hlv.SourceID]; ok { // if we get here source ID exists in PV, only replace version if it is less than the incoming version - if currPVVersion < hlv.Version { + currPVVersionCAS := base.HexCasToUint64(currPVVersion) + if currPVVersionCAS < hlvVersionCAS { hlv.PreviousVersions[hlv.SourceID] = hlv.Version } else { - return fmt.Errorf("local hlv has current source in previous versiosn with version greater than current version. Current CAS: %d, PV CAS %d", hlv.Version, currPVVersion) + return fmt.Errorf("local hlv has current source in previous versiosn with version greater than current version. Current CAS: %s, PV CAS %s", hlv.Version, currPVVersion) } } else { // source doesn't exist in PV so add @@ -162,7 +193,7 @@ func (hlv *HybridLogicalVector) AddVersion(newVersion Version) error { // TODO: Does this need to remove source from current version as well? Merge Versions? func (hlv *HybridLogicalVector) Remove(source string) error { // if entry is not found in previous versions we return error - if hlv.PreviousVersions[source] == 0 { + if hlv.PreviousVersions[source] == "" { return base.ErrNotFound } delete(hlv.PreviousVersions, source) @@ -170,7 +201,7 @@ func (hlv *HybridLogicalVector) Remove(source string) error { } // isDominating tests if in memory HLV is dominating over another -func (hlv *HybridLogicalVector) isDominating(otherVector HybridLogicalVector) bool { +func (hlv *DecodedHybridLogicalVector) isDominating(otherVector DecodedHybridLogicalVector) bool { // Dominating Criteria: // HLV A dominates HLV B if source(A) == source(B) and version(A) > version(B) // If there is an entry in pv(B) for A's current source and version(A) > B's version for that pv entry then A is dominating @@ -186,7 +217,7 @@ func (hlv *HybridLogicalVector) isDominating(otherVector HybridLogicalVector) bo } // isEqual tests if in memory HLV is equal to another -func (hlv *HybridLogicalVector) isEqual(otherVector HybridLogicalVector) bool { +func (hlv *DecodedHybridLogicalVector) isEqual(otherVector DecodedHybridLogicalVector) bool { // if in HLV(A) sourceID the same as HLV(B) sourceID and HLV(A) CAS is equal to HLV(B) CAS then the two HLV's are equal if hlv.SourceID == otherVector.SourceID && hlv.Version == otherVector.Version { return true @@ -208,7 +239,7 @@ func (hlv *HybridLogicalVector) isEqual(otherVector HybridLogicalVector) bool { } // equalMergeVectors tests if two merge vectors between HLV's are equal or not -func (hlv *HybridLogicalVector) equalMergeVectors(otherVector HybridLogicalVector) bool { +func (hlv *DecodedHybridLogicalVector) equalMergeVectors(otherVector DecodedHybridLogicalVector) bool { if len(hlv.MergeVersions) != len(otherVector.MergeVersions) { return false } @@ -221,7 +252,7 @@ func (hlv *HybridLogicalVector) equalMergeVectors(otherVector HybridLogicalVecto } // equalPreviousVectors tests if two previous versions vectors between two HLV's are equal or not -func (hlv *HybridLogicalVector) equalPreviousVectors(otherVector HybridLogicalVector) bool { +func (hlv *DecodedHybridLogicalVector) equalPreviousVectors(otherVector DecodedHybridLogicalVector) bool { if len(hlv.PreviousVersions) != len(otherVector.PreviousVersions) { return false } @@ -235,7 +266,7 @@ func (hlv *HybridLogicalVector) equalPreviousVectors(otherVector HybridLogicalVe // GetVersion returns the latest CAS value in the HLV for a given sourceID along with boolean value to // indicate if sourceID is found in the HLV, if the sourceID is not present in the HLV it will return 0 CAS value and false -func (hlv *HybridLogicalVector) GetVersion(sourceID string) (uint64, bool) { +func (hlv *DecodedHybridLogicalVector) GetVersion(sourceID string) (uint64, bool) { if sourceID == "" { return 0, false } @@ -256,6 +287,34 @@ func (hlv *HybridLogicalVector) GetVersion(sourceID string) (uint64, bool) { return latestVersion, true } +// GetVersion returns the latest decoded CAS value in the HLV for a given sourceID +func (hlv *HybridLogicalVector) GetVersion(sourceID string) (uint64, bool) { + if sourceID == "" { + return 0, false + } + var latestVersion uint64 + if sourceID == hlv.SourceID { + latestVersion = base.HexCasToUint64(hlv.Version) + } + if pvEntry, ok := hlv.PreviousVersions[sourceID]; ok { + entry := base.HexCasToUint64(pvEntry) + if entry > latestVersion { + latestVersion = entry + } + } + if mvEntry, ok := hlv.MergeVersions[sourceID]; ok { + entry := base.HexCasToUint64(mvEntry) + if entry > latestVersion { + latestVersion = entry + } + } + // if we have 0 cas value, there is no entry for this source ID in the HLV + if latestVersion == 0 { + return latestVersion, false + } + return latestVersion, true +} + // AddNewerVersions will take a hlv and add any newer source/version pairs found across CV and PV found in the other HLV taken as parameter // when both HLV func (hlv *HybridLogicalVector) AddNewerVersions(otherVector HybridLogicalVector) error { @@ -272,8 +331,15 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector HybridLogicalVector // Iterate through incoming vector previous versions, update with the version from other vector // for source if the local version for that source is lower for i, v := range otherVector.PreviousVersions { - if hlv.PreviousVersions[i] == 0 || hlv.PreviousVersions[i] < v { + if hlv.PreviousVersions[i] == "" { hlv.setPreviousVersion(i, v) + } else { + // if we get here then there is entry for this source in PV so we must check if its newer or not + otherHLVPVValue := base.HexCasToUint64(v) + localHLVPVValue := base.HexCasToUint64(hlv.PreviousVersions[i]) + if localHLVPVValue < otherHLVPVValue { + hlv.setPreviousVersion(i, v) + } } } } @@ -284,104 +350,6 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector HybridLogicalVector return nil } -func (hlv HybridLogicalVector) MarshalJSON() ([]byte, error) { - - persistedHLV, err := hlv.convertHLVToPersistedFormat() - if err != nil { - return nil, err - } - - return base.JSONMarshal(*persistedHLV) -} - -func (hlv *HybridLogicalVector) UnmarshalJSON(inputjson []byte) error { - persistedJSON := PersistedHybridLogicalVector{} - err := base.JSONUnmarshal(inputjson, &persistedJSON) - if err != nil { - return err - } - // convert the data to in memory format - hlv.convertPersistedHLVToInMemoryHLV(persistedJSON) - return nil -} - -func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridLogicalVector, error) { - persistedHLV := PersistedHybridLogicalVector{} - var cvCasByteArray []byte - var importCASBytes []byte - var vrsCasByteArray []byte - if hlv.CurrentVersionCAS != 0 { - cvCasByteArray = base.Uint64CASToLittleEndianHex(hlv.CurrentVersionCAS) - } - if hlv.ImportCAS != 0 { - importCASBytes = base.Uint64CASToLittleEndianHex(hlv.ImportCAS) - } - if hlv.Version != 0 { - vrsCasByteArray = base.Uint64CASToLittleEndianHex(hlv.Version) - } - - pvPersistedFormat, err := convertMapToPersistedFormat(hlv.PreviousVersions) - if err != nil { - return nil, err - } - mvPersistedFormat, err := convertMapToPersistedFormat(hlv.MergeVersions) - if err != nil { - return nil, err - } - - persistedHLV.CurrentVersionCAS = string(cvCasByteArray) - persistedHLV.ImportCAS = string(importCASBytes) - persistedHLV.SourceID = hlv.SourceID - persistedHLV.Version = string(vrsCasByteArray) - persistedHLV.PreviousVersions = pvPersistedFormat - persistedHLV.MergeVersions = mvPersistedFormat - return &persistedHLV, nil -} - -func (hlv *HybridLogicalVector) convertPersistedHLVToInMemoryHLV(persistedJSON PersistedHybridLogicalVector) { - hlv.CurrentVersionCAS = base.HexCasToUint64(persistedJSON.CurrentVersionCAS) - if persistedJSON.ImportCAS != "" { - hlv.ImportCAS = base.HexCasToUint64(persistedJSON.ImportCAS) - } - hlv.SourceID = persistedJSON.SourceID - // convert the hex cas to uint64 cas - hlv.Version = base.HexCasToUint64(persistedJSON.Version) - // convert the maps form persisted format to the in memory format - hlv.PreviousVersions = convertMapToInMemoryFormat(persistedJSON.PreviousVersions) - hlv.MergeVersions = convertMapToInMemoryFormat(persistedJSON.MergeVersions) -} - -// convertMapToPersistedFormat will convert in memory map of previous versions or merge versions into the persisted format map -func convertMapToPersistedFormat(memoryMap map[string]uint64) (map[string]string, error) { - if memoryMap == nil { - return nil, nil - } - returnedMap := make(map[string]string) - var persistedCAS string - for source, cas := range memoryMap { - casByteArray := base.Uint64CASToLittleEndianHex(cas) - persistedCAS = string(casByteArray) - // remove the leading '0x' from the CAS value - persistedCAS = persistedCAS[2:] - returnedMap[source] = persistedCAS - } - return returnedMap, nil -} - -// convertMapToInMemoryFormat will convert the persisted format map to an in memory format of that map. -// Used for previous versions and merge versions maps on HLV -func convertMapToInMemoryFormat(persistedMap map[string]string) map[string]uint64 { - if persistedMap == nil { - return nil - } - returnedMap := make(map[string]uint64) - // convert each CAS entry from little endian hex to Uint64 - for key, value := range persistedMap { - returnedMap[key] = base.HexCasToUint64(value) - } - return returnedMap -} - // computeMacroExpansions returns the mutate in spec needed for the document update based off the outcome in updateHLV func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansionSpec { var outputSpec []sgbucket.MacroExpansionSpec @@ -400,9 +368,9 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi } // setPreviousVersion will take a source/version pair and add it to the HLV previous versions map -func (hlv *HybridLogicalVector) setPreviousVersion(source string, version uint64) { +func (hlv *HybridLogicalVector) setPreviousVersion(source string, version string) { if hlv.PreviousVersions == nil { - hlv.PreviousVersions = make(map[string]uint64) + hlv.PreviousVersions = make(map[string]string) } hlv.PreviousVersions[source] = version } @@ -443,6 +411,36 @@ func (hlv *HybridLogicalVector) toHistoryForHLV() string { return s.String() } +// ToDecodedHybridLogicalVector converts the little endian hex values of a HLV to uint64 values +func (hlv *HybridLogicalVector) ToDecodedHybridLogicalVector() DecodedHybridLogicalVector { + var decodedVersion, decodedCVCAS, decodedImportCAS uint64 + if hlv.Version != "" { + decodedVersion = base.HexCasToUint64(hlv.Version) + } + if hlv.ImportCAS != "" { + decodedImportCAS = base.HexCasToUint64(hlv.ImportCAS) + } + if hlv.CurrentVersionCAS != "" { + decodedCVCAS = base.HexCasToUint64(hlv.CurrentVersionCAS) + } + decodedHLV := DecodedHybridLogicalVector{ + CurrentVersionCAS: decodedCVCAS, + Version: decodedVersion, + ImportCAS: decodedImportCAS, + SourceID: hlv.SourceID, + PreviousVersions: make(map[string]uint64, len(hlv.PreviousVersions)), + MergeVersions: make(map[string]uint64, len(hlv.MergeVersions)), + } + + for i, v := range hlv.PreviousVersions { + decodedHLV.PreviousVersions[i] = base.HexCasToUint64(v) + } + for i, v := range hlv.MergeVersions { + decodedHLV.MergeVersions[i] = base.HexCasToUint64(v) + } + return decodedHLV +} + // appendRevocationMacroExpansions adds macro expansions for the channel map. Not strictly an HLV operation // but putting the function here as it's required when the HLV's current version is being macro expanded func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec, channelNames []string) (updatedSpec []sgbucket.MacroExpansionSpec) { diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go index c1d665b3fc..28140129ee 100644 --- a/db/hybrid_logical_vector_test.go +++ b/db/hybrid_logical_vector_test.go @@ -9,6 +9,7 @@ package db import ( + "encoding/base64" "reflect" "strconv" "strings" @@ -24,20 +25,20 @@ import ( // - Tests internal api methods on the HLV work as expected // - Tests methods GetCurrentVersion, AddVersion and Remove func TestInternalHLVFunctions(t *testing.T) { - pv := make(map[string]uint64) - currSourceId := "s_5pRi8Piv1yLcLJ1iVNJIsA" - const currVersion = 12345678 - pv["s_YZvBpEaztom9z5V/hDoeIw"] = 64463204720 + pv := make(map[string]string) + currSourceId := base64.StdEncoding.EncodeToString([]byte("5pRi8Piv1yLcLJ1iVNJIsA")) + currVersion := string(base.Uint64CASToLittleEndianHex(12345678)) + pv[base64.StdEncoding.EncodeToString([]byte("YZvBpEaztom9z5V/hDoeIw"))] = string(base.Uint64CASToLittleEndianHex(64463204720)) - inputHLV := []string{"s_5pRi8Piv1yLcLJ1iVNJIsA@12345678", "s_YZvBpEaztom9z5V/hDoeIw@64463204720", "m_s_NqiIe0LekFPLeX4JvTO6Iw@345454"} + inputHLV := []string{"5pRi8Piv1yLcLJ1iVNJIsA@12345678", "YZvBpEaztom9z5V/hDoeIw@64463204720", "m_NqiIe0LekFPLeX4JvTO6Iw@345454"} hlv := createHLVForTest(t, inputHLV) - const newCAS = 123456789 + newCAS := string(base.Uint64CASToLittleEndianHex(123456789)) const newSource = "s_testsource" // create a new version vector entry that will error method AddVersion badNewVector := Version{ - Value: 123345, + Value: string(base.Uint64CASToLittleEndianHex(123345)), SourceID: currSourceId, } // create a new version vector entry that should be added to HLV successfully @@ -49,7 +50,7 @@ func TestInternalHLVFunctions(t *testing.T) { // Get current version vector, sourceID and CAS pair source, version := hlv.GetCurrentVersion() assert.Equal(t, currSourceId, source) - assert.Equal(t, uint64(currVersion), version) + assert.Equal(t, currVersion, version) // add new version vector with same sourceID as current sourceID and assert it doesn't add to previous versions then restore HLV to previous state require.NoError(t, hlv.AddVersion(newVersionVector)) @@ -64,7 +65,7 @@ func TestInternalHLVFunctions(t *testing.T) { // Add a new version vector pair to the HLV structure and assert that it moves the current version vector pair to the previous versions section newVersionVector.SourceID = newSource require.NoError(t, hlv.AddVersion(newVersionVector)) - assert.Equal(t, uint64(newCAS), hlv.Version) + assert.Equal(t, newCAS, hlv.Version) assert.Equal(t, newSource, hlv.SourceID) assert.True(t, reflect.DeepEqual(hlv.PreviousVersions, pv)) @@ -115,7 +116,9 @@ func TestConflictDetectionDominating(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { hlvA := createHLVForTest(t, testCase.inputListHLVA) hlvB := createHLVForTest(t, testCase.inputListHLVB) - require.False(t, hlvA.IsInConflict(hlvB)) + decHLVA := hlvA.ToDecodedHybridLogicalVector() + decHLVB := hlvB.ToDecodedHybridLogicalVector() + require.False(t, decHLVA.IsInConflict(decHLVB)) }) } } @@ -131,25 +134,33 @@ func TestConflictEqualHLV(t *testing.T) { inputHLVB := []string{"cluster1@10", "cluster2@4"} hlvA := createHLVForTest(t, inputHLVA) hlvB := createHLVForTest(t, inputHLVB) - require.True(t, hlvA.isEqual(hlvB)) + decHLVA := hlvA.ToDecodedHybridLogicalVector() + decHLVB := hlvB.ToDecodedHybridLogicalVector() + require.True(t, decHLVA.isEqual(decHLVB)) // test conflict detection with different version CAS but same merge versions inputHLVA = []string{"cluster2@12", "cluster3@3", "cluster4@2"} inputHLVB = []string{"cluster1@10", "cluster3@3", "cluster4@2"} hlvA = createHLVForTest(t, inputHLVA) hlvB = createHLVForTest(t, inputHLVB) - require.True(t, hlvA.isEqual(hlvB)) + decHLVA = hlvA.ToDecodedHybridLogicalVector() + decHLVB = hlvB.ToDecodedHybridLogicalVector() + require.True(t, decHLVA.isEqual(decHLVB)) // test conflict detection with different version CAS but same previous version vectors inputHLVA = []string{"cluster3@2", "cluster1@3", "cluster2@5"} hlvA = createHLVForTest(t, inputHLVA) inputHLVB = []string{"cluster4@7", "cluster1@3", "cluster2@5"} hlvB = createHLVForTest(t, inputHLVB) - require.True(t, hlvA.isEqual(hlvB)) + decHLVA = hlvA.ToDecodedHybridLogicalVector() + decHLVB = hlvB.ToDecodedHybridLogicalVector() + require.True(t, decHLVA.isEqual(decHLVB)) + cluster1Encoded := base64.StdEncoding.EncodeToString([]byte("cluster1")) // remove an entry from one of the HLV PVs to assert we get false returned from isEqual - require.NoError(t, hlvA.Remove("cluster1")) - require.False(t, hlvA.isEqual(hlvB)) + require.NoError(t, hlvA.Remove(cluster1Encoded)) + decHLVA = hlvA.ToDecodedHybridLogicalVector() + require.False(t, decHLVA.isEqual(decHLVB)) } // TestConflictExample: @@ -161,7 +172,10 @@ func TestConflictExample(t *testing.T) { input = []string{"cluster2@2", "cluster3@3"} otherVector := createHLVForTest(t, input) - require.True(t, inMemoryHLV.IsInConflict(otherVector)) + + inMemoryHLVDec := inMemoryHLV.ToDecodedHybridLogicalVector() + otherVectorDec := otherVector.ToDecodedHybridLogicalVector() + require.True(t, inMemoryHLVDec.IsInConflict(otherVectorDec)) } // createHLVForTest is a helper function to create a HLV for use in a test. Takes a list of strings in the format of and assumes @@ -171,67 +185,31 @@ func createHLVForTest(tb *testing.T, inputList []string) HybridLogicalVector { // first element will be current version and source pair currentVersionPair := strings.Split(inputList[0], "@") - hlvOutput.SourceID = currentVersionPair[0] - version, err := strconv.Atoi(currentVersionPair[1]) + hlvOutput.SourceID = base64.StdEncoding.EncodeToString([]byte(currentVersionPair[0])) + version, err := strconv.ParseUint(currentVersionPair[1], 10, 64) require.NoError(tb, err) - hlvOutput.Version = uint64(version) - hlvOutput.CurrentVersionCAS = uint64(version) + vrsEncoded := string(base.Uint64CASToLittleEndianHex(version)) + hlvOutput.Version = vrsEncoded + hlvOutput.CurrentVersionCAS = vrsEncoded // remove current version entry in list now we have parsed it into the HLV inputList = inputList[1:] for _, value := range inputList { currentVersionPair = strings.Split(value, "@") - version, err = strconv.Atoi(currentVersionPair[1]) + version, err = strconv.ParseUint(currentVersionPair[1], 10, 64) require.NoError(tb, err) if strings.HasPrefix(currentVersionPair[0], "m_") { // add entry to merge version removing the leading prefix for sourceID - hlvOutput.MergeVersions[currentVersionPair[0][2:]] = uint64(version) + hlvOutput.MergeVersions[base64.StdEncoding.EncodeToString([]byte(currentVersionPair[0][2:]))] = string(base.Uint64CASToLittleEndianHex(version)) } else { - // if its not got the prefix we assume its a previous version entry - hlvOutput.PreviousVersions[currentVersionPair[0]] = uint64(version) + // if it's not got the prefix we assume it's a previous version entry + hlvOutput.PreviousVersions[base64.StdEncoding.EncodeToString([]byte(currentVersionPair[0]))] = string(base.Uint64CASToLittleEndianHex(version)) } } return hlvOutput } -// TestHybridLogicalVectorPersistence: -// - Tests the process of constructing in memory HLV and marshaling it to persisted format -// - Asserts on the format -// - Unmarshal the HLV and assert that the process works as expected -func TestHybridLogicalVectorPersistence(t *testing.T) { - // create HLV - inputHLV := []string{"cb06dc003846116d9b66d2ab23887a96@123456", "s_YZvBpEaztom9z5V/hDoeIw@1628620455135215600", "m_s_NqiIe0LekFPLeX4JvTO6Iw@1628620455139868700", - "m_s_LhRPsa7CpjEvP5zeXTXEBA@1628620455147864000"} - inMemoryHLV := createHLVForTest(t, inputHLV) - - // marshal in memory hlv into persisted form - byteArray, err := inMemoryHLV.MarshalJSON() - require.NoError(t, err) - - // convert to string and assert the in memory struct is converted to persisted form correctly - // no guarantee the order of the marshaling of the mv part so just assert on the values - strHLV := string(byteArray) - assert.Contains(t, strHLV, `"cvCas":"0x40e2010000000000`) - assert.Contains(t, strHLV, `"src":"cb06dc003846116d9b66d2ab23887a96"`) - assert.Contains(t, strHLV, `"vrs":"0x40e2010000000000"`) - assert.Contains(t, strHLV, `"s_LhRPsa7CpjEvP5zeXTXEBA":"c0ff05d7ac059a16"`) - assert.Contains(t, strHLV, `"s_NqiIe0LekFPLeX4JvTO6Iw":"1c008cd6ac059a16"`) - assert.Contains(t, strHLV, `"pv":{"s_YZvBpEaztom9z5V/hDoeIw":"f0ff44d6ac059a16"}`) - - // Unmarshal the in memory constructed HLV above - hlvFromPersistance := HybridLogicalVector{} - err = hlvFromPersistance.UnmarshalJSON(byteArray) - require.NoError(t, err) - - // assertions on values of unmarshaled HLV - assert.Equal(t, inMemoryHLV.CurrentVersionCAS, hlvFromPersistance.CurrentVersionCAS) - assert.Equal(t, inMemoryHLV.SourceID, hlvFromPersistance.SourceID) - assert.Equal(t, inMemoryHLV.Version, hlvFromPersistance.Version) - assert.Equal(t, inMemoryHLV.PreviousVersions, hlvFromPersistance.PreviousVersions) - assert.Equal(t, inMemoryHLV.MergeVersions, hlvFromPersistance.MergeVersions) -} - func TestAddNewerVersionsBetweenTwoVectorsWhenNotInConflict(t *testing.T) { testCases := []struct { name string @@ -277,7 +255,7 @@ func TestHLVImport(t *testing.T) { defer db.Close(ctx) collection := GetSingleDatabaseCollectionWithUser(t, db) - localSource := collection.dbCtx.BucketUUID + localSource := db.EncodedBucketUUID // 1. Test standard import of an SDK write standardImportKey := "standardImport_" + t.Name() @@ -290,9 +268,10 @@ func TestHLVImport(t *testing.T) { importedDoc, _, err := collection.GetDocWithXattr(ctx, standardImportKey, DocUnmarshalAll) require.NoError(t, err) importedHLV := importedDoc.HLV - require.Equal(t, cas, importedHLV.ImportCAS) - require.Equal(t, importedDoc.Cas, importedHLV.CurrentVersionCAS) - require.Equal(t, importedDoc.Cas, importedHLV.Version) + encodedCAS := string(base.Uint64CASToLittleEndianHex(cas)) + require.Equal(t, encodedCAS, importedHLV.ImportCAS) + require.Equal(t, importedDoc.SyncData.Cas, importedHLV.CurrentVersionCAS) + require.Equal(t, importedDoc.SyncData.Cas, importedHLV.Version) require.Equal(t, localSource, importedHLV.SourceID) // 2. Test import of write by HLV-aware peer (HLV is already updated, sync metadata is not). @@ -304,6 +283,7 @@ func TestHLVImport(t *testing.T) { var existingBody, existingXattr []byte cas, err = collection.dataStore.GetWithXattr(ctx, existingHLVKey, "_sync", "", &existingBody, &existingXattr, nil) require.NoError(t, err) + encodedCAS = string(base.Uint64CASToLittleEndianHex(cas)) _, err = collection.ImportDocRaw(ctx, existingHLVKey, existingBody, existingXattr, nil, false, cas, nil, ImportFromFeed) require.NoError(t, err, "import error") @@ -312,10 +292,10 @@ func TestHLVImport(t *testing.T) { require.NoError(t, err) importedHLV = importedDoc.HLV // cas in the HLV's current version and cvCAS should not have changed, and should match importCAS - require.Equal(t, cas, importedHLV.ImportCAS) - require.Equal(t, cas, importedHLV.CurrentVersionCAS) - require.Equal(t, cas, importedHLV.Version) - require.Equal(t, otherSource, importedHLV.SourceID) + require.Equal(t, encodedCAS, importedHLV.ImportCAS) + require.Equal(t, encodedCAS, importedHLV.CurrentVersionCAS) + require.Equal(t, encodedCAS, importedHLV.Version) + require.Equal(t, hlvHelper.Source, importedHLV.SourceID) } // TestHLVMapToCBLString: diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index a3f3458998..76204d7cf0 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -236,7 +236,7 @@ type IDAndRev struct { type IDandCV struct { DocID string - Version uint64 + Version string Source string } diff --git a/db/revision_cache_test.go b/db/revision_cache_test.go index e22913ace3..61c5051e2c 100644 --- a/db/revision_cache_test.go +++ b/db/revision_cache_test.go @@ -53,7 +53,7 @@ func (t *testBackingStore) GetDocument(ctx context.Context, docid string, unmars doc.HLV = &HybridLogicalVector{ SourceID: "test", - Version: 123, + Version: "123", } _, _, err = doc.updateChannels(ctx, base.SetOf("*")) if err != nil { @@ -113,7 +113,7 @@ func TestLRURevisionCacheEviction(t *testing.T) { // Fill up the rev cache with the first 10 docs for docID := 0; docID < 10; docID++ { id := strconv.Itoa(docID) - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &Version{Value: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &Version{Value: id, SourceID: "test"}, History: Revisions{"start": 1}}) } // Get them back out @@ -130,7 +130,7 @@ func TestLRURevisionCacheEviction(t *testing.T) { // Add 3 more docs to the now full revcache for i := 10; i < 13; i++ { docID := strconv.Itoa(i) - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: docID, RevID: "1-abc", CV: &Version{Value: uint64(i), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: docID, RevID: "1-abc", CV: &Version{Value: docID, SourceID: "test"}, History: Revisions{"start": 1}}) } // Check that the first 3 docs were evicted @@ -173,7 +173,7 @@ func TestLRURevisionCacheEvictionMixedRevAndCV(t *testing.T) { // Fill up the rev cache with the first 10 docs for docID := 0; docID < 10; docID++ { id := strconv.Itoa(docID) - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &Version{Value: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &Version{Value: id, SourceID: "test"}, History: Revisions{"start": 1}}) } // assert that the list has 10 elements along with both lookup maps @@ -184,7 +184,7 @@ func TestLRURevisionCacheEvictionMixedRevAndCV(t *testing.T) { // Add 3 more docs to the now full rev cache to trigger eviction for docID := 10; docID < 13; docID++ { id := strconv.Itoa(docID) - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &Version{Value: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &Version{Value: id, SourceID: "test"}, History: Revisions{"start": 1}}) } // assert the cache and associated lookup maps only have 10 items in them (i.e.e is eviction working?) assert.Equal(t, 10, len(cache.hlvCache)) @@ -195,7 +195,7 @@ func TestLRURevisionCacheEvictionMixedRevAndCV(t *testing.T) { prevCacheHitCount := cacheHitCounter.Value() for i := 0; i < 10; i++ { id := strconv.Itoa(i + 3) - cv := Version{Value: uint64(i + 3), SourceID: "test"} + cv := Version{Value: id, SourceID: "test"} docRev, err := cache.GetWithCV(ctx, id, &cv, RevCacheOmitBody, RevCacheOmitDelta) assert.NoError(t, err) assert.NotNil(t, docRev.BodyBytes, "nil body for %s", id) @@ -273,13 +273,13 @@ func TestBackingStoreCV(t *testing.T) { cache := NewLRURevisionCache(10, &testBackingStore{[]string{"not_found"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) // Get Rev for the first time - miss cache, but fetch the doc and revision to store - cv := Version{SourceID: "test", Value: 123} + cv := Version{SourceID: "test", Value: "123"} docRev, err := cache.GetWithCV(base.TestCtx(t), "doc1", &cv, RevCacheOmitBody, RevCacheOmitDelta) assert.NoError(t, err) assert.Equal(t, "doc1", docRev.DocID) assert.NotNil(t, docRev.Channels) assert.Equal(t, "test", docRev.CV.SourceID) - assert.Equal(t, uint64(123), docRev.CV.Value) + assert.Equal(t, "123", docRev.CV.Value) assert.Equal(t, int64(0), cacheHitCounter.Value()) assert.Equal(t, int64(1), cacheMissCounter.Value()) assert.Equal(t, int64(1), getDocumentCounter.Value()) @@ -290,14 +290,14 @@ func TestBackingStoreCV(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "doc1", docRev.DocID) assert.Equal(t, "test", docRev.CV.SourceID) - assert.Equal(t, uint64(123), docRev.CV.Value) + assert.Equal(t, "123", docRev.CV.Value) assert.Equal(t, int64(1), cacheHitCounter.Value()) assert.Equal(t, int64(1), cacheMissCounter.Value()) assert.Equal(t, int64(1), getDocumentCounter.Value()) assert.Equal(t, int64(1), getRevisionCounter.Value()) // Doc doesn't exist, so miss the cache, and fail when getting the doc - cv = Version{SourceID: "test11", Value: 100} + cv = Version{SourceID: "test11", Value: "100"} docRev, err = cache.GetWithCV(base.TestCtx(t), "not_found", &cv, RevCacheOmitBody, RevCacheOmitDelta) assertHTTPError(t, err, 404) assert.Nil(t, docRev.BodyBytes) @@ -560,7 +560,7 @@ func TestSingleLoad(t *testing.T) { cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} cache := NewLRURevisionCache(10, &testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) - cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc123", RevID: "1-abc", CV: &Version{Value: uint64(123), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc123", RevID: "1-abc", CV: &Version{Value: "123", SourceID: "test"}, History: Revisions{"start": 1}}) _, err := cache.GetWithRev(base.TestCtx(t), "doc123", "1-abc", true, false) assert.NoError(t, err) } @@ -570,7 +570,8 @@ func TestConcurrentLoad(t *testing.T) { cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} cache := NewLRURevisionCache(10, &testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) - cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc1", RevID: "1-abc", CV: &Version{Value: uint64(1234), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc1", RevID: "1-abc", CV: &Version{Value: "1234", SourceID: "test"}, History: Revisions{"start": 1}}) + // Trigger load into cache var wg sync.WaitGroup wg.Add(20) @@ -634,7 +635,7 @@ func TestRevCacheOperationsCV(t *testing.T) { cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} cache := NewLRURevisionCache(10, &testBackingStore{[]string{"test_doc"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) - cv := Version{SourceID: "test", Value: 123} + cv := Version{SourceID: "test", Value: "123"} documentRevision := DocumentRevision{ DocID: "doc1", RevID: "1-abc", @@ -650,7 +651,7 @@ func TestRevCacheOperationsCV(t *testing.T) { assert.Equal(t, "doc1", docRev.DocID) assert.Equal(t, base.SetOf("chan1"), docRev.Channels) assert.Equal(t, "test", docRev.CV.SourceID) - assert.Equal(t, uint64(123), docRev.CV.Value) + assert.Equal(t, "123", docRev.CV.Value) assert.Equal(t, int64(1), cacheHitCounter.Value()) assert.Equal(t, int64(0), cacheMissCounter.Value()) @@ -663,7 +664,7 @@ func TestRevCacheOperationsCV(t *testing.T) { assert.Equal(t, "doc1", docRev.DocID) assert.Equal(t, base.SetOf("chan1"), docRev.Channels) assert.Equal(t, "test", docRev.CV.SourceID) - assert.Equal(t, uint64(123), docRev.CV.Value) + assert.Equal(t, "123", docRev.CV.Value) assert.Equal(t, []byte(`{"test":"12345"}`), docRev.BodyBytes) assert.Equal(t, int64(2), cacheHitCounter.Value()) assert.Equal(t, int64(0), cacheMissCounter.Value()) @@ -707,7 +708,7 @@ func TestLoaderMismatchInCV(t *testing.T) { cache := NewLRURevisionCache(10, &testBackingStore{[]string{"test_doc"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) // create cv with incorrect version to the one stored in backing store - cv := Version{SourceID: "test", Value: 1234} + cv := Version{SourceID: "test", Value: "1234"} _, err := cache.GetWithCV(base.TestCtx(t), "doc1", &cv, RevCacheOmitBody, RevCacheOmitDelta) require.Error(t, err) @@ -737,7 +738,7 @@ func TestConcurrentLoadByCVAndRevOnCache(t *testing.T) { wg := sync.WaitGroup{} wg.Add(2) - cv := Version{SourceID: "test", Value: 123} + cv := Version{SourceID: "test", Value: "123"} go func() { _, err := cache.GetWithRev(ctx, "doc1", "1-abc", RevCacheOmitBody, RevCacheIncludeDelta) require.NoError(t, err) @@ -753,7 +754,7 @@ func TestConcurrentLoadByCVAndRevOnCache(t *testing.T) { wg.Wait() revElement := cache.cache[IDAndRev{RevID: "1-abc", DocID: "doc1"}] - cvElement := cache.hlvCache[IDandCV{DocID: "doc1", Source: "test", Version: 123}] + cvElement := cache.hlvCache[IDandCV{DocID: "doc1", Source: "test", Version: "123"}] assert.Equal(t, 1, cache.lruList.Len()) assert.Equal(t, 1, len(cache.cache)) assert.Equal(t, 1, len(cache.hlvCache)) @@ -774,10 +775,11 @@ func TestGetActive(t *testing.T) { rev1id, doc, err := collection.Put(ctx, "doc", Body{"val": 123}) require.NoError(t, err) + syncCAS := string(base.Uint64CASToLittleEndianHex(doc.Cas)) expectedCV := Version{ - SourceID: db.BucketUUID, - Value: doc.Cas, + SourceID: db.EncodedBucketUUID, + Value: syncCAS, } // remove the entry form the rev cache to force the cache to not have the active version in it @@ -803,7 +805,7 @@ func TestConcurrentPutAndGetOnRevCache(t *testing.T) { wg := sync.WaitGroup{} wg.Add(2) - cv := Version{SourceID: "test", Value: 123} + cv := Version{SourceID: "test", Value: "123"} docRev := DocumentRevision{ DocID: "doc1", RevID: "1-abc", @@ -827,7 +829,7 @@ func TestConcurrentPutAndGetOnRevCache(t *testing.T) { wg.Wait() revElement := cache.cache[IDAndRev{RevID: "1-abc", DocID: "doc1"}] - cvElement := cache.hlvCache[IDandCV{DocID: "doc1", Source: "test", Version: 123}] + cvElement := cache.hlvCache[IDandCV{DocID: "doc1", Source: "test", Version: "123"}] assert.Equal(t, 1, cache.lruList.Len()) assert.Equal(t, 1, len(cache.cache)) diff --git a/db/util_testing.go b/db/util_testing.go index 4cfa00d722..992ce35763 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -679,7 +679,7 @@ func createTestDocument(docID string, revID string, body Body, deleted bool, exp } // requireCurrentVersion fetches the document by key, and validates that cv matches. -func (c *DatabaseCollection) RequireCurrentVersion(t *testing.T, key string, source string, version uint64) { +func (c *DatabaseCollection) RequireCurrentVersion(t *testing.T, key string, source string, version string) { ctx := base.TestCtx(t) doc, err := c.GetDocument(ctx, key, DocUnmarshalSync) require.NoError(t, err) @@ -694,12 +694,12 @@ func (c *DatabaseCollection) RequireCurrentVersion(t *testing.T, key string, sou } // GetDocumentCurrentVersion fetches the document by key and returns the current version -func (c *DatabaseCollection) GetDocumentCurrentVersion(t *testing.T, key string) (source string, version uint64) { +func (c *DatabaseCollection) GetDocumentCurrentVersion(t *testing.T, key string) (source string, version string) { ctx := base.TestCtx(t) doc, err := c.GetDocument(ctx, key, DocUnmarshalSync) require.NoError(t, err) if doc.HLV == nil { - return "", 0 + return "", "" } return doc.HLV.SourceID, doc.HLV.Version } diff --git a/db/utilities_hlv_testing.go b/db/utilities_hlv_testing.go index 8c01ee350b..b8e746367d 100644 --- a/db/utilities_hlv_testing.go +++ b/db/utilities_hlv_testing.go @@ -12,6 +12,7 @@ package db import ( "context" + "encoding/base64" "testing" sgbucket "github.com/couchbase/sg-bucket" @@ -23,7 +24,7 @@ import ( type HLVAgent struct { t *testing.T datastore base.DataStore - source string // All writes by the HLVHelper are done as this source + Source string // All writes by the HLVHelper are done as this source xattrName string // xattr name to store the HLV } @@ -33,7 +34,7 @@ func NewHLVAgent(t *testing.T, datastore base.DataStore, source string, xattrNam return &HLVAgent{ t: t, datastore: datastore, - source: source, // all writes by the HLVHelper are done as this source + Source: base64.StdEncoding.EncodeToString([]byte(source)), // all writes by the HLVHelper are done as this source xattrName: xattrName, } } @@ -42,7 +43,7 @@ func NewHLVAgent(t *testing.T, datastore base.DataStore, source string, xattrNam // a different HLV-aware peer) func (h *HLVAgent) InsertWithHLV(ctx context.Context, key string) (casOut uint64) { hlv := &HybridLogicalVector{} - err := hlv.AddVersion(CreateVersion(h.source, hlvExpandMacroCASValue)) + err := hlv.AddVersion(CreateVersion(h.Source, hlvExpandMacroCASValue)) require.NoError(h.t, err) hlv.CurrentVersionCAS = hlvExpandMacroCASValue diff --git a/rest/api_test.go b/rest/api_test.go index 77522730f5..bb8916eeb5 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -2825,19 +2825,17 @@ func TestPutDocUpdateVersionVector(t *testing.T) { rt := NewRestTester(t, nil) defer rt.Close() - bucketUUID, err := rt.GetDatabase().Bucket.UUID() - require.NoError(t, err) + bucketUUID := rt.GetDatabase().EncodedBucketUUID resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1", `{"key": "value"}`) RequireStatus(t, resp, http.StatusCreated) syncData, err := rt.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") assert.NoError(t, err) - uintCAS := base.HexCasToUint64(syncData.Cas) assert.Equal(t, bucketUUID, syncData.HLV.SourceID) - assert.Equal(t, uintCAS, syncData.HLV.Version) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.Version) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) // Put a new revision of this doc and assert that the version vector SourceID and Version is updated resp = rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1?rev="+syncData.CurrentRev, `{"key1": "value1"}`) @@ -2845,11 +2843,10 @@ func TestPutDocUpdateVersionVector(t *testing.T) { syncData, err = rt.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") assert.NoError(t, err) - uintCAS = base.HexCasToUint64(syncData.Cas) assert.Equal(t, bucketUUID, syncData.HLV.SourceID) - assert.Equal(t, uintCAS, syncData.HLV.Version) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.Version) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) // Delete doc and assert that the version vector SourceID and Version is updated resp = rt.SendAdminRequest(http.MethodDelete, "/{{.keyspace}}/doc1?rev="+syncData.CurrentRev, "") @@ -2857,11 +2854,10 @@ func TestPutDocUpdateVersionVector(t *testing.T) { syncData, err = rt.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") assert.NoError(t, err) - uintCAS = base.HexCasToUint64(syncData.Cas) assert.Equal(t, bucketUUID, syncData.HLV.SourceID) - assert.Equal(t, uintCAS, syncData.HLV.Version) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.Version) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) } // TestHLVOnPutWithImportRejection: @@ -2880,19 +2876,17 @@ func TestHLVOnPutWithImportRejection(t *testing.T) { rt := NewRestTester(t, &rtConfig) defer rt.Close() - bucketUUID, err := rt.GetDatabase().Bucket.UUID() - require.NoError(t, err) + bucketUUID := rt.GetDatabase().EncodedBucketUUID resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1", `{"type": "mobile"}`) RequireStatus(t, resp, http.StatusCreated) syncData, err := rt.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") assert.NoError(t, err) - uintCAS := base.HexCasToUint64(syncData.Cas) assert.Equal(t, bucketUUID, syncData.HLV.SourceID) - assert.Equal(t, uintCAS, syncData.HLV.Version) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.Version) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) // Put a doc that will be rejected by the import filter on the attempt to perform on demand import for write resp = rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc2", `{"type": "not-mobile"}`) @@ -2901,11 +2895,10 @@ func TestHLVOnPutWithImportRejection(t *testing.T) { // assert that the hlv is correctly updated and in tact after the import was cancelled on the doc syncData, err = rt.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc2") assert.NoError(t, err) - uintCAS = base.HexCasToUint64(syncData.Cas) assert.Equal(t, bucketUUID, syncData.HLV.SourceID) - assert.Equal(t, uintCAS, syncData.HLV.Version) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.Version) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) } func TestTombstoneCompactionAPI(t *testing.T) { diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 0b9e376aab..c68c2b7f8e 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -1911,8 +1911,8 @@ func TestPullReplicationUpdateOnOtherHLVAwarePeer(t *testing.T) { version1 := DocVersion{ RevID: bucketDoc.CurrentRev, CV: db.Version{ - SourceID: otherSource, - Value: cas, + SourceID: hlvHelper.Source, + Value: string(base.Uint64CASToLittleEndianHex(cas)), }, } _, ok := btcRunner.WaitForVersion(client.id, docID, version1) @@ -2979,7 +2979,7 @@ func TestOnDemandImportBlipFailure(t *testing.T) { }, { name: "_revisions property", - invalidBody: []byte(`{"_revisions": {"start": 0, "ids": ["foo", "def]"}}`), + invalidBody: []byte(`{"_revisions": {"start": 0, "ids": ["foo", "def"]}}`), }, { diff --git a/rest/changes_test.go b/rest/changes_test.go index 411509ebcd..f06e90e4ab 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -283,7 +283,7 @@ func TestCVPopulationOnChangesViaAPI(t *testing.T) { defer rt.Close() ctx := base.TestCtx(t) collection := rt.GetSingleTestDatabaseCollection() - bucketUUID := rt.GetDatabase().BucketUUID + bucketUUID := rt.GetDatabase().EncodedBucketUUID const DocID = "doc1" // activate channel cache @@ -315,7 +315,7 @@ func TestCVPopulationOnDocIDChanges(t *testing.T) { defer rt.Close() ctx := base.TestCtx(t) collection := rt.GetSingleTestDatabaseCollection() - bucketUUID := rt.GetDatabase().BucketUUID + bucketUUID := rt.GetDatabase().EncodedBucketUUID const DocID = "doc1" // activate channel cache diff --git a/rest/changestest/changes_api_test.go b/rest/changestest/changes_api_test.go index 8db6db2e79..5a0da2410d 100644 --- a/rest/changestest/changes_api_test.go +++ b/rest/changestest/changes_api_test.go @@ -935,7 +935,7 @@ func TestChangesFromCompoundSinceViaDocGrant(t *testing.T) { expectedResults = []string{ `{"seq":"8:2","id":"hbo-1","changes":[{"rev":"1-46f8c67c004681619052ee1a1cc8e104"}]}`, `{"seq":8,"id":"grant-1","changes":[{"rev":"1-c5098bb14d12d647c901850ff6a6292a"}]}`, - fmt.Sprintf(`{"seq":9,"id":"mix-1","changes":[{"rev":"1-32f69cdbf1772a8e064f15e928a18f85"}], "current_version":{"source_id": "%s", "version": %d}}`, mixSource, mixVersion), + fmt.Sprintf(`{"seq":9,"id":"mix-1","changes":[{"rev":"1-32f69cdbf1772a8e064f15e928a18f85"}], "current_version":{"source_id": "%s", "version": "%s"}}`, mixSource, mixVersion), } rt.Run("grant via existing channel", func(t *testing.T) { diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index d252ac9eec..06b311fb1b 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -8323,10 +8323,8 @@ func TestReplicatorUpdateHLVOnPut(t *testing.T) { defer teardown() // Grab the bucket UUIDs for both rest testers - activeBucketUUID, err := activeRT.GetDatabase().Bucket.UUID() - require.NoError(t, err) - passiveBucketUUID, err := passiveRT.GetDatabase().Bucket.UUID() - require.NoError(t, err) + activeBucketUUID := activeRT.GetDatabase().EncodedBucketUUID + passiveBucketUUID := passiveRT.GetDatabase().EncodedBucketUUID const rep = "replication" @@ -8336,11 +8334,10 @@ func TestReplicatorUpdateHLVOnPut(t *testing.T) { syncData, err := activeRT.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") assert.NoError(t, err) - uintCAS := base.HexCasToUint64(syncData.Cas) assert.Equal(t, activeBucketUUID, syncData.HLV.SourceID) - assert.Equal(t, uintCAS, syncData.HLV.Version) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.Version) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) // create the replication to push the doc to the passive node and wait for the doc to be replicated activeRT.CreateReplication(rep, remoteURL, db.ActiveReplicatorTypePush, nil, false, db.ConflictResolverDefault) @@ -8351,9 +8348,8 @@ func TestReplicatorUpdateHLVOnPut(t *testing.T) { // assert on the HLV update on the passive node syncData, err = passiveRT.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") assert.NoError(t, err) - uintCAS = base.HexCasToUint64(syncData.Cas) assert.Equal(t, passiveBucketUUID, syncData.HLV.SourceID) - assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) - assert.Equal(t, uintCAS, syncData.HLV.Version) + assert.Equal(t, syncData.Cas, syncData.HLV.CurrentVersionCAS) + assert.Equal(t, syncData.Cas, syncData.HLV.Version) }