diff --git a/db/crud.go b/db/crud.go index fabe70e218..dc4430753b 100644 --- a/db/crud.go +++ b/db/crud.go @@ -876,19 +876,37 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU case ExistingVersion: // preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue + d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space case Import: - // work to be done to decide if the VV needs updating here, pending CBG-3503 + if d.HLV.CurrentVersionCAS == d.Cas { + // if cvCAS = document CAS, the HLV has already been updated for this mutation by another HLV-aware peer. + // Set ImportCAS to the previous document CAS, but don't otherwise modify HLV + d.HLV.ImportCAS = d.Cas + } else { + // Otherwise this is an SDK mutation made by the local cluster that should be added to HLV. + newVVEntry := SourceAndVersion{} + newVVEntry.SourceID = db.dbCtx.BucketUUID + newVVEntry.Version = hlvExpandMacroCASValue + err := d.SyncData.HLV.AddVersion(newVVEntry) + if err != nil { + return nil, err + } + d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue + d.HLV.ImportCAS = d.Cas + } + case NewVersion, ExistingVersionWithUpdateToHLV: // add a new entry to the version vector - newVVEntry := CurrentVersionVector{} + newVVEntry := SourceAndVersion{} newVVEntry.SourceID = db.dbCtx.BucketUUID - newVVEntry.VersionCAS = hlvExpandMacroCASValue + newVVEntry.Version = hlvExpandMacroCASValue err := d.SyncData.HLV.AddVersion(newVVEntry) if err != nil { return nil, err } // update the cvCAS on the SGWrite event too d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue + d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space } return d, nil } @@ -2059,7 +2077,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do Expiry: doc.Expiry, Deleted: doc.History[newRevID].Deleted, _shallowCopyBody: storedDoc.Body(ctx), - CV: &CurrentVersionVector{VersionCAS: doc.HLV.Version, SourceID: doc.HLV.SourceID}, + CV: &SourceAndVersion{Version: doc.HLV.Version, SourceID: doc.HLV.SourceID}, } if createNewRevIDSkipped { diff --git a/db/document.go b/db/document.go index 94fc62ad48..b00fcbb0b6 100644 --- a/db/document.go +++ b/db/document.go @@ -1239,14 +1239,14 @@ func (doc *Document) MarshalWithXattr() (data []byte, xdata []byte, err error) { } // HasCurrentVersion Compares the specified CV with the fetched documents CV, returns error on mismatch between the two -func (d *Document) HasCurrentVersion(cv CurrentVersionVector) error { +func (d *Document) HasCurrentVersion(cv SourceAndVersion) error { if d.HLV == nil { return base.RedactErrorf("no HLV present in fetched doc %s", base.UD(d.ID)) } // fetch the current version for the loaded doc and compare against the CV specified in the IDandCV key fetchedDocSource, fetchedDocVersion := d.HLV.GetCurrentVersion() - if fetchedDocSource != cv.SourceID || fetchedDocVersion != cv.VersionCAS { + if fetchedDocSource != cv.SourceID || fetchedDocVersion != cv.Version { return base.RedactErrorf("mismatch between specified current version and fetched document current version for doc %s", base.UD(d.ID)) } return nil diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 433e4bbd2c..39c168e87a 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -21,22 +21,31 @@ const hlvExpandMacroCASValue = math.MaxUint64 type HybridLogicalVector struct { CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS at the time of replication + ImportCAS uint64 // Set when an import modifies the document CAS but preserves the HLV (import of a version replicated by XDCR) SourceID string // source bucket uuid of where this entry originated from Version uint64 // current cas of the current version on the version vector MergeVersions map[string]uint64 // map of merge versions for fast efficient lookup PreviousVersions map[string]uint64 // map of previous versions for fast efficient lookup } -// CurrentVersionVector is a structure used to add a new sourceID:CAS entry to a HLV -type CurrentVersionVector struct { - VersionCAS uint64 - SourceID string +// SourceAndVersion is a structure used to add a new entry to a HLV +type SourceAndVersion struct { + SourceID string + Version uint64 +} + +func CreateVersion(source string, version uint64) SourceAndVersion { + return SourceAndVersion{ + SourceID: source, + Version: version, + } } type PersistedHybridLogicalVector struct { CurrentVersionCAS string `json:"cvCas,omitempty"` - SourceID string `json:"src,omitempty"` - Version string `json:"vrs,omitempty"` + ImportCAS string `json:"importCAS,omitempty"` + SourceID string `json:"src"` + Version string `json:"vrs"` MergeVersions map[string]string `json:"mv,omitempty"` PreviousVersions map[string]string `json:"pv,omitempty"` } @@ -66,19 +75,19 @@ func (hlv *HybridLogicalVector) IsInConflict(otherVector HybridLogicalVector) bo // AddVersion adds a version vector to the in memory representation of a HLV and moves current version vector to // previous versions on the HLV if needed -func (hlv *HybridLogicalVector) AddVersion(newVersion CurrentVersionVector) error { - if newVersion.VersionCAS < hlv.Version { - return fmt.Errorf("attempting to add new verison vector entry with a CAS that is less than the current version CAS value. Current cas: %d new cas %d", hlv.Version, newVersion.VersionCAS) +func (hlv *HybridLogicalVector) AddVersion(newVersion SourceAndVersion) error { + if newVersion.Version < hlv.Version { + return fmt.Errorf("attempting to add new verison vector entry with a CAS that is less than the current version CAS value. Current cas: %d new cas %d", hlv.Version, newVersion.Version) } // check if this is the first time we're adding a source - version pair if hlv.SourceID == "" { - hlv.Version = newVersion.VersionCAS + hlv.Version = newVersion.Version hlv.SourceID = newVersion.SourceID return nil } // if new entry has the same source we simple just update the version if newVersion.SourceID == hlv.SourceID { - hlv.Version = newVersion.VersionCAS + hlv.Version = newVersion.Version return nil } // if we get here this is a new version from a different sourceID thus need to move current sourceID to previous versions and update current version @@ -86,7 +95,7 @@ func (hlv *HybridLogicalVector) AddVersion(newVersion CurrentVersionVector) erro hlv.PreviousVersions = make(map[string]uint64) } hlv.PreviousVersions[hlv.SourceID] = hlv.Version - hlv.Version = newVersion.VersionCAS + hlv.Version = newVersion.Version hlv.SourceID = newVersion.SourceID return nil } @@ -204,10 +213,14 @@ func (hlv *HybridLogicalVector) UnmarshalJSON(inputjson []byte) error { func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridLogicalVector, error) { persistedHLV := PersistedHybridLogicalVector{} var cvCasByteArray []byte + var importCASBytes []byte var vrsCasByteArray []byte if hlv.CurrentVersionCAS != 0 { cvCasByteArray = base.Uint64CASToLittleEndianHex(hlv.CurrentVersionCAS) } + if hlv.ImportCAS != 0 { + importCASBytes = base.Uint64CASToLittleEndianHex(hlv.ImportCAS) + } if hlv.Version != 0 { vrsCasByteArray = base.Uint64CASToLittleEndianHex(hlv.Version) } @@ -222,6 +235,7 @@ func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridL } persistedHLV.CurrentVersionCAS = string(cvCasByteArray) + persistedHLV.ImportCAS = string(importCASBytes) persistedHLV.SourceID = hlv.SourceID persistedHLV.Version = string(vrsCasByteArray) persistedHLV.PreviousVersions = pvPersistedFormat @@ -231,6 +245,9 @@ func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridL func (hlv *HybridLogicalVector) convertPersistedHLVToInMemoryHLV(persistedJSON PersistedHybridLogicalVector) { hlv.CurrentVersionCAS = base.HexCasToUint64(persistedJSON.CurrentVersionCAS) + if persistedJSON.ImportCAS != "" { + hlv.ImportCAS = base.HexCasToUint64(persistedJSON.ImportCAS) + } hlv.SourceID = persistedJSON.SourceID // convert the hex cas to uint64 cas hlv.Version = base.HexCasToUint64(persistedJSON.Version) diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go index cc1fe07d8e..029baef512 100644 --- a/db/hybrid_logical_vector_test.go +++ b/db/hybrid_logical_vector_test.go @@ -9,11 +9,14 @@ package db import ( + "context" "reflect" "strconv" "strings" "testing" + sgbucket "github.com/couchbase/sg-bucket" + "github.com/couchbase/sync_gateway/base" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -34,14 +37,14 @@ func TestInternalHLVFunctions(t *testing.T) { const newSource = "s_testsource" // create a new version vector entry that will error method AddVersion - badNewVector := CurrentVersionVector{ - VersionCAS: 123345, - SourceID: currSourceId, + badNewVector := SourceAndVersion{ + Version: 123345, + SourceID: currSourceId, } // create a new version vector entry that should be added to HLV successfully - newVersionVector := CurrentVersionVector{ - VersionCAS: newCAS, - SourceID: currSourceId, + newVersionVector := SourceAndVersion{ + Version: newCAS, + SourceID: currSourceId, } // Get current version vector, sourceID and CAS pair @@ -229,3 +232,93 @@ func TestHybridLogicalVectorPersistence(t *testing.T) { assert.Equal(t, inMemoryHLV.PreviousVersions, hlvFromPersistance.PreviousVersions) assert.Equal(t, inMemoryHLV.MergeVersions, hlvFromPersistance.MergeVersions) } + +// Tests import of server-side mutations made by HLV-aware and non-HLV-aware peers +func TestHLVImport(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelInfo, base.KeyMigrate, base.KeyImport) + + db, ctx := setupTestDB(t) + defer db.Close(ctx) + + collection := GetSingleDatabaseCollectionWithUser(t, db) + localSource := collection.dbCtx.BucketUUID + + // 1. Test standard import of an SDK write + standardImportKey := "standardImport_" + t.Name() + standardImportBody := []byte(`{"prop":"value"}`) + cas, err := collection.dataStore.WriteCas(standardImportKey, 0, 0, 0, standardImportBody, sgbucket.Raw) + require.NoError(t, err, "write error") + _, err = collection.ImportDocRaw(ctx, standardImportKey, standardImportBody, nil, nil, false, cas, nil, ImportFromFeed) + require.NoError(t, err, "import error") + + importedDoc, _, err := collection.GetDocWithXattr(ctx, standardImportKey, DocUnmarshalAll) + require.NoError(t, err) + importedHLV := importedDoc.HLV + require.Equal(t, cas, importedHLV.ImportCAS) + require.Equal(t, importedDoc.Cas, importedHLV.CurrentVersionCAS) + require.Equal(t, importedDoc.Cas, importedHLV.Version) + require.Equal(t, localSource, importedHLV.SourceID) + + // 2. Test import of write by HLV-aware peer (HLV is already updated, sync metadata is not). + otherSource := "otherSource" + hlvHelper := NewHLVAgent(t, collection.dataStore, otherSource, "_sync") + existingHLVKey := "existingHLV_" + t.Name() + _ = hlvHelper.insertWithHLV(ctx, existingHLVKey) + + var existingBody, existingXattr []byte + cas, err = collection.dataStore.GetWithXattr(ctx, existingHLVKey, "_sync", "", &existingBody, &existingXattr, nil) + require.NoError(t, err) + + _, err = collection.ImportDocRaw(ctx, existingHLVKey, existingBody, existingXattr, nil, false, cas, nil, ImportFromFeed) + require.NoError(t, err, "import error") + + importedDoc, _, err = collection.GetDocWithXattr(ctx, existingHLVKey, DocUnmarshalAll) + require.NoError(t, err) + importedHLV = importedDoc.HLV + // cas in the HLV's current version and cvCAS should not have changed, and should match importCAS + require.Equal(t, cas, importedHLV.ImportCAS) + require.Equal(t, cas, importedHLV.CurrentVersionCAS) + require.Equal(t, cas, importedHLV.Version) + require.Equal(t, otherSource, importedHLV.SourceID) +} + +// HLVAgent performs HLV updates directly (not via SG) for simulating/testing interaction with non-SG HLV agents +type HLVAgent struct { + t *testing.T + datastore base.DataStore + source string // All writes by the HLVHelper are done as this source + xattrName string // xattr name to store the HLV +} + +var defaultHelperBody = map[string]interface{}{"version": 1} + +func NewHLVAgent(t *testing.T, datastore base.DataStore, source string, xattrName string) *HLVAgent { + return &HLVAgent{ + t: t, + datastore: datastore, + source: source, // all writes by the HLVHelper are done as this source + xattrName: xattrName, + } +} + +// insertWithHLV inserts a new document into the bucket with a populated HLV (matching a write from +// a different HLV-aware peer) +func (h *HLVAgent) insertWithHLV(ctx context.Context, key string) (casOut uint64) { + hlv := &HybridLogicalVector{} + err := hlv.AddVersion(CreateVersion(h.source, hlvExpandMacroCASValue)) + require.NoError(h.t, err) + hlv.CurrentVersionCAS = hlvExpandMacroCASValue + + syncData := &SyncData{HLV: hlv} + syncDataBytes, err := base.JSONMarshal(syncData) + require.NoError(h.t, err) + + mutateInOpts := &sgbucket.MutateInOptions{ + MacroExpansion: hlv.computeMacroExpansions(), + } + + cas, err := h.datastore.WriteCasWithXattr(ctx, key, h.xattrName, 0, 0, defaultHelperBody, syncDataBytes, mutateInOpts) + require.NoError(h.t, err) + return cas +} diff --git a/db/revision_cache_bypass.go b/db/revision_cache_bypass.go index 1b05788870..978df65a0f 100644 --- a/db/revision_cache_bypass.go +++ b/db/revision_cache_bypass.go @@ -56,7 +56,7 @@ func (rc *BypassRevisionCache) GetWithRev(ctx context.Context, docID, revID stri } // GetWithCV fetches the Current Version for the given docID and CV immediately from the bucket. -func (rc *BypassRevisionCache) GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (docRev DocumentRevision, err error) { +func (rc *BypassRevisionCache) GetWithCV(ctx context.Context, docID string, cv *SourceAndVersion, includeBody, includeDelta bool) (docRev DocumentRevision, err error) { unmarshalLevel := DocUnmarshalSync if includeBody { @@ -126,7 +126,7 @@ func (rc *BypassRevisionCache) RemoveWithRev(docID, revID string) { // nop } -func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *CurrentVersionVector) { +func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *SourceAndVersion) { // nop } diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index e50ba72f98..4d7ffd43dd 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -36,7 +36,7 @@ type RevisionCache interface { // GetWithCV returns the given revision by CV, and stores if not already cached. // When includeBody=true, the returned DocumentRevision will include a mutable shallow copy of the marshaled body. // When includeDelta=true, the returned DocumentRevision will include delta - requires additional locking during retrieval. - GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (DocumentRevision, error) + GetWithCV(ctx context.Context, docID string, cv *SourceAndVersion, includeBody, includeDelta bool) (DocumentRevision, error) // GetActive returns the current revision for the given doc ID, and stores if not already cached. // When includeBody=true, the returned DocumentRevision will include a mutable shallow copy of the marshaled body. @@ -55,7 +55,7 @@ type RevisionCache interface { RemoveWithRev(docID, revID string) // RemoveWithCV evicts a revision from the cache using its current version. - RemoveWithCV(docID string, cv *CurrentVersionVector) + RemoveWithCV(docID string, cv *SourceAndVersion) // UpdateDelta stores the given toDelta value in the given rev if cached UpdateDelta(ctx context.Context, docID, revID string, toDelta RevisionDelta) @@ -128,7 +128,7 @@ type DocumentRevision struct { Delta *RevisionDelta Deleted bool Removed bool // True if the revision is a removal. - CV *CurrentVersionVector + CV *SourceAndVersion _shallowCopyBody Body // an unmarshalled body that can produce shallow copies } @@ -262,7 +262,7 @@ func newRevCacheDelta(deltaBytes []byte, fromRevID string, toRevision DocumentRe // This is the RevisionCacheLoaderFunc callback for the context's RevisionCache. // Its job is to load a revision from the bucket when there's a cache miss. -func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore, id IDAndRev, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *CurrentVersionVector, err error) { +func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore, id IDAndRev, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *SourceAndVersion, err error) { var doc *Document unmarshalLevel := DocUnmarshalSync if unmarshalBody { @@ -278,9 +278,9 @@ func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore, // revCacheLoaderForCv will load a document from the bucket using the CV, comapre the fetched doc and the CV specified in the function, // and will still return revid for purpose of populating the Rev ID lookup map on the cache func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingStore, id IDandCV, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) { - cv := CurrentVersionVector{ - VersionCAS: id.Version, - SourceID: id.Source, + cv := SourceAndVersion{ + Version: id.Version, + SourceID: id.Source, } var doc *Document unmarshalLevel := DocUnmarshalSync @@ -295,7 +295,7 @@ func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingS } // Common revCacheLoader functionality used either during a cache miss (from revCacheLoader), or directly when retrieving current rev from cache -func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, revid string) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *CurrentVersionVector, err error) { +func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, revid string) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *SourceAndVersion, err error) { if bodyBytes, body, attachments, err = backingStore.getRevision(ctx, doc, revid); err != nil { // If we can't find the revision (either as active or conflicted body from the document, or as old revision body backup), check whether // the revision was a channel removal. If so, we want to store as removal in the revision cache @@ -320,14 +320,14 @@ func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBa history = encodeRevisions(ctx, doc.ID, validatedHistory) channels = doc.History[revid].Channels if doc.HLV != nil { - fetchedCV = &CurrentVersionVector{SourceID: doc.HLV.SourceID, VersionCAS: doc.HLV.Version} + fetchedCV = &SourceAndVersion{SourceID: doc.HLV.SourceID, Version: doc.HLV.Version} } return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, fetchedCV, err } // revCacheLoaderForDocumentCV used either during cache miss (from revCacheLoaderForCv), or used directly when getting current active CV from cache -func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, cv CurrentVersionVector) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) { +func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, cv SourceAndVersion) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) { if bodyBytes, body, attachments, err = backingStore.getCurrentVersion(ctx, doc); err != nil { // we need implementation of IsChannelRemoval for CV here. // pending CBG-3213 support of channel removal for CV diff --git a/db/revision_cache_lru.go b/db/revision_cache_lru.go index 32d78d7613..450cd4250a 100644 --- a/db/revision_cache_lru.go +++ b/db/revision_cache_lru.go @@ -49,7 +49,7 @@ func (sc *ShardedLRURevisionCache) GetWithRev(ctx context.Context, docID, revID return sc.getShard(docID).GetWithRev(ctx, docID, revID, includeBody, includeDelta) } -func (sc *ShardedLRURevisionCache) GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (docRev DocumentRevision, err error) { +func (sc *ShardedLRURevisionCache) GetWithCV(ctx context.Context, docID string, cv *SourceAndVersion, includeBody, includeDelta bool) (docRev DocumentRevision, err error) { return sc.getShard(docID).GetWithCV(ctx, docID, cv, includeBody, includeDelta) } @@ -77,7 +77,7 @@ func (sc *ShardedLRURevisionCache) RemoveWithRev(docID, revID string) { sc.getShard(docID).RemoveWithRev(docID, revID) } -func (sc *ShardedLRURevisionCache) RemoveWithCV(docID string, cv *CurrentVersionVector) { +func (sc *ShardedLRURevisionCache) RemoveWithCV(docID string, cv *SourceAndVersion) { sc.getShard(docID).RemoveWithCV(docID, cv) } @@ -103,7 +103,7 @@ type revCacheValue struct { delta *RevisionDelta body Body id string - cv CurrentVersionVector + cv SourceAndVersion revID string bodyBytes []byte lock sync.RWMutex @@ -133,7 +133,7 @@ func (rc *LRURevisionCache) GetWithRev(ctx context.Context, docID, revID string, return rc.getFromCacheByRev(ctx, docID, revID, true, includeBody, includeDelta) } -func (rc *LRURevisionCache) GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (DocumentRevision, error) { +func (rc *LRURevisionCache) GetWithCV(ctx context.Context, docID string, cv *SourceAndVersion, includeBody, includeDelta bool) (DocumentRevision, error) { return rc.getFromCacheByCV(ctx, docID, cv, true, includeBody, includeDelta) } @@ -175,7 +175,7 @@ func (rc *LRURevisionCache) getFromCacheByRev(ctx context.Context, docID, revID return docRev, err } -func (rc *LRURevisionCache) getFromCacheByCV(ctx context.Context, docID string, cv *CurrentVersionVector, loadCacheOnMiss bool, includeBody bool, includeDelta bool) (DocumentRevision, error) { +func (rc *LRURevisionCache) getFromCacheByCV(ctx context.Context, docID string, cv *SourceAndVersion, loadCacheOnMiss bool, includeBody bool, includeDelta bool) (DocumentRevision, error) { value := rc.getValueByCV(docID, cv, loadCacheOnMiss) if value == nil { return DocumentRevision{}, nil @@ -292,7 +292,7 @@ func (rc *LRURevisionCache) Put(ctx context.Context, docRev DocumentRevision) { func (rc *LRURevisionCache) Upsert(ctx context.Context, docRev DocumentRevision) { var value *revCacheValue // similar to PUT operation we should have the CV defined by this point (updateHLV is called before calling this) - key := IDandCV{DocID: docRev.DocID, Source: docRev.CV.SourceID, Version: docRev.CV.VersionCAS} + key := IDandCV{DocID: docRev.DocID, Source: docRev.CV.SourceID, Version: docRev.CV.Version} legacyKey := IDAndRev{DocID: docRev.DocID, RevID: docRev.RevID} rc.lock.Lock() @@ -340,12 +340,12 @@ func (rc *LRURevisionCache) getValue(docID, revID string, create bool) (value *r } // getValueByCV gets a value from rev cache by CV, if not found and create is true, will add the value to cache and both lookup maps -func (rc *LRURevisionCache) getValueByCV(docID string, cv *CurrentVersionVector, create bool) (value *revCacheValue) { +func (rc *LRURevisionCache) getValueByCV(docID string, cv *SourceAndVersion, create bool) (value *revCacheValue) { if docID == "" || cv == nil { return nil } - key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.VersionCAS} + key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.Version} rc.lock.Lock() if elem := rc.hlvCache[key]; elem != nil { rc.lruList.MoveToFront(elem) @@ -363,9 +363,9 @@ func (rc *LRURevisionCache) getValueByCV(docID string, cv *CurrentVersionVector, } // addToRevMapPostLoad will generate and entry in the Rev lookup map for a new document entering the cache -func (rc *LRURevisionCache) addToRevMapPostLoad(docID, revID string, cv *CurrentVersionVector) { +func (rc *LRURevisionCache) addToRevMapPostLoad(docID, revID string, cv *SourceAndVersion) { legacyKey := IDAndRev{DocID: docID, RevID: revID} - key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.VersionCAS} + key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.Version} rc.lock.Lock() defer rc.lock.Unlock() @@ -393,9 +393,9 @@ func (rc *LRURevisionCache) addToRevMapPostLoad(docID, revID string, cv *Current } // addToHLVMapPostLoad will generate and entry in the CV lookup map for a new document entering the cache -func (rc *LRURevisionCache) addToHLVMapPostLoad(docID, revID string, cv *CurrentVersionVector) { +func (rc *LRURevisionCache) addToHLVMapPostLoad(docID, revID string, cv *SourceAndVersion) { legacyKey := IDAndRev{DocID: docID, RevID: revID} - key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.VersionCAS} + key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.Version} rc.lock.Lock() defer rc.lock.Unlock() @@ -425,13 +425,13 @@ func (rc *LRURevisionCache) RemoveWithRev(docID, revID string) { } // RemoveWithCV removes a value from rev cache by CV reference if present -func (rc *LRURevisionCache) RemoveWithCV(docID string, cv *CurrentVersionVector) { +func (rc *LRURevisionCache) RemoveWithCV(docID string, cv *SourceAndVersion) { rc.removeFromCacheByCV(docID, cv) } // removeFromCacheByCV removes an entry from rev cache by CV -func (rc *LRURevisionCache) removeFromCacheByCV(docID string, cv *CurrentVersionVector) { - key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.VersionCAS} +func (rc *LRURevisionCache) removeFromCacheByCV(docID string, cv *SourceAndVersion) { + key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.Version} rc.lock.Lock() defer rc.lock.Unlock() element, ok := rc.hlvCache[key] @@ -458,7 +458,7 @@ func (rc *LRURevisionCache) removeFromCacheByRev(docID, revID string) { } // grab the cv key key from the value to enable us to remove the reference from the rev lookup map too elem := element.Value.(*revCacheValue) - hlvKey := IDandCV{DocID: docID, Source: elem.cv.SourceID, Version: elem.cv.VersionCAS} + hlvKey := IDandCV{DocID: docID, Source: elem.cv.SourceID, Version: elem.cv.Version} rc.lruList.Remove(element) delete(rc.cache, key) // remove from CV lookup map too @@ -475,7 +475,7 @@ func (rc *LRURevisionCache) removeValue(value *revCacheValue) { delete(rc.cache, revKey) } // need to also check hlv lookup cache map - hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.VersionCAS} + hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.Version} if element := rc.hlvCache[hlvKey]; element != nil && element.Value == value { rc.lruList.Remove(element) delete(rc.hlvCache, hlvKey) @@ -485,7 +485,7 @@ func (rc *LRURevisionCache) removeValue(value *revCacheValue) { func (rc *LRURevisionCache) purgeOldest_() { value := rc.lruList.Remove(rc.lruList.Back()).(*revCacheValue) revKey := IDAndRev{DocID: value.id, RevID: value.revID} - hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.VersionCAS} + hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.Version} delete(rc.cache, revKey) delete(rc.hlvCache, hlvKey) } @@ -499,7 +499,7 @@ func (value *revCacheValue) load(ctx context.Context, backingStore RevisionCache // to reduce locking when includeDelta=false var delta *RevisionDelta var docRevBody Body - var fetchedCV *CurrentVersionVector + var fetchedCV *SourceAndVersion var revid string // Attempt to read cached value. @@ -537,7 +537,7 @@ func (value *revCacheValue) load(ctx context.Context, backingStore RevisionCache } else { cacheHit = false if value.revID == "" { - hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.VersionCAS} + hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.Version} value.bodyBytes, value.body, value.history, value.channels, value.removed, value.attachments, value.deleted, value.expiry, revid, value.err = revCacheLoaderForCv(ctx, backingStore, hlvKey, includeBody) // based off the current value load we need to populate the revid key with what has been fetched from the bucket (for use of populating the opposite lookup map) value.revID = revid @@ -594,7 +594,7 @@ func (value *revCacheValue) asDocumentRevision(body Body, delta *RevisionDelta) Attachments: value.attachments.ShallowCopy(), // Avoid caller mutating the stored attachments Deleted: value.deleted, Removed: value.removed, - CV: &CurrentVersionVector{VersionCAS: value.cv.VersionCAS, SourceID: value.cv.SourceID}, + CV: &SourceAndVersion{Version: value.cv.Version, SourceID: value.cv.SourceID}, } if body != nil { docRev._shallowCopyBody = body.ShallowCopy() @@ -609,7 +609,7 @@ func (value *revCacheValue) asDocumentRevision(body Body, delta *RevisionDelta) func (value *revCacheValue) loadForDoc(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, includeBody bool) (docRev DocumentRevision, cacheHit bool, err error) { var docRevBody Body - var fetchedCV *CurrentVersionVector + var fetchedCV *SourceAndVersion var revid string value.lock.RLock() if value.bodyBytes != nil || value.err != nil { diff --git a/db/revision_cache_test.go b/db/revision_cache_test.go index d5abbe6b97..91d94df408 100644 --- a/db/revision_cache_test.go +++ b/db/revision_cache_test.go @@ -80,7 +80,7 @@ func (t *testBackingStore) getCurrentVersion(ctx context.Context, doc *Document) "testing": true, BodyId: doc.ID, BodyRev: doc.CurrentRev, - "current_version": &CurrentVersionVector{VersionCAS: doc.HLV.Version, SourceID: doc.HLV.SourceID}, + "current_version": &SourceAndVersion{Version: doc.HLV.Version, SourceID: doc.HLV.SourceID}, } bodyBytes, err := base.JSONMarshal(b) return bodyBytes, b, nil, err @@ -110,7 +110,7 @@ func TestLRURevisionCacheEviction(t *testing.T) { // Fill up the rev cache with the first 10 docs for docID := 0; docID < 10; docID++ { id := strconv.Itoa(docID) - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &SourceAndVersion{Version: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) } // Get them back out @@ -127,7 +127,7 @@ func TestLRURevisionCacheEviction(t *testing.T) { // Add 3 more docs to the now full revcache for i := 10; i < 13; i++ { docID := strconv.Itoa(i) - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: docID, RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(i), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: docID, RevID: "1-abc", CV: &SourceAndVersion{Version: uint64(i), SourceID: "test"}, History: Revisions{"start": 1}}) } // Check that the first 3 docs were evicted @@ -170,7 +170,7 @@ func TestLRURevisionCacheEvictionMixedRevAndCV(t *testing.T) { // Fill up the rev cache with the first 10 docs for docID := 0; docID < 10; docID++ { id := strconv.Itoa(docID) - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &SourceAndVersion{Version: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) } // assert that the list has 10 elements along with both lookup maps @@ -181,7 +181,7 @@ func TestLRURevisionCacheEvictionMixedRevAndCV(t *testing.T) { // Add 3 more docs to the now full rev cache to trigger eviction for docID := 10; docID < 13; docID++ { id := strconv.Itoa(docID) - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &SourceAndVersion{Version: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) } // assert the cache and associated lookup maps only have 10 items in them (i.e.e is eviction working?) assert.Equal(t, 10, len(cache.hlvCache)) @@ -192,7 +192,7 @@ func TestLRURevisionCacheEvictionMixedRevAndCV(t *testing.T) { prevCacheHitCount := cacheHitCounter.Value() for i := 0; i < 10; i++ { id := strconv.Itoa(i + 3) - cv := CurrentVersionVector{VersionCAS: uint64(i + 3), SourceID: "test"} + cv := SourceAndVersion{Version: uint64(i + 3), SourceID: "test"} docRev, err := cache.GetWithCV(ctx, id, &cv, RevCacheOmitBody, RevCacheOmitDelta) assert.NoError(t, err) assert.NotNil(t, docRev.BodyBytes, "nil body for %s", id) @@ -270,13 +270,13 @@ func TestBackingStoreCV(t *testing.T) { cache := NewLRURevisionCache(10, &testBackingStore{[]string{"not_found"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) // Get Rev for the first time - miss cache, but fetch the doc and revision to store - cv := CurrentVersionVector{SourceID: "test", VersionCAS: 123} + cv := SourceAndVersion{SourceID: "test", Version: 123} docRev, err := cache.GetWithCV(base.TestCtx(t), "doc1", &cv, RevCacheOmitBody, RevCacheOmitDelta) assert.NoError(t, err) assert.Equal(t, "doc1", docRev.DocID) assert.NotNil(t, docRev.Channels) assert.Equal(t, "test", docRev.CV.SourceID) - assert.Equal(t, uint64(123), docRev.CV.VersionCAS) + assert.Equal(t, uint64(123), docRev.CV.Version) assert.Equal(t, int64(0), cacheHitCounter.Value()) assert.Equal(t, int64(1), cacheMissCounter.Value()) assert.Equal(t, int64(1), getDocumentCounter.Value()) @@ -287,14 +287,14 @@ func TestBackingStoreCV(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "doc1", docRev.DocID) assert.Equal(t, "test", docRev.CV.SourceID) - assert.Equal(t, uint64(123), docRev.CV.VersionCAS) + assert.Equal(t, uint64(123), docRev.CV.Version) assert.Equal(t, int64(1), cacheHitCounter.Value()) assert.Equal(t, int64(1), cacheMissCounter.Value()) assert.Equal(t, int64(1), getDocumentCounter.Value()) assert.Equal(t, int64(1), getRevisionCounter.Value()) // Doc doesn't exist, so miss the cache, and fail when getting the doc - cv = CurrentVersionVector{SourceID: "test11", VersionCAS: 100} + cv = SourceAndVersion{SourceID: "test11", Version: 100} docRev, err = cache.GetWithCV(base.TestCtx(t), "not_found", &cv, RevCacheOmitBody, RevCacheOmitDelta) assertHTTPError(t, err, 404) assert.Nil(t, docRev.BodyBytes) @@ -557,7 +557,7 @@ func TestSingleLoad(t *testing.T) { cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} cache := NewLRURevisionCache(10, &testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) - cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc123", RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(123), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc123", RevID: "1-abc", CV: &SourceAndVersion{Version: uint64(123), SourceID: "test"}, History: Revisions{"start": 1}}) _, err := cache.GetWithRev(base.TestCtx(t), "doc123", "1-abc", true, false) assert.NoError(t, err) } @@ -567,7 +567,7 @@ func TestConcurrentLoad(t *testing.T) { cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} cache := NewLRURevisionCache(10, &testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) - cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc1", RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(1234), SourceID: "test"}, History: Revisions{"start": 1}}) + cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc1", RevID: "1-abc", CV: &SourceAndVersion{Version: uint64(1234), SourceID: "test"}, History: Revisions{"start": 1}}) // Trigger load into cache var wg sync.WaitGroup @@ -632,7 +632,7 @@ func TestRevCacheOperationsCV(t *testing.T) { cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} cache := NewLRURevisionCache(10, &testBackingStore{[]string{"test_doc"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) - cv := CurrentVersionVector{SourceID: "test", VersionCAS: 123} + cv := SourceAndVersion{SourceID: "test", Version: 123} documentRevision := DocumentRevision{ DocID: "doc1", RevID: "1-abc", @@ -648,7 +648,7 @@ func TestRevCacheOperationsCV(t *testing.T) { assert.Equal(t, "doc1", docRev.DocID) assert.Equal(t, base.SetOf("chan1"), docRev.Channels) assert.Equal(t, "test", docRev.CV.SourceID) - assert.Equal(t, uint64(123), docRev.CV.VersionCAS) + assert.Equal(t, uint64(123), docRev.CV.Version) assert.Equal(t, int64(1), cacheHitCounter.Value()) assert.Equal(t, int64(0), cacheMissCounter.Value()) @@ -661,7 +661,7 @@ func TestRevCacheOperationsCV(t *testing.T) { assert.Equal(t, "doc1", docRev.DocID) assert.Equal(t, base.SetOf("chan1"), docRev.Channels) assert.Equal(t, "test", docRev.CV.SourceID) - assert.Equal(t, uint64(123), docRev.CV.VersionCAS) + assert.Equal(t, uint64(123), docRev.CV.Version) assert.Equal(t, []byte(`{"test":"12345"}`), docRev.BodyBytes) assert.Equal(t, int64(2), cacheHitCounter.Value()) assert.Equal(t, int64(0), cacheMissCounter.Value()) @@ -705,7 +705,7 @@ func TestLoaderMismatchInCV(t *testing.T) { cache := NewLRURevisionCache(10, &testBackingStore{[]string{"test_doc"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) // create cv with incorrect version to the one stored in backing store - cv := CurrentVersionVector{SourceID: "test", VersionCAS: 1234} + cv := SourceAndVersion{SourceID: "test", Version: 1234} _, err := cache.GetWithCV(base.TestCtx(t), "doc1", &cv, RevCacheOmitBody, RevCacheOmitDelta) require.Error(t, err) @@ -735,7 +735,7 @@ func TestConcurrentLoadByCVAndRevOnCache(t *testing.T) { wg := sync.WaitGroup{} wg.Add(2) - cv := CurrentVersionVector{SourceID: "test", VersionCAS: 123} + cv := SourceAndVersion{SourceID: "test", Version: 123} go func() { _, err := cache.GetWithRev(ctx, "doc1", "1-abc", RevCacheOmitBody, RevCacheIncludeDelta) require.NoError(t, err) @@ -773,9 +773,9 @@ func TestGetActive(t *testing.T) { rev1id, doc, err := collection.Put(ctx, "doc", Body{"val": 123}) require.NoError(t, err) - expectedCV := CurrentVersionVector{ - SourceID: db.BucketUUID, - VersionCAS: doc.Cas, + expectedCV := SourceAndVersion{ + SourceID: db.BucketUUID, + Version: doc.Cas, } // remove the entry form the rev cache to force teh cache to not have the active version in it @@ -801,7 +801,7 @@ func TestConcurrentPutAndGetOnRevCache(t *testing.T) { wg := sync.WaitGroup{} wg.Add(2) - cv := CurrentVersionVector{SourceID: "test", VersionCAS: 123} + cv := SourceAndVersion{SourceID: "test", Version: 123} docRev := DocumentRevision{ DocID: "doc1", RevID: "1-abc", diff --git a/rest/attachment_test.go b/rest/attachment_test.go index 2f0cf87f3b..f44408068d 100644 --- a/rest/attachment_test.go +++ b/rest/attachment_test.go @@ -1060,7 +1060,6 @@ func TestAttachmentContentType(t *testing.T) { } func TestBasicAttachmentRemoval(t *testing.T) { - t.Skip("Disabled pending CBG-3503") rt := NewRestTester(t, &RestTesterConfig{GuestEnabled: true}) defer rt.Close() @@ -2224,7 +2223,6 @@ func TestAttachmentDeleteOnPurge(t *testing.T) { } func TestAttachmentDeleteOnExpiry(t *testing.T) { - t.Skip("Disabled pending CBG-3503") rt := NewRestTester(t, nil) defer rt.Close() diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go index 679d6c2114..d352647b09 100644 --- a/rest/importtest/import_test.go +++ b/rest/importtest/import_test.go @@ -424,7 +424,6 @@ func TestXattrDoubleDelete(t *testing.T) { } func TestViewQueryTombstoneRetrieval(t *testing.T) { - t.Skip("Disabled pending CBG-3503") base.SkipImportTestsIfNotEnabled(t) if !base.TestsDisableGSI() {