Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3503 Update HLV on import #6572

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,19 +879,37 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU
case ExistingVersion:
// preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space
case Import:
// work to be done to decide if the VV needs updating here, pending CBG-3503
if d.HLV.CurrentVersionCAS == d.Cas {
// if cvCAS = document CAS, the HLV has already been updated for this mutation by another HLV-aware peer.
// Set ImportCAS to the previous document CAS, but don't otherwise modify HLV
d.HLV.ImportCAS = d.Cas
} else {
// Otherwise this is an SDK mutation made by the local cluster that should be added to HLV.
newVVEntry := SourceAndVersion{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.Version = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
}
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = d.Cas
}

case NewVersion, ExistingVersionWithUpdateToHLV:
// add a new entry to the version vector
newVVEntry := CurrentVersionVector{}
newVVEntry := SourceAndVersion{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.VersionCAS = hlvExpandMacroCASValue
newVVEntry.Version = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
}
// update the cvCAS on the SGWrite event too
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space
}
return d, nil
}
Expand Down Expand Up @@ -2062,7 +2080,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
Expiry: doc.Expiry,
Deleted: doc.History[newRevID].Deleted,
_shallowCopyBody: storedDoc.Body(ctx),
CV: &CurrentVersionVector{VersionCAS: doc.HLV.Version, SourceID: doc.HLV.SourceID},
CV: &SourceAndVersion{Version: doc.HLV.Version, SourceID: doc.HLV.SourceID},
}

if createNewRevIDSkipped {
Expand Down
4 changes: 2 additions & 2 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,14 +1239,14 @@ func (doc *Document) MarshalWithXattr() (data []byte, xdata []byte, err error) {
}

// HasCurrentVersion Compares the specified CV with the fetched documents CV, returns error on mismatch between the two
func (d *Document) HasCurrentVersion(cv CurrentVersionVector) error {
func (d *Document) HasCurrentVersion(cv SourceAndVersion) error {
if d.HLV == nil {
return base.RedactErrorf("no HLV present in fetched doc %s", base.UD(d.ID))
}

// fetch the current version for the loaded doc and compare against the CV specified in the IDandCV key
fetchedDocSource, fetchedDocVersion := d.HLV.GetCurrentVersion()
if fetchedDocSource != cv.SourceID || fetchedDocVersion != cv.VersionCAS {
if fetchedDocSource != cv.SourceID || fetchedDocVersion != cv.Version {
return base.RedactErrorf("mismatch between specified current version and fetched document current version for doc %s", base.UD(d.ID))
}
return nil
Expand Down
41 changes: 29 additions & 12 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,31 @@ const hlvExpandMacroCASValue = math.MaxUint64

type HybridLogicalVector struct {
CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS at the time of replication
ImportCAS uint64 // Set when an import modifies the document CAS but preserves the HLV (import of a version replicated by XDCR)
SourceID string // source bucket uuid of where this entry originated from
Version uint64 // current cas of the current version on the version vector
MergeVersions map[string]uint64 // map of merge versions for fast efficient lookup
PreviousVersions map[string]uint64 // map of previous versions for fast efficient lookup
}

// CurrentVersionVector is a structure used to add a new sourceID:CAS entry to a HLV
type CurrentVersionVector struct {
VersionCAS uint64
SourceID string
// SourceAndVersion is a structure used to add a new entry to a HLV
type SourceAndVersion struct {
SourceID string
Version uint64
}

func CreateVersion(source string, version uint64) SourceAndVersion {
return SourceAndVersion{
SourceID: source,
Version: version,
}
}

type PersistedHybridLogicalVector struct {
CurrentVersionCAS string `json:"cvCas,omitempty"`
SourceID string `json:"src,omitempty"`
Version string `json:"vrs,omitempty"`
ImportCAS string `json:"importCAS,omitempty"`
SourceID string `json:"src"`
Version string `json:"vrs"`
MergeVersions map[string]string `json:"mv,omitempty"`
PreviousVersions map[string]string `json:"pv,omitempty"`
}
Expand Down Expand Up @@ -66,27 +75,27 @@ func (hlv *HybridLogicalVector) IsInConflict(otherVector HybridLogicalVector) bo

// AddVersion adds a version vector to the in memory representation of a HLV and moves current version vector to
// previous versions on the HLV if needed
func (hlv *HybridLogicalVector) AddVersion(newVersion CurrentVersionVector) error {
if newVersion.VersionCAS < hlv.Version {
return fmt.Errorf("attempting to add new verison vector entry with a CAS that is less than the current version CAS value. Current cas: %d new cas %d", hlv.Version, newVersion.VersionCAS)
func (hlv *HybridLogicalVector) AddVersion(newVersion SourceAndVersion) error {
if newVersion.Version < hlv.Version {
return fmt.Errorf("attempting to add new verison vector entry with a CAS that is less than the current version CAS value. Current cas: %d new cas %d", hlv.Version, newVersion.Version)
}
// check if this is the first time we're adding a source - version pair
if hlv.SourceID == "" {
hlv.Version = newVersion.VersionCAS
hlv.Version = newVersion.Version
hlv.SourceID = newVersion.SourceID
return nil
}
// if new entry has the same source we simple just update the version
if newVersion.SourceID == hlv.SourceID {
hlv.Version = newVersion.VersionCAS
hlv.Version = newVersion.Version
return nil
}
// if we get here this is a new version from a different sourceID thus need to move current sourceID to previous versions and update current version
if hlv.PreviousVersions == nil {
hlv.PreviousVersions = make(map[string]uint64)
}
hlv.PreviousVersions[hlv.SourceID] = hlv.Version
hlv.Version = newVersion.VersionCAS
hlv.Version = newVersion.Version
hlv.SourceID = newVersion.SourceID
return nil
}
Expand Down Expand Up @@ -204,10 +213,14 @@ func (hlv *HybridLogicalVector) UnmarshalJSON(inputjson []byte) error {
func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridLogicalVector, error) {
persistedHLV := PersistedHybridLogicalVector{}
var cvCasByteArray []byte
var importCASBytes []byte
var vrsCasByteArray []byte
if hlv.CurrentVersionCAS != 0 {
cvCasByteArray = base.Uint64CASToLittleEndianHex(hlv.CurrentVersionCAS)
}
if hlv.ImportCAS != 0 {
importCASBytes = base.Uint64CASToLittleEndianHex(hlv.ImportCAS)
}
if hlv.Version != 0 {
vrsCasByteArray = base.Uint64CASToLittleEndianHex(hlv.Version)
}
Expand All @@ -222,6 +235,7 @@ func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridL
}

persistedHLV.CurrentVersionCAS = string(cvCasByteArray)
persistedHLV.ImportCAS = string(importCASBytes)
persistedHLV.SourceID = hlv.SourceID
persistedHLV.Version = string(vrsCasByteArray)
persistedHLV.PreviousVersions = pvPersistedFormat
Expand All @@ -231,6 +245,9 @@ func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridL

func (hlv *HybridLogicalVector) convertPersistedHLVToInMemoryHLV(persistedJSON PersistedHybridLogicalVector) {
hlv.CurrentVersionCAS = base.HexCasToUint64(persistedJSON.CurrentVersionCAS)
if persistedJSON.ImportCAS != "" {
hlv.ImportCAS = base.HexCasToUint64(persistedJSON.ImportCAS)
}
hlv.SourceID = persistedJSON.SourceID
// convert the hex cas to uint64 cas
hlv.Version = base.HexCasToUint64(persistedJSON.Version)
Expand Down
105 changes: 99 additions & 6 deletions db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
package db

import (
"context"
"reflect"
"strconv"
"strings"
"testing"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -34,14 +37,14 @@ func TestInternalHLVFunctions(t *testing.T) {
const newSource = "s_testsource"

// create a new version vector entry that will error method AddVersion
badNewVector := CurrentVersionVector{
VersionCAS: 123345,
SourceID: currSourceId,
badNewVector := SourceAndVersion{
Version: 123345,
SourceID: currSourceId,
}
// create a new version vector entry that should be added to HLV successfully
newVersionVector := CurrentVersionVector{
VersionCAS: newCAS,
SourceID: currSourceId,
newVersionVector := SourceAndVersion{
Version: newCAS,
SourceID: currSourceId,
}

// Get current version vector, sourceID and CAS pair
Expand Down Expand Up @@ -229,3 +232,93 @@ func TestHybridLogicalVectorPersistence(t *testing.T) {
assert.Equal(t, inMemoryHLV.PreviousVersions, hlvFromPersistance.PreviousVersions)
assert.Equal(t, inMemoryHLV.MergeVersions, hlvFromPersistance.MergeVersions)
}

// Tests import of server-side mutations made by HLV-aware and non-HLV-aware peers
func TestHLVImport(t *testing.T) {

base.SetUpTestLogging(t, base.LevelInfo, base.KeyMigrate, base.KeyImport)

db, ctx := setupTestDB(t)
defer db.Close(ctx)

collection := GetSingleDatabaseCollectionWithUser(t, db)
localSource := collection.dbCtx.BucketUUID

// 1. Test standard import of an SDK write
standardImportKey := "standardImport_" + t.Name()
standardImportBody := []byte(`{"prop":"value"}`)
cas, err := collection.dataStore.WriteCas(standardImportKey, 0, 0, 0, standardImportBody, sgbucket.Raw)
require.NoError(t, err, "write error")
_, err = collection.ImportDocRaw(ctx, standardImportKey, standardImportBody, nil, nil, false, cas, nil, ImportFromFeed)
require.NoError(t, err, "import error")

importedDoc, _, err := collection.GetDocWithXattr(ctx, standardImportKey, DocUnmarshalAll)
require.NoError(t, err)
importedHLV := importedDoc.HLV
require.Equal(t, cas, importedHLV.ImportCAS)
require.Equal(t, importedDoc.Cas, importedHLV.CurrentVersionCAS)
require.Equal(t, importedDoc.Cas, importedHLV.Version)
require.Equal(t, localSource, importedHLV.SourceID)

// 2. Test import of write by HLV-aware peer (HLV is already updated, sync metadata is not).
otherSource := "otherSource"
hlvHelper := NewHLVAgent(t, collection.dataStore, otherSource, "_sync")
existingHLVKey := "existingHLV_" + t.Name()
_ = hlvHelper.insertWithHLV(ctx, existingHLVKey)

var existingBody, existingXattr []byte
cas, err = collection.dataStore.GetWithXattr(ctx, existingHLVKey, "_sync", "", &existingBody, &existingXattr, nil)
require.NoError(t, err)

_, err = collection.ImportDocRaw(ctx, existingHLVKey, existingBody, existingXattr, nil, false, cas, nil, ImportFromFeed)
require.NoError(t, err, "import error")

importedDoc, _, err = collection.GetDocWithXattr(ctx, existingHLVKey, DocUnmarshalAll)
require.NoError(t, err)
importedHLV = importedDoc.HLV
// cas in the HLV's current version and cvCAS should not have changed, and should match importCAS
require.Equal(t, cas, importedHLV.ImportCAS)
require.Equal(t, cas, importedHLV.CurrentVersionCAS)
require.Equal(t, cas, importedHLV.Version)
require.Equal(t, otherSource, importedHLV.SourceID)
}

// HLVAgent performs HLV updates directly (not via SG) for simulating/testing interaction with non-SG HLV agents
type HLVAgent struct {
t *testing.T
datastore base.DataStore
source string // All writes by the HLVHelper are done as this source
xattrName string // xattr name to store the HLV
}

var defaultHelperBody = map[string]interface{}{"version": 1}

func NewHLVAgent(t *testing.T, datastore base.DataStore, source string, xattrName string) *HLVAgent {
return &HLVAgent{
t: t,
datastore: datastore,
source: source, // all writes by the HLVHelper are done as this source
xattrName: xattrName,
}
}

// insertWithHLV inserts a new document into the bucket with a populated HLV (matching a write from
// a different HLV-aware peer)
func (h *HLVAgent) insertWithHLV(ctx context.Context, key string) (casOut uint64) {
hlv := &HybridLogicalVector{}
err := hlv.AddVersion(CreateVersion(h.source, hlvExpandMacroCASValue))
require.NoError(h.t, err)
hlv.CurrentVersionCAS = hlvExpandMacroCASValue

syncData := &SyncData{HLV: hlv}
syncDataBytes, err := base.JSONMarshal(syncData)
require.NoError(h.t, err)

mutateInOpts := &sgbucket.MutateInOptions{
MacroExpansion: hlv.computeMacroExpansions(),
}

cas, err := h.datastore.WriteCasWithXattr(ctx, key, h.xattrName, 0, 0, defaultHelperBody, syncDataBytes, mutateInOpts)
require.NoError(h.t, err)
return cas
}
4 changes: 2 additions & 2 deletions db/revision_cache_bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (rc *BypassRevisionCache) GetWithRev(ctx context.Context, docID, revID stri
}

// GetWithCV fetches the Current Version for the given docID and CV immediately from the bucket.
func (rc *BypassRevisionCache) GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (docRev DocumentRevision, err error) {
func (rc *BypassRevisionCache) GetWithCV(ctx context.Context, docID string, cv *SourceAndVersion, includeBody, includeDelta bool) (docRev DocumentRevision, err error) {

unmarshalLevel := DocUnmarshalSync
if includeBody {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (rc *BypassRevisionCache) RemoveWithRev(docID, revID string) {
// nop
}

func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *CurrentVersionVector) {
func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *SourceAndVersion) {
// nop
}

Expand Down
20 changes: 10 additions & 10 deletions db/revision_cache_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type RevisionCache interface {
// GetWithCV returns the given revision by CV, and stores if not already cached.
// When includeBody=true, the returned DocumentRevision will include a mutable shallow copy of the marshaled body.
// When includeDelta=true, the returned DocumentRevision will include delta - requires additional locking during retrieval.
GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (DocumentRevision, error)
GetWithCV(ctx context.Context, docID string, cv *SourceAndVersion, includeBody, includeDelta bool) (DocumentRevision, error)

// GetActive returns the current revision for the given doc ID, and stores if not already cached.
// When includeBody=true, the returned DocumentRevision will include a mutable shallow copy of the marshaled body.
Expand All @@ -55,7 +55,7 @@ type RevisionCache interface {
RemoveWithRev(docID, revID string)

// RemoveWithCV evicts a revision from the cache using its current version.
RemoveWithCV(docID string, cv *CurrentVersionVector)
RemoveWithCV(docID string, cv *SourceAndVersion)

// UpdateDelta stores the given toDelta value in the given rev if cached
UpdateDelta(ctx context.Context, docID, revID string, toDelta RevisionDelta)
Expand Down Expand Up @@ -128,7 +128,7 @@ type DocumentRevision struct {
Delta *RevisionDelta
Deleted bool
Removed bool // True if the revision is a removal.
CV *CurrentVersionVector
CV *SourceAndVersion

_shallowCopyBody Body // an unmarshalled body that can produce shallow copies
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func newRevCacheDelta(deltaBytes []byte, fromRevID string, toRevision DocumentRe

// This is the RevisionCacheLoaderFunc callback for the context's RevisionCache.
// Its job is to load a revision from the bucket when there's a cache miss.
func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore, id IDAndRev, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *CurrentVersionVector, err error) {
func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore, id IDAndRev, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *SourceAndVersion, err error) {
var doc *Document
unmarshalLevel := DocUnmarshalSync
if unmarshalBody {
Expand All @@ -278,9 +278,9 @@ func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore,
// revCacheLoaderForCv will load a document from the bucket using the CV, comapre the fetched doc and the CV specified in the function,
// and will still return revid for purpose of populating the Rev ID lookup map on the cache
func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingStore, id IDandCV, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
cv := CurrentVersionVector{
VersionCAS: id.Version,
SourceID: id.Source,
cv := SourceAndVersion{
Version: id.Version,
SourceID: id.Source,
}
var doc *Document
unmarshalLevel := DocUnmarshalSync
Expand All @@ -295,7 +295,7 @@ func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingS
}

// Common revCacheLoader functionality used either during a cache miss (from revCacheLoader), or directly when retrieving current rev from cache
func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, revid string) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *CurrentVersionVector, err error) {
func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, revid string) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *SourceAndVersion, err error) {
if bodyBytes, body, attachments, err = backingStore.getRevision(ctx, doc, revid); err != nil {
// If we can't find the revision (either as active or conflicted body from the document, or as old revision body backup), check whether
// the revision was a channel removal. If so, we want to store as removal in the revision cache
Expand All @@ -320,14 +320,14 @@ func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBa
history = encodeRevisions(ctx, doc.ID, validatedHistory)
channels = doc.History[revid].Channels
if doc.HLV != nil {
fetchedCV = &CurrentVersionVector{SourceID: doc.HLV.SourceID, VersionCAS: doc.HLV.Version}
fetchedCV = &SourceAndVersion{SourceID: doc.HLV.SourceID, Version: doc.HLV.Version}
}

return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, fetchedCV, err
}

// revCacheLoaderForDocumentCV used either during cache miss (from revCacheLoaderForCv), or used directly when getting current active CV from cache
func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, cv CurrentVersionVector) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, cv SourceAndVersion) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
if bodyBytes, body, attachments, err = backingStore.getCurrentVersion(ctx, doc); err != nil {
// we need implementation of IsChannelRemoval for CV here.
// pending CBG-3213 support of channel removal for CV
Expand Down
Loading