Skip to content

Commit

Permalink
CBG-3503 Update HLV on import (#6572)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcfraser authored and bbrks committed Dec 11, 2024
1 parent 2da3125 commit ef8cc13
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 84 deletions.
26 changes: 22 additions & 4 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,19 +855,37 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU
case ExistingVersion:
// preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space
case Import:
// work to be done to decide if the VV needs updating here, pending CBG-3503
if d.HLV.CurrentVersionCAS == d.Cas {
// if cvCAS = document CAS, the HLV has already been updated for this mutation by another HLV-aware peer.
// Set ImportCAS to the previous document CAS, but don't otherwise modify HLV
d.HLV.ImportCAS = d.Cas
} else {
// Otherwise this is an SDK mutation made by the local cluster that should be added to HLV.
newVVEntry := SourceAndVersion{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.Version = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
}
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = d.Cas
}

case NewVersion, ExistingVersionWithUpdateToHLV:
// add a new entry to the version vector
newVVEntry := CurrentVersionVector{}
newVVEntry := SourceAndVersion{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.VersionCAS = hlvExpandMacroCASValue
newVVEntry.Version = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
}
// update the cvCAS on the SGWrite event too
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space
}
return d, nil
}
Expand Down Expand Up @@ -2206,7 +2224,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
Attachments: doc.Attachments,
Expiry: doc.Expiry,
Deleted: doc.History[newRevID].Deleted,
CV: &CurrentVersionVector{VersionCAS: doc.HLV.Version, SourceID: doc.HLV.SourceID},
CV: &SourceAndVersion{Version: doc.HLV.Version, SourceID: doc.HLV.SourceID},
}

if createNewRevIDSkipped {
Expand Down
4 changes: 2 additions & 2 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,14 +1212,14 @@ func computeMetadataOnlyUpdate(currentCas uint64, currentMou *MetadataOnlyUpdate
}

// HasCurrentVersion compares the specified CV with the fetched document's CV and returns an error on mismatch between the two
func (d *Document) HasCurrentVersion(cv CurrentVersionVector) error {
func (d *Document) HasCurrentVersion(cv SourceAndVersion) error {
if d.HLV == nil {
return base.RedactErrorf("no HLV present in fetched doc %s", base.UD(d.ID))
}

// fetch the current version for the loaded doc and compare against the CV specified in the IDandCV key
fetchedDocSource, fetchedDocVersion := d.HLV.GetCurrentVersion()
if fetchedDocSource != cv.SourceID || fetchedDocVersion != cv.VersionCAS {
if fetchedDocSource != cv.SourceID || fetchedDocVersion != cv.Version {
return base.RedactErrorf("mismatch between specified current version and fetched document current version for doc %s", base.UD(d.ID))
}
return nil
Expand Down
41 changes: 29 additions & 12 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,31 @@ const hlvExpandMacroCASValue = math.MaxUint64

// HybridLogicalVector is the in-memory representation of a document's HLV. All CAS and version values are
// held as uint64 here; PersistedHybridLogicalVector holds the string form that is marshalled to JSON.
type HybridLogicalVector struct {
CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS at the time of replication
ImportCAS uint64 // set when an import modifies the document CAS but preserves the HLV (import of a version replicated by XDCR)
SourceID string // source bucket uuid that the current version originated from
Version uint64 // cas of the current version on the version vector
MergeVersions map[string]uint64 // map of merge versions, keyed for fast lookup
PreviousVersions map[string]uint64 // map of previous versions, keyed for fast lookup
}

// CurrentVersionVector is a structure used to add a new sourceID:CAS entry to an HLV
type CurrentVersionVector struct {
VersionCAS uint64
SourceID string
// SourceAndVersion pairs a source ID with a version. It is the value passed to
// HybridLogicalVector.AddVersion to add a new entry to an HLV.
type SourceAndVersion struct {
SourceID string // source the version originated from
Version uint64 // version value for that source
}

// CreateVersion returns a SourceAndVersion whose SourceID is source and whose Version is version.
func CreateVersion(source string, version uint64) SourceAndVersion {
	var sv SourceAndVersion
	sv.SourceID = source
	sv.Version = version
	return sv
}

// PersistedHybridLogicalVector is the JSON-serialisable form of a HybridLogicalVector. CAS and version values
// are stored as strings rather than uint64; convertHLVToPersistedFormat fills cvCas, importCAS and vrs from the
// output of base.Uint64CASToLittleEndianHex.
type PersistedHybridLogicalVector struct {
CurrentVersionCAS string `json:"cvCas,omitempty"`
SourceID string `json:"src,omitempty"`
Version string `json:"vrs,omitempty"`
ImportCAS string `json:"importCAS,omitempty"`
SourceID string `json:"src"`
Version string `json:"vrs"`
MergeVersions map[string]string `json:"mv,omitempty"`
PreviousVersions map[string]string `json:"pv,omitempty"`
}
Expand Down Expand Up @@ -66,27 +75,27 @@ func (hlv *HybridLogicalVector) IsInConflict(otherVector HybridLogicalVector) bo

// AddVersion makes newVersion the current version of the in-memory HLV.
// It returns an error, without modifying the HLV, when newVersion's version is lower than the current hlv.Version.
// Otherwise one of three things happens:
//   - the HLV has no current source yet: newVersion becomes the current source/version pair;
//   - newVersion has the same source as the current version: only hlv.Version is updated;
//   - newVersion has a different source: the current source/version pair is recorded in PreviousVersions
//     (allocating the map if needed) and newVersion then becomes the current pair.
func (hlv *HybridLogicalVector) AddVersion(newVersion CurrentVersionVector) error {
if newVersion.VersionCAS < hlv.Version {
return fmt.Errorf("attempting to add new version vector entry with a CAS that is less than the current version CAS value. Current cas: %d new cas %d", hlv.Version, newVersion.VersionCAS)
func (hlv *HybridLogicalVector) AddVersion(newVersion SourceAndVersion) error {
if newVersion.Version < hlv.Version {
return fmt.Errorf("attempting to add new version vector entry with a CAS that is less than the current version CAS value. Current cas: %d new cas %d", hlv.Version, newVersion.Version)
}
// an empty SourceID means this is the first source/version pair added to the HLV
if hlv.SourceID == "" {
hlv.Version = newVersion.VersionCAS
hlv.Version = newVersion.Version
hlv.SourceID = newVersion.SourceID
return nil
}
// if the new entry has the same source, simply update the version
if newVersion.SourceID == hlv.SourceID {
hlv.Version = newVersion.VersionCAS
hlv.Version = newVersion.Version
return nil
}
// otherwise this is a new version from a different sourceID: move the current sourceID to previous versions,
// then update the current version
if hlv.PreviousVersions == nil {
hlv.PreviousVersions = make(map[string]uint64)
}
hlv.PreviousVersions[hlv.SourceID] = hlv.Version
hlv.Version = newVersion.VersionCAS
hlv.Version = newVersion.Version
hlv.SourceID = newVersion.SourceID
return nil
}
Expand Down Expand Up @@ -204,10 +213,14 @@ func (hlv *HybridLogicalVector) UnmarshalJSON(inputjson []byte) error {
func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridLogicalVector, error) {
persistedHLV := PersistedHybridLogicalVector{}
var cvCasByteArray []byte
var importCASBytes []byte
var vrsCasByteArray []byte
if hlv.CurrentVersionCAS != 0 {
cvCasByteArray = base.Uint64CASToLittleEndianHex(hlv.CurrentVersionCAS)
}
if hlv.ImportCAS != 0 {
importCASBytes = base.Uint64CASToLittleEndianHex(hlv.ImportCAS)
}
if hlv.Version != 0 {
vrsCasByteArray = base.Uint64CASToLittleEndianHex(hlv.Version)
}
Expand All @@ -222,6 +235,7 @@ func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridL
}

persistedHLV.CurrentVersionCAS = string(cvCasByteArray)
persistedHLV.ImportCAS = string(importCASBytes)
persistedHLV.SourceID = hlv.SourceID
persistedHLV.Version = string(vrsCasByteArray)
persistedHLV.PreviousVersions = pvPersistedFormat
Expand All @@ -231,6 +245,9 @@ func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridL

func (hlv *HybridLogicalVector) convertPersistedHLVToInMemoryHLV(persistedJSON PersistedHybridLogicalVector) {
hlv.CurrentVersionCAS = base.HexCasToUint64(persistedJSON.CurrentVersionCAS)
if persistedJSON.ImportCAS != "" {
hlv.ImportCAS = base.HexCasToUint64(persistedJSON.ImportCAS)
}
hlv.SourceID = persistedJSON.SourceID
// convert the hex cas to uint64 cas
hlv.Version = base.HexCasToUint64(persistedJSON.Version)
Expand Down
105 changes: 99 additions & 6 deletions db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
package db

import (
"context"
"reflect"
"strconv"
"strings"
"testing"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -34,14 +37,14 @@ func TestInternalHLVFunctions(t *testing.T) {
const newSource = "s_testsource"

// create a new version vector entry that will cause AddVersion to return an error
badNewVector := CurrentVersionVector{
VersionCAS: 123345,
SourceID: currSourceId,
badNewVector := SourceAndVersion{
Version: 123345,
SourceID: currSourceId,
}
// create a new version vector entry that should be added to HLV successfully
newVersionVector := CurrentVersionVector{
VersionCAS: newCAS,
SourceID: currSourceId,
newVersionVector := SourceAndVersion{
Version: newCAS,
SourceID: currSourceId,
}

// Get current version vector, sourceID and CAS pair
Expand Down Expand Up @@ -229,3 +232,93 @@ func TestHybridLogicalVectorPersistence(t *testing.T) {
assert.Equal(t, inMemoryHLV.PreviousVersions, hlvFromPersistance.PreviousVersions)
assert.Equal(t, inMemoryHLV.MergeVersions, hlvFromPersistance.MergeVersions)
}

// TestHLVImport exercises import of server-side mutations and checks the resulting HLV in two cases:
// a write with no HLV (a plain SDK write), and a write made by an HLV-aware peer whose HLV is already
// up to date for the mutation.
func TestHLVImport(t *testing.T) {
	base.SetUpTestLogging(t, base.LevelInfo, base.KeyMigrate, base.KeyImport)

	db, ctx := setupTestDB(t)
	defer db.Close(ctx)

	collection := GetSingleDatabaseCollectionWithUser(t, db)
	bucketSource := collection.dbCtx.BucketUUID

	// Case 1: standard import of an SDK write. The import is expected to add a new current version for the
	// local source and record the pre-import CAS as importCAS.
	sdkKey := "standardImport_" + t.Name()
	sdkBody := []byte(`{"prop":"value"}`)
	sdkCas, err := collection.dataStore.WriteCas(sdkKey, 0, 0, 0, sdkBody, sgbucket.Raw)
	require.NoError(t, err, "write error")
	_, err = collection.ImportDocRaw(ctx, sdkKey, sdkBody, nil, nil, false, sdkCas, nil, ImportFromFeed)
	require.NoError(t, err, "import error")

	sdkDoc, _, err := collection.GetDocWithXattr(ctx, sdkKey, DocUnmarshalAll)
	require.NoError(t, err)
	sdkHLV := sdkDoc.HLV
	require.Equal(t, sdkCas, sdkHLV.ImportCAS)
	require.Equal(t, sdkDoc.Cas, sdkHLV.CurrentVersionCAS)
	require.Equal(t, sdkDoc.Cas, sdkHLV.Version)
	require.Equal(t, bucketSource, sdkHLV.SourceID)

	// Case 2: import of a write by an HLV-aware peer (HLV is already updated, sync metadata is not).
	peerSource := "otherSource"
	agent := NewHLVAgent(t, collection.dataStore, peerSource, "_sync")
	peerKey := "existingHLV_" + t.Name()
	_ = agent.insertWithHLV(ctx, peerKey)

	var (
		peerBody  []byte
		peerXattr []byte
	)
	peerCas, err := collection.dataStore.GetWithXattr(ctx, peerKey, "_sync", "", &peerBody, &peerXattr, nil)
	require.NoError(t, err)

	_, err = collection.ImportDocRaw(ctx, peerKey, peerBody, peerXattr, nil, false, peerCas, nil, ImportFromFeed)
	require.NoError(t, err, "import error")

	peerDoc, _, err := collection.GetDocWithXattr(ctx, peerKey, DocUnmarshalAll)
	require.NoError(t, err)
	peerHLV := peerDoc.HLV
	// The HLV's current version and cvCAS must be untouched by the import, and must equal importCAS.
	require.Equal(t, peerCas, peerHLV.ImportCAS)
	require.Equal(t, peerCas, peerHLV.CurrentVersionCAS)
	require.Equal(t, peerCas, peerHLV.Version)
	require.Equal(t, peerSource, peerHLV.SourceID)
}

// HLVAgent performs HLV updates directly (not via SG) for simulating/testing interaction with non-SG HLV agents
type HLVAgent struct {
t *testing.T // test handle used for require assertions inside the agent's methods
datastore base.DataStore // datastore the agent writes documents to
source string // All writes by the HLVAgent are done as this source
xattrName string // xattr name to store the HLV
}

// defaultHelperBody is the document body written by HLVAgent.insertWithHLV.
var defaultHelperBody = map[string]interface{}{"version": 1}

// NewHLVAgent builds an HLVAgent that writes to datastore as the given source, storing the HLV in the
// xattr named xattrName. Assertions made by the agent are reported against t.
func NewHLVAgent(t *testing.T, datastore base.DataStore, source string, xattrName string) *HLVAgent {
	agent := new(HLVAgent)
	agent.t = t
	agent.datastore = datastore
	agent.source = source // every write performed by this agent uses this source
	agent.xattrName = xattrName
	return agent
}

// insertWithHLV writes a new document under key whose xattr already carries a populated HLV, matching a
// write made by a different HLV-aware peer. The HLV's current version (for the agent's source) and its cvCAS
// are both set to hlvExpandMacroCASValue, and the write is issued with the HLV's macro expansions.
// It returns the CAS reported by the write; any failure fails the test via require.
func (h *HLVAgent) insertWithHLV(ctx context.Context, key string) (casOut uint64) {
	peerHLV := new(HybridLogicalVector)
	addErr := peerHLV.AddVersion(CreateVersion(h.source, hlvExpandMacroCASValue))
	require.NoError(h.t, addErr)
	peerHLV.CurrentVersionCAS = hlvExpandMacroCASValue

	xattrBytes, marshalErr := base.JSONMarshal(&SyncData{HLV: peerHLV})
	require.NoError(h.t, marshalErr)

	// Macro expansions are computed only after cvCAS has been set on the HLV.
	opts := &sgbucket.MutateInOptions{}
	opts.MacroExpansion = peerHLV.computeMacroExpansions()

	casOut, writeErr := h.datastore.WriteCasWithXattr(ctx, key, h.xattrName, 0, 0, defaultHelperBody, xattrBytes, opts)
	require.NoError(h.t, writeErr)
	return casOut
}
4 changes: 2 additions & 2 deletions db/revision_cache_bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (rc *BypassRevisionCache) GetWithRev(ctx context.Context, docID, revID stri
}

// GetWithCV fetches the Current Version for the given docID and CV immediately from the bucket.
func (rc *BypassRevisionCache) GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, collectionID uint32, includeDelta bool) (docRev DocumentRevision, err error) {
func (rc *BypassRevisionCache) GetWithCV(ctx context.Context, docID string, cv *SourceAndVersion, collectionID uint32, includeDelta bool) (docRev DocumentRevision, err error) {

docRev = DocumentRevision{
CV: cv,
Expand Down Expand Up @@ -113,7 +113,7 @@ func (rc *BypassRevisionCache) RemoveWithRev(docID, revID string, collectionID u
// no-op
}

// RemoveWithCV intentionally does nothing for the bypass revision cache; docID, cv and collectionID are ignored.
// NOTE(review): presumably this is because the bypass cache holds no entries to remove — confirm.
func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *CurrentVersionVector, collectionID uint32) {
func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *SourceAndVersion, collectionID uint32) {
// no-op
}

Expand Down
Loading

0 comments on commit ef8cc13

Please sign in to comment.