From 3df720ee1a9c2f7a8d1c9fe6b5bae84fd995344e Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Fri, 24 May 2024 17:14:53 -0700 Subject: [PATCH] CBG-3877 Persist HLV to _vv xattr (#6843) Modifies document marshal and unmarshal to support a set of xattrs (_sync, _vv), and does the same for parsing DCP stream events (including user xattr). --- base/constants.go | 1 + db/change_cache_test.go | 4 +- db/crud.go | 32 +++-- db/crud_test.go | 2 +- db/database.go | 5 +- db/database_collection.go | 4 +- db/document.go | 176 +++++++++++++++++---------- db/document_test.go | 60 ++++----- db/hybrid_logical_vector.go | 4 +- db/hybrid_logical_vector_test.go | 8 +- db/import.go | 10 +- db/import_test.go | 70 ++++++++++- db/util_testing.go | 12 +- db/utilities_hlv_testing.go | 9 +- rest/attachment_test.go | 6 +- rest/blip_api_attachment_test.go | 21 ++-- rest/blip_api_crud_test.go | 29 +++-- rest/changes_test.go | 4 +- rest/changestest/changes_api_test.go | 2 +- 19 files changed, 299 insertions(+), 160 deletions(-) diff --git a/base/constants.go b/base/constants.go index 0b5044c05d..4bfe8cca6d 100644 --- a/base/constants.go +++ b/base/constants.go @@ -136,6 +136,7 @@ const ( SyncPropertyName = "_sync" // SyncXattrName is used when storing sync data in a document's xattrs. SyncXattrName = "_sync" + VvXattrName = "_vv" // MouXattrName is used when storing metadata-only update information in a document's xattrs. MouXattrName = "_mou" diff --git a/db/change_cache_test.go b/db/change_cache_test.go index d2502e6d0b..41b63a7186 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -1457,7 +1457,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) { } var doc1DCPBytes []byte if base.TestUseXattrs() { - body, syncXattr, _, err := doc1.MarshalWithXattrs() + body, syncXattr, _, _, err := doc1.MarshalWithXattrs() require.NoError(t, err) doc1DCPBytes = sgbucket.EncodeValueWithXattrs(body, sgbucket.Xattr{Name: base.SyncXattrName, Value: syncXattr}) } else { @@ -1482,7 +1482,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) { var dataType sgbucket.FeedDataType = base.MemcachedDataTypeJSON if base.TestUseXattrs() { dataType |= base.MemcachedDataTypeXattr - body, syncXattr, _, err := doc2.MarshalWithXattrs() + body, syncXattr, _, _, err := doc2.MarshalWithXattrs() require.NoError(t, err) doc2DCPBytes = sgbucket.EncodeValueWithXattrs(body, sgbucket.Xattr{Name: base.SyncXattrName, Value: syncXattr}) } else { diff --git a/db/crud.go b/db/crud.go index 9a64bc45f2..7cf3678260 100644 --- a/db/crud.go +++ b/db/crud.go @@ -60,7 +60,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin return nil, nil, base.HTTPErrorf(400, "Invalid doc ID") } if c.UseXattrs() { - doc, rawBucketDoc, err = c.GetDocWithXattr(ctx, key, unmarshalLevel) + doc, rawBucketDoc, err = c.GetDocWithXattrs(ctx, key, unmarshalLevel) if err != nil { return nil, nil, err } @@ -114,7 +114,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin return doc, rawBucketDoc, nil } -func (c *DatabaseCollection) GetDocWithXattr(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) { +func (c *DatabaseCollection) GetDocWithXattrs(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) { rawBucketDoc = &sgbucket.BucketDocument{} var getErr error rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncAndUserXattrKeys()) @@ -190,6 +190,12 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) ( } +// unmarshalDocumentWithXattrs populates individual xattrs on unmarshalDocumentWithXattrs from a provided xattrs map +func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) { + return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.userXattrKey()], cas, unmarshalLevel) + +} + // This gets *just* the Sync Metadata (_sync field) rather than the entire doc, for efficiency // reasons. Unlike GetDocSyncData it does not check for on-demand import; this means it does not // need to read the doc body from the bucket. @@ -197,7 +203,7 @@ func (db *DatabaseCollection) GetDocSyncDataNoImport(ctx context.Context, docid if db.UseXattrs() { var xattrs map[string][]byte var cas uint64 - xattrs, cas, err = db.dataStore.GetXattrs(ctx, docid, []string{base.SyncXattrName}) + xattrs, cas, err = db.dataStore.GetXattrs(ctx, docid, []string{base.SyncXattrName, base.VvXattrName}) if err == nil { var doc *Document doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, nil, xattrs, cas, level) @@ -232,7 +238,7 @@ func (db *DatabaseCollection) GetDocSyncDataNoImport(ctx context.Context, docid return } -// OnDemandImportForGet. Attempts to import the doc based on the provided id, contents and cas. ImportDocRaw does cas retry handling +// OnDemandImportForGet. Attempts to import the doc based on the provided id, contents and cas. ImportDocRaw does cas retry handling // if the document gets updated after the initial retrieval attempt that triggered this. func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid string, rawDoc []byte, xattrs map[string][]byte, cas uint64) (docOut *Document, err error) { isDelete := rawDoc == nil @@ -2274,13 +2280,19 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do // Return the new raw document value for the bucket to store. doc.SetCrc32cUserXattrHash() - var rawSyncXattr, rawMouXattr, rawDocBody []byte - rawDocBody, rawSyncXattr, rawMouXattr, err = doc.MarshalWithXattrs() + + var rawSyncXattr, rawMouXattr, rawVvXattr, rawDocBody []byte + rawDocBody, rawSyncXattr, rawVvXattr, rawMouXattr, err = doc.MarshalWithXattrs() + if err != nil { + return updatedDoc, err + } + if len(rawDocBody) > 0 { updatedDoc.Doc = rawDocBody docBytes = len(updatedDoc.Doc) } - updatedDoc.Xattrs = map[string][]byte{base.SyncXattrName: rawSyncXattr} + + updatedDoc.Xattrs = map[string][]byte{base.SyncXattrName: rawSyncXattr, base.VvXattrName: rawVvXattr} if rawMouXattr != nil && db.useMou() { updatedDoc.Xattrs[base.MouXattrName] = rawMouXattr } @@ -2898,7 +2910,7 @@ func (c *DatabaseCollection) checkForUpgrade(ctx context.Context, key string, un return nil, nil } - doc, rawDocument, err := c.GetDocWithXattr(ctx, key, unmarshalLevel) + doc, rawDocument, err := c.GetDocWithXattrs(ctx, key, unmarshalLevel) if err != nil || doc == nil || !doc.HasValidSyncData() { return nil, nil } @@ -3051,8 +3063,8 @@ const ( xattrMacroCas = "cas" // SyncData.Cas xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c xattrMacroCurrentRevVersion = "rev.ver" // SyncDataJSON.RevAndVersion.CurrentVersion - versionVectorVrsMacro = "_vv.ver" // PersistedHybridLogicalVector.Version - versionVectorCVCASMacro = "_vv.cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS + versionVectorVrsMacro = "ver" // PersistedHybridLogicalVector.Version + versionVectorCVCASMacro = "cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS expandMacroCASValue = "expand" // static value that indicates that a CAS macro expansion should be applied to a property ) diff --git a/db/crud_test.go b/db/crud_test.go index 66915d0466..41168053f3 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -245,7 +245,7 @@ func TestHasAttachmentsFlagForLegacyAttachments(t *testing.T) { require.NoError(t, err) // Get the existing bucket doc - _, existingBucketDoc, err := collection.GetDocWithXattr(ctx, docID, DocUnmarshalAll) + _, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, docID, DocUnmarshalAll) require.NoError(t, err) // Migrate document metadata from document body to system xattr. diff --git a/db/database.go b/db/database.go index a7d929c597..6574dd712f 100644 --- a/db/database.go +++ b/db/database.go @@ -1874,11 +1874,12 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, doc.metadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.metadataOnlyUpdate) } - _, rawXattr, rawMouXattr, err := updatedDoc.MarshalWithXattrs() + _, rawSyncXattr, rawVvXattr, rawMouXattr, err := updatedDoc.MarshalWithXattrs() updatedDoc := sgbucket.UpdatedDoc{ Doc: nil, // Resync does not require document body update Xattrs: map[string][]byte{ - base.SyncXattrName: rawXattr, + base.SyncXattrName: rawSyncXattr, + base.VvXattrName: rawVvXattr, }, Expiry: updatedExpiry, } diff --git a/db/database_collection.go b/db/database_collection.go index 495865f8bb..02bfdd3532 100644 --- a/db/database_collection.go +++ b/db/database_collection.go @@ -239,7 +239,7 @@ func (c *DatabaseCollection) unsupportedOptions() *UnsupportedOptions { // syncAndUserXattrKeys returns the xattr keys for the user and sync xattrs. func (c *DatabaseCollection) syncAndUserXattrKeys() []string { - xattrKeys := []string{base.SyncXattrName} + xattrKeys := []string{base.SyncXattrName, base.VvXattrName} userXattrKey := c.userXattrKey() if userXattrKey != "" { xattrKeys = append(xattrKeys, userXattrKey) @@ -249,7 +249,7 @@ func (c *DatabaseCollection) syncAndUserXattrKeys() []string { // syncMouAndUserXattrKeys returns the xattr keys for the user, mou and sync xattrs. func (c *DatabaseCollection) syncMouAndUserXattrKeys() []string { - xattrKeys := []string{base.SyncXattrName} + xattrKeys := []string{base.SyncXattrName, base.VvXattrName} if c.useMou() { xattrKeys = append(xattrKeys, base.MouXattrName) } diff --git a/db/document.go b/db/document.go index b36beb2ddb..5e173c48d3 100644 --- a/db/document.go +++ b/db/document.go @@ -34,11 +34,11 @@ const DocumentHistoryMaxEntriesPerChannel = 5 type DocumentUnmarshalLevel uint8 const ( - DocUnmarshalAll = DocumentUnmarshalLevel(iota) // Unmarshals sync metadata and body - DocUnmarshalSync // Unmarshals all sync metadata - DocUnmarshalNoHistory // Unmarshals sync metadata excluding revtree history + DocUnmarshalAll = DocumentUnmarshalLevel(iota) // Unmarshals metadata and body + DocUnmarshalSync // Unmarshals metadata + DocUnmarshalNoHistory // Unmarshals metadata excluding revtree history DocUnmarshalHistory // Unmarshals revtree history + rev + CAS only - DocUnmarshalRev // Unmarshals rev + CAS only + DocUnmarshalRev // Unmarshals revTreeID + CAS only (no HLV) DocUnmarshalCAS // Unmarshals CAS (for import check) only DocUnmarshalNone // No unmarshalling (skips import/upgrade check) ) @@ -86,7 +86,7 @@ type SyncData struct { Attachments AttachmentsMeta `json:"attachments,omitempty"` ChannelSet []ChannelSetEntry `json:"channel_set"` ChannelSetHistory []ChannelSetEntry `json:"channel_set_history"` - HLV *HybridLogicalVector `json:"_vv,omitempty"` + HLV *HybridLogicalVector `json:"-"` // Marshalled/Unmarshalled separately from SyncData for storage in _vv, see MarshalWithXattrs/UnmarshalWithXattrs // Only used for performance metrics: TimeSaved time.Time `json:"time_saved,omitempty"` // Timestamp of save. @@ -407,20 +407,14 @@ func unmarshalDocument(docid string, data []byte) (*Document, error) { return doc, nil } -// unmarshalDocumentWithXattrs populates individual xattrs on unmarshalDocumentWithXattrs from a provided xattrs map -func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) { - return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.MouXattrName], xattrs[db.userXattrKey()], cas, unmarshalLevel) +func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, syncXattrData, hlvXattrData, mouXattrData, userXattrData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) { -} - -func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, syncXattrData, mouXattrData, userXattrData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) { - - if syncXattrData == nil || len(syncXattrData) == 0 { + if len(syncXattrData) == 0 && len(hlvXattrData) == 0 { // If no xattr data, unmarshal as standard doc doc, err = unmarshalDocument(docid, data) } else { doc = NewDocument(docid) - err = doc.UnmarshalWithXattr(ctx, data, syncXattrData, unmarshalLevel) + err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, unmarshalLevel) } if err != nil { return nil, err @@ -462,14 +456,14 @@ func UnmarshalDocumentSyncData(data []byte, needHistory bool) (*SyncData, error) // TODO: Using a pool of unmarshal workers may help prevent memory spikes under load func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey string, needHistory bool) (result *SyncData, rawBody []byte, rawXattrs map[string][]byte, err error) { - var body []byte - var xattrValues map[string][]byte // If xattr datatype flag is set, data includes both xattrs and document body. Check for presence of sync xattr. // Note that there could be a non-sync xattr present + var xattrValues map[string][]byte + var hlv *HybridLogicalVector if dataType&base.MemcachedDataTypeXattr != 0 { - xattrKeys := []string{base.SyncXattrName, base.MouXattrName} + xattrKeys := []string{base.SyncXattrName, base.MouXattrName, base.VvXattrName} if userXattrKey != "" { xattrKeys = append(xattrKeys, userXattrKey) } @@ -478,19 +472,32 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey return nil, nil, nil, err } - rawSyncXattr, _ := xattrValues[base.SyncXattrName] // If the sync xattr is present, use that to build SyncData - if len(rawSyncXattr) > 0 { + syncXattr, ok := xattrValues[base.SyncXattrName] + + if vvXattr, ok := xattrValues[base.VvXattrName]; ok { + err = base.JSONUnmarshal(vvXattr, &hlv) + if err != nil { + return nil, nil, nil, fmt.Errorf("error unmarshalling HLV: %w", err) + } + } + + if ok && len(syncXattr) > 0 { result = &SyncData{} if needHistory { result.History = make(RevTree) } - err = base.JSONUnmarshal(rawSyncXattr, result) + err = base.JSONUnmarshal(syncXattr, result) if err != nil { - return nil, nil, nil, fmt.Errorf("Found _sync xattr (%q), but could not unmarshal: %w", syncXattr, err) + return nil, nil, nil, fmt.Errorf("Found _sync xattr (%q), but could not unmarshal: %w", string(syncXattr), err) + } + + if hlv != nil { + result.HLV = hlv } return result, body, xattrValues, nil } + } else { // Xattr flag not set - data is just the document body body = data @@ -498,6 +505,15 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey // Non-xattr data, or sync xattr not present. Attempt to retrieve sync metadata from document body result, err = UnmarshalDocumentSyncData(body, needHistory) + + // If no sync data was found but HLV was present, initialize empty sync data + if result == nil && hlv != nil { + result = &SyncData{} + } + // If HLV was found, add to sync data + if hlv != nil { + result.HLV = hlv + } return result, body, xattrValues, err } @@ -513,7 +529,7 @@ func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, da if err != nil { return nil, err } - return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], cas, DocUnmarshalAll) + return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], cas, DocUnmarshalAll) } func (doc *SyncData) HasValidSyncData() bool { @@ -916,7 +932,7 @@ func (doc *Document) updateChannels(ctx context.Context, newChannels base.Set) ( doc.updateChannelHistory(channel, curSequence, false) changed = append(changed, channel) // If the current version requires macro expansion, new removal in channel map will also require macro expansion - if doc.HLV.Version == hlvExpandMacroCASValue { + if doc.HLV != nil && doc.HLV.Version == hlvExpandMacroCASValue { revokedChannelsRequiringExpansion = append(revokedChannelsRequiringExpansion, channel) } } @@ -1071,11 +1087,12 @@ func (doc *Document) MarshalJSON() (data []byte, err error) { return data, err } -// UnmarshalWithXattr unmarshals the provided raw document and xattr bytes. The provided DocumentUnmarshalLevel +// UnmarshalWithXattrs unmarshals the provided raw document and xattr bytes when present. The provided DocumentUnmarshalLevel // (unmarshalLevel) specifies how much of the provided document/xattr needs to be initially unmarshalled. If // unmarshalLevel is anything less than the full document + metadata, the raw data is retained for subsequent // lazy unmarshalling as needed. -func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata []byte, unmarshalLevel DocumentUnmarshalLevel) error { +// Must handle cases where document body and hlvXattrData are present without syncXattrData for all DocumentUnmarshalLevel +func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData []byte, unmarshalLevel DocumentUnmarshalLevel) error { if doc.ID == "" { base.WarnfCtx(ctx, "Attempted to unmarshal document without ID set") return errors.New("Document was unmarshalled without ID set") @@ -1083,11 +1100,19 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata switch unmarshalLevel { case DocUnmarshalAll, DocUnmarshalSync: - // Unmarshal full document and/or sync metadata + // Unmarshal full document and/or sync metadata. Documents written by XDCR may have HLV but no sync data doc.SyncData = SyncData{History: make(RevTree)} - unmarshalErr := base.JSONUnmarshal(xdata, &doc.SyncData) - if unmarshalErr != nil { - return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), unmarshalErr)) + if syncXattrData != nil { + unmarshalErr := base.JSONUnmarshal(syncXattrData, &doc.SyncData) + if unmarshalErr != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), unmarshalErr)) + } + } + if hlvXattrData != nil { + err := base.JSONUnmarshal(hlvXattrData, &doc.SyncData.HLV) + if err != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err)) + } } doc._rawBody = data // Unmarshal body if requested and present @@ -1097,50 +1122,70 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata case DocUnmarshalNoHistory: // Unmarshal sync metadata only, excluding history doc.SyncData = SyncData{} - unmarshalErr := base.JSONUnmarshal(xdata, &doc.SyncData) - if unmarshalErr != nil { - return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalNoHistory). Error: %v", base.UD(doc.ID), unmarshalErr)) + if syncXattrData != nil { + unmarshalErr := base.JSONUnmarshal(syncXattrData, &doc.SyncData) + if unmarshalErr != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattrs() doc with id: %s (DocUnmarshalNoHistory). Error: %v", base.UD(doc.ID), unmarshalErr)) + } + } + if hlvXattrData != nil { + err := base.JSONUnmarshal(hlvXattrData, &doc.SyncData.HLV) + if err != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalNoHistory). Error: %v", base.UD(doc.ID), err)) + } } doc._rawBody = data case DocUnmarshalHistory: - historyOnlyMeta := historyOnlySyncData{History: make(RevTree)} - unmarshalErr := base.JSONUnmarshal(xdata, &historyOnlyMeta) - if unmarshalErr != nil { - return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalHistory). Error: %v", base.UD(doc.ID), unmarshalErr)) - } - doc.SyncData = SyncData{ - CurrentRev: historyOnlyMeta.CurrentRev.RevTreeID, - History: historyOnlyMeta.History, - Cas: historyOnlyMeta.Cas, + if syncXattrData != nil { + historyOnlyMeta := historyOnlySyncData{History: make(RevTree)} + unmarshalErr := base.JSONUnmarshal(syncXattrData, &historyOnlyMeta) + if unmarshalErr != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattrs() doc with id: %s (DocUnmarshalHistory). Error: %v", base.UD(doc.ID), unmarshalErr)) + } + doc.SyncData = SyncData{ + CurrentRev: historyOnlyMeta.CurrentRev.RevTreeID, + History: historyOnlyMeta.History, + Cas: historyOnlyMeta.Cas, + } + } else { + doc.SyncData = SyncData{} } doc._rawBody = data case DocUnmarshalRev: // Unmarshal only rev and cas from sync metadata - var revOnlyMeta revOnlySyncData - unmarshalErr := base.JSONUnmarshal(xdata, &revOnlyMeta) - if unmarshalErr != nil { - return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalRev). Error: %v", base.UD(doc.ID), unmarshalErr)) - } - doc.SyncData = SyncData{ - CurrentRev: revOnlyMeta.CurrentRev.RevTreeID, - Cas: revOnlyMeta.Cas, + if syncXattrData != nil { + var revOnlyMeta revOnlySyncData + unmarshalErr := base.JSONUnmarshal(syncXattrData, &revOnlyMeta) + if unmarshalErr != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattrs() doc with id: %s (DocUnmarshalRev). Error: %v", base.UD(doc.ID), unmarshalErr)) + } + doc.SyncData = SyncData{ + CurrentRev: revOnlyMeta.CurrentRev.RevTreeID, + Cas: revOnlyMeta.Cas, + } + } else { + doc.SyncData = SyncData{} } doc._rawBody = data case DocUnmarshalCAS: // Unmarshal only cas from sync metadata - var casOnlyMeta casOnlySyncData - unmarshalErr := base.JSONUnmarshal(xdata, &casOnlyMeta) - if unmarshalErr != nil { - return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalCAS). Error: %v", base.UD(doc.ID), unmarshalErr)) - } - doc.SyncData = SyncData{ - Cas: casOnlyMeta.Cas, + if syncXattrData != nil { + var casOnlyMeta casOnlySyncData + unmarshalErr := base.JSONUnmarshal(syncXattrData, &casOnlyMeta) + if unmarshalErr != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattrs() doc with id: %s (DocUnmarshalCAS). Error: %v", base.UD(doc.ID), unmarshalErr)) + } + doc.SyncData = SyncData{ + Cas: casOnlyMeta.Cas, + } + } else { + doc.SyncData = SyncData{} } doc._rawBody = data } // If there's no body, but there is an xattr, set deleted flag and initialize an empty body - if len(data) == 0 && len(xdata) > 0 { + if len(data) == 0 && len(syncXattrData) > 0 { doc._body = Body{} doc._rawBody = []byte(base.EmptyDocument) doc.Deleted = true @@ -1148,7 +1193,8 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata return nil } -func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr []byte, mouXattr []byte, err error) { +// MarshalWithXattrs marshals the Document into body, and sync, vv and mou xattrs for persistence. +func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr, vvXattr, mouXattr []byte, err error) { // Grab the rawBody if it's already marshalled, otherwise unmarshal the body if doc._rawBody != nil { if !doc.IsDeleted() { @@ -1165,25 +1211,31 @@ func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr []byte, mouXatt if !deleted { data, err = base.JSONMarshal(body) if err != nil { - return nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc body with id: %s. Error: %v", base.UD(doc.ID), err)) + return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc body with id: %s. Error: %v", base.UD(doc.ID), err)) } } } } + if doc.SyncData.HLV != nil { + vvXattr, err = base.JSONMarshal(&doc.SyncData.HLV) + if err != nil { + return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc vv with id: %s. Error: %v", base.UD(doc.ID), err)) + } + } syncXattr, err = base.JSONMarshal(doc.SyncData) if err != nil { - return nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err)) + return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err)) } if doc.metadataOnlyUpdate != nil { mouXattr, err = base.JSONMarshal(doc.metadataOnlyUpdate) if err != nil { - return nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc MouData with id: %s. Error: %v", base.UD(doc.ID), err)) + return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc MouData with id: %s. Error: %v", base.UD(doc.ID), err)) } } - return data, syncXattr, mouXattr, nil + return data, syncXattr, vvXattr, mouXattr, nil } // computeMetadataOnlyUpdate computes a new metadataOnlyUpdate based on the existing document's CAS and metadataOnlyUpdate diff --git a/db/document_test.go b/db/document_test.go index 3d172fdf05..1d95bc1260 100644 --- a/db/document_test.go +++ b/db/document_test.go @@ -137,7 +137,7 @@ func BenchmarkDocUnmarshal(b *testing.B) { b.Run(bm.name, func(b *testing.B) { ctx := base.TestCtx(b) for i := 0; i < b.N; i++ { - _, _ = unmarshalDocumentWithXattrs(ctx, "doc_1k", doc1k_body, doc1k_meta, nil, nil, 1, bm.unmarshalLevel) + _, _ = unmarshalDocumentWithXattrs(ctx, "doc_1k", doc1k_body, doc1k_meta, nil, nil, nil, 1, bm.unmarshalLevel) } }) } @@ -192,7 +192,7 @@ func BenchmarkUnmarshalBody(b *testing.B) { } } -const doc_meta_with_vv = `{ +const doc_meta_no_vv = `{ "rev": "3-89758294abc63157354c2b08547c2d21", "sequence": 7, "recent_sequences": [ @@ -235,22 +235,23 @@ const doc_meta_with_vv = `{ }, "GHI": null }, - "_vv":{ - "cvCas":"0x40e2010000000000", - "src":"cb06dc003846116d9b66d2ab23887a96", - "ver":"0x40e2010000000000", - "mv":{ - "s_LhRPsa7CpjEvP5zeXTXEBA":"c0ff05d7ac059a16", - "s_NqiIe0LekFPLeX4JvTO6Iw":"1c008cd6ac059a16" - }, - "pv":{ - "s_YZvBpEaztom9z5V/hDoeIw":"f0ff44d6ac059a16" - } - }, "cas": "", "time_saved": "2017-10-25T12:45:29.622450174-07:00" }` +const doc_meta_vv = `{ + "cvCas":"0x40e2010000000000", + "src":"cb06dc003846116d9b66d2ab23887a96", + "ver":"0x40e2010000000000", + "mv":{ + "s_LhRPsa7CpjEvP5zeXTXEBA":"c0ff05d7ac059a16", + "s_NqiIe0LekFPLeX4JvTO6Iw":"1c008cd6ac059a16" + }, + "pv":{ + "s_YZvBpEaztom9z5V/hDoeIw":"f0ff44d6ac059a16" + } + }` + func TestParseVersionVectorSyncData(t *testing.T) { mv := make(map[string]string) pv := make(map[string]string) @@ -260,8 +261,9 @@ func TestParseVersionVectorSyncData(t *testing.T) { ctx := base.TestCtx(t) - doc_meta := []byte(doc_meta_with_vv) - doc, err := unmarshalDocumentWithXattrs(ctx, "doc_1k", nil, doc_meta, nil, nil, 1, DocUnmarshalNoHistory) + sync_meta := []byte(doc_meta_no_vv) + vv_meta := []byte(doc_meta_vv) + doc, err := unmarshalDocumentWithXattrs(ctx, "doc_1k", nil, sync_meta, vv_meta, nil, nil, 1, DocUnmarshalNoHistory) require.NoError(t, err) strCAS := string(base.Uint64CASToLittleEndianHex(123456)) @@ -272,7 +274,7 @@ func TestParseVersionVectorSyncData(t *testing.T) { assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) - doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, doc_meta, nil, nil, 1, DocUnmarshalAll) + doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, nil, 1, DocUnmarshalAll) require.NoError(t, err) // assert on doc version vector values @@ -282,7 +284,7 @@ func TestParseVersionVectorSyncData(t *testing.T) { assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) - doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, doc_meta, nil, nil, 1, DocUnmarshalNoHistory) + doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, nil, 1, DocUnmarshalNoHistory) require.NoError(t, err) // assert on doc version vector values @@ -347,32 +349,23 @@ func TestRevAndVersion(t *testing.T) { require.NoError(t, err) log.Printf("marshalled:%s", marshalledSyncData) - var newSyncData SyncData - err = base.JSONUnmarshal(marshalledSyncData, &newSyncData) - require.NoError(t, err) - require.Equal(t, test.revTreeID, newSyncData.CurrentRev) - require.Equal(t, expectedSequence, newSyncData.Sequence) - if test.source != "" { - require.NotNil(t, newSyncData.HLV) - require.Equal(t, test.source, newSyncData.HLV.SourceID) - require.Equal(t, test.version, newSyncData.HLV.Version) - } - // Document test document := NewDocument("docID") document.SyncData.CurrentRev = test.revTreeID + document.SyncData.Sequence = expectedSequence document.SyncData.HLV = &HybridLogicalVector{ SourceID: test.source, Version: test.version, } - marshalledDoc, marshalledSyncXattr, _, err := document.MarshalWithXattrs() + + marshalledDoc, marshalledXattr, marshalledVvXattr, _, err := document.MarshalWithXattrs() require.NoError(t, err) newDocument := NewDocument("docID") - err = newDocument.UnmarshalWithXattr(ctx, marshalledDoc, marshalledSyncXattr, DocUnmarshalAll) + err = newDocument.UnmarshalWithXattrs(ctx, marshalledDoc, marshalledXattr, marshalledVvXattr, DocUnmarshalAll) require.NoError(t, err) require.Equal(t, test.revTreeID, newDocument.CurrentRev) - require.Equal(t, expectedSequence, newSyncData.Sequence) + require.Equal(t, expectedSequence, newDocument.Sequence) if test.source != "" { require.NotNil(t, newDocument.HLV) require.Equal(t, test.source, newDocument.HLV.SourceID) @@ -512,7 +505,7 @@ func TestInvalidXattrStreamEmptyBody(t *testing.T) { emptyBody := []byte{} // DecodeValueWithXattrs is the underlying function - body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{"_sync"}, inputStream) + body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{base.SyncXattrName}, inputStream) require.NoError(t, err) require.Equal(t, emptyBody, body) require.Empty(t, xattrs) @@ -523,7 +516,6 @@ func TestInvalidXattrStreamEmptyBody(t *testing.T) { require.Nil(t, result) require.Equal(t, emptyBody, rawBody) require.Nil(t, rawXattrs[base.SyncXattrName]) - } // getSingleXattrDCPBytes returns a DCP body with a single xattr pair and body diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 9c242b7e59..15f57288f2 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -377,14 +377,14 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector HybridLogicalVector func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansionSpec { var outputSpec []sgbucket.MacroExpansionSpec if hlv.Version == hlvExpandMacroCASValue { - spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionPath(base.SyncXattrName), sgbucket.MacroCas) + spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionPath(base.VvXattrName), sgbucket.MacroCas) outputSpec = append(outputSpec, spec) // If version is being expanded, we need to also specify the macro expansion for the expanded rev property currentRevSpec := sgbucket.NewMacroExpansionSpec(xattrCurrentRevVersionPath(base.SyncXattrName), sgbucket.MacroCas) outputSpec = append(outputSpec, currentRevSpec) } if hlv.CurrentVersionCAS == hlvExpandMacroCASValue { - spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(base.SyncXattrName), sgbucket.MacroCas) + spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(base.VvXattrName), sgbucket.MacroCas) outputSpec = append(outputSpec, spec) } return outputSpec diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go index 953fc4cea1..2ed9db95fb 100644 --- a/db/hybrid_logical_vector_test.go +++ b/db/hybrid_logical_vector_test.go @@ -281,7 +281,7 @@ func TestHLVImport(t *testing.T) { _, err = collection.ImportDocRaw(ctx, standardImportKey, standardImportBody, nil, false, cas, nil, ImportFromFeed) require.NoError(t, err, "import error") - importedDoc, _, err := collection.GetDocWithXattr(ctx, standardImportKey, DocUnmarshalAll) + importedDoc, _, err := collection.GetDocWithXattrs(ctx, standardImportKey, DocUnmarshalAll) require.NoError(t, err) importedHLV := importedDoc.HLV encodedCAS := string(base.Uint64CASToLittleEndianHex(cas)) @@ -292,18 +292,18 @@ func TestHLVImport(t *testing.T) { // 2. Test import of write by HLV-aware peer (HLV is already updated, sync metadata is not). otherSource := "otherSource" - hlvHelper := NewHLVAgent(t, collection.dataStore, otherSource, "_sync") + hlvHelper := NewHLVAgent(t, collection.dataStore, otherSource, "_vv") existingHLVKey := "existingHLV_" + t.Name() _ = hlvHelper.insertWithHLV(ctx, existingHLVKey) - existingBody, existingXattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, existingHLVKey, []string{base.SyncXattrName}) + existingBody, existingXattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, existingHLVKey, []string{base.SyncXattrName, base.VvXattrName}) require.NoError(t, err) encodedCAS = EncodeValue(cas) _, err = collection.ImportDocRaw(ctx, existingHLVKey, existingBody, existingXattrs, false, cas, nil, ImportFromFeed) require.NoError(t, err, "import error") - importedDoc, _, err = collection.GetDocWithXattr(ctx, existingHLVKey, DocUnmarshalAll) + importedDoc, _, err = collection.GetDocWithXattrs(ctx, existingHLVKey, DocUnmarshalAll) require.NoError(t, err) importedHLV = importedDoc.HLV // cas in the HLV's current version and cvCAS should not have changed, and should match importCAS diff --git a/db/import.go b/db/import.go index 28e30bb664..073d8b2470 100644 --- a/db/import.go +++ b/db/import.go @@ -90,7 +90,7 @@ func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid strin existingBucketDoc.Xattrs[base.MouXattrName], err = base.JSONMarshal(existingDoc.metadataOnlyUpdate) } } else { - existingBucketDoc.Body, existingBucketDoc.Xattrs[base.SyncXattrName], existingBucketDoc.Xattrs[base.MouXattrName], err = existingDoc.MarshalWithXattrs() + existingBucketDoc.Body, existingBucketDoc.Xattrs[base.SyncXattrName], existingBucketDoc.Xattrs[base.VvXattrName], existingBucketDoc.Xattrs[base.MouXattrName], err = existingDoc.MarshalWithXattrs() } } @@ -397,14 +397,18 @@ func (db *DatabaseCollectionWithUser) migrateMetadata(ctx context.Context, docid } // Persist the document in xattr format - value, syncXattrValue, _, marshalErr := doc.MarshalWithXattrs() + value, syncXattr, vvXattr, _, marshalErr := doc.MarshalWithXattrs() if marshalErr != nil { return nil, false, marshalErr } xattrs := map[string][]byte{ - base.SyncXattrName: syncXattrValue, + base.SyncXattrName: syncXattr, } + if vvXattr != nil { + xattrs[base.VvXattrName] = vvXattr + } + var casOut uint64 var writeErr error var xattrsToDelete []string diff --git a/db/import_test.go b/db/import_test.go index 3969f42627..28964d11f0 100644 --- a/db/import_test.go +++ b/db/import_test.go @@ -184,7 +184,7 @@ func TestMigrateMetadata(t *testing.T) { assert.NoError(t, err, "Error writing doc w/ expiry") // Get the existing bucket doc - _, existingBucketDoc, err := collection.GetDocWithXattr(ctx, key, DocUnmarshalAll) + _, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, key, DocUnmarshalAll) require.NoError(t, err) // Set the expiry value to a stale value (it's about to be stale, since below it will get updated to a later value) existingBucketDoc.Expiry = uint32(syncMetaExpiry.Unix()) @@ -218,6 +218,70 @@ func TestMigrateMetadata(t *testing.T) { } +// Tests metadata migration where a document with inline sync data has been replicated by XDCR, so also has an +// existing HLV. Migration should preserve the existing HLV while moving doc._sync to sync xattr +func TestMigrateMetadataWithHLV(t *testing.T) { + + if !base.TestUseXattrs() { + t.Skip("This test only works with XATTRS enabled") + } + + base.SetUpTestLogging(t, base.LevelInfo, base.KeyMigrate, base.KeyImport) + + db, ctx := setupTestDB(t) + defer db.Close(ctx) + + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + + key := "TestMigrateMetadata" + bodyBytes := rawDocWithSyncMeta() + body := Body{} + err := body.Unmarshal(bodyBytes) + assert.NoError(t, err, "Error unmarshalling body") + + hlv := &HybridLogicalVector{} + require.NoError(t, hlv.AddVersion(CreateVersion("source123", base.CasToString(100)))) + hlv.CurrentVersionCAS = base.CasToString(100) + hlvBytes := base.MustJSONMarshal(t, hlv) + xattrBytes := map[string][]byte{ + base.VvXattrName: hlvBytes, + } + + // Create via the SDK with inline sync metadata and an existing _vv xattr + _, err = collection.dataStore.WriteWithXattrs(ctx, key, 0, 0, bodyBytes, xattrBytes, nil, nil) + require.NoError(t, err) + + // Get the existing bucket doc + _, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, key, DocUnmarshalAll) + require.NoError(t, err) + + // Migrate metadata + _, _, err = collection.migrateMetadata( + ctx, + key, + body, + existingBucketDoc, + &sgbucket.MutateInOptions{PreserveExpiry: false}, + ) + require.NoError(t, err) + + // Fetch the existing doc, ensure _vv is preserved + var migratedHLV *HybridLogicalVector + _, migratedBucketDoc, err := collection.GetDocWithXattrs(ctx, key, DocUnmarshalAll) + require.NoError(t, err) + migratedHLVBytes, ok := migratedBucketDoc.Xattrs[base.VvXattrName] + require.True(t, ok) + require.NoError(t, base.JSONUnmarshal(migratedHLVBytes, &migratedHLV)) + require.Equal(t, hlv.Version, migratedHLV.Version) + require.Equal(t, hlv.SourceID, migratedHLV.SourceID) + require.Equal(t, hlv.CurrentVersionCAS, migratedHLV.CurrentVersionCAS) + + migratedSyncXattrBytes, ok := migratedBucketDoc.Xattrs[base.SyncXattrName] + require.True(t, ok) + require.NotZero(t, len(migratedSyncXattrBytes)) + +} + // This invokes db.importDoc() with two different scenarios: // // Scenario 1: normal import @@ -277,7 +341,7 @@ func TestImportWithStaleBucketDocCorrectExpiry(t *testing.T) { assert.NoError(t, err, "Error writing doc w/ expiry") // Get the existing bucket doc - _, existingBucketDoc, err := collection.GetDocWithXattr(ctx, key, DocUnmarshalAll) + _, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, key, DocUnmarshalAll) assert.NoError(t, err, fmt.Sprintf("Error retrieving doc w/ xattr: %v", err)) body = Body{} @@ -445,7 +509,7 @@ func TestImportWithCasFailureUpdate(t *testing.T) { assert.NoError(t, err) // Get the existing bucket doc - _, existingBucketDoc, err = collection.GetDocWithXattr(ctx, testcase.docname, DocUnmarshalAll) + _, existingBucketDoc, err = collection.GetDocWithXattrs(ctx, testcase.docname, DocUnmarshalAll) assert.NoError(t, err, fmt.Sprintf("Error retrieving doc w/ xattr: %v", err)) importD := `{"new":"Val"}` diff --git a/db/util_testing.go b/db/util_testing.go index 50d32156a0..9125fb6705 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -178,7 +178,17 @@ func purgeWithDCPFeed(ctx context.Context, dataStore sgbucket.DataStore, tbp *ba key := string(event.Key) if base.TestUseXattrs() { - purgeErr = dataStore.DeleteWithXattrs(ctx, key, []string{base.SyncXattrName}) + systemXattrNames, decodeErr := sgbucket.DecodeXattrNames(event.Value, true) + if decodeErr != nil { + purgeErrors = purgeErrors.Append(decodeErr) + tbp.Logf(ctx, "Error decoding DCP event xattrs for key %s. %v", key, decodeErr) + return false + } + if len(systemXattrNames) > 0 { + purgeErr = dataStore.DeleteWithXattrs(ctx, key, systemXattrNames) + } else { + purgeErr = dataStore.Delete(key) + } } else { purgeErr = dataStore.Delete(key) } diff --git a/db/utilities_hlv_testing.go b/db/utilities_hlv_testing.go index 2812060abd..b921036676 100644 --- a/db/utilities_hlv_testing.go +++ b/db/utilities_hlv_testing.go @@ -41,24 +41,21 @@ func NewHLVAgent(t *testing.T, datastore base.DataStore, source string, xattrNam } // InsertWithHLV inserts a new document into the bucket with a populated HLV (matching a write from -// a different HLV-aware peer) +// a different, non-SGW HLV-aware peer) func (h *HLVAgent) InsertWithHLV(ctx context.Context, key string) (casOut uint64) { hlv := &HybridLogicalVector{} err := hlv.AddVersion(CreateVersion(h.Source, hlvExpandMacroCASValue)) require.NoError(h.t, err) hlv.CurrentVersionCAS = hlvExpandMacroCASValue - syncData := &SyncData{HLV: hlv} - syncDataBytes, err := base.JSONMarshal(syncData) - require.NoError(h.t, err) - + vvDataBytes := base.MustJSONMarshal(h.t, hlv) mutateInOpts := &sgbucket.MutateInOptions{ MacroExpansion: hlv.computeMacroExpansions(), } docBody := base.MustJSONMarshal(h.t, defaultHelperBody) xattrData := map[string][]byte{ - h.xattrName: syncDataBytes, + h.xattrName: vvDataBytes, } cas, err := h.datastore.WriteWithXattrs(ctx, key, 0, 0, docBody, xattrData, nil, mutateInOpts) diff --git a/rest/attachment_test.go b/rest/attachment_test.go index 104d4f7b60..0feb56c428 100644 --- a/rest/attachment_test.go +++ b/rest/attachment_test.go @@ -2367,6 +2367,8 @@ func TestMinRevPosWorkToAvoidUnnecessaryProveAttachment(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // CBG-4166 + const docID = "doc" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { @@ -2407,6 +2409,7 @@ func TestAttachmentWithErroneousRevPos(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // CBG-4166 btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, rtConfig) @@ -2433,7 +2436,7 @@ func TestAttachmentWithErroneousRevPos(t *testing.T) { btcRunner.AttachmentsLock(btc.id).Unlock() // Put doc with an erroneous revpos 1 but with a different digest, referring to the above attachment - _, err = btcRunner.PushRevWithHistory(btc.id, docID, version.GetRev(btc.UseHLV()), []byte(`{"_attachments": {"hello.txt": {"revpos":1,"stub":true,"length": 19,"digest":"sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc="}}}`), 1, 0) + _, err := btcRunner.PushRevWithHistory(btc.id, docID, version.GetRev(btc.UseHLV()), []byte(`{"_attachments": {"hello.txt": {"revpos":1,"stub":true,"length": 19,"digest":"sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc="}}}`), 1, 0) require.NoError(t, err) // Ensure message and attachment is pushed up @@ -2587,6 +2590,7 @@ func TestCBLRevposHandling(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // CBG-4166 const ( doc1ID = "doc1" doc2ID = "doc2" diff --git a/rest/blip_api_attachment_test.go b/rest/blip_api_attachment_test.go index 6c0b0badb6..9d42e50ade 100644 --- a/rest/blip_api_attachment_test.go +++ b/rest/blip_api_attachment_test.go @@ -287,10 +287,9 @@ func TestBlipPushPullNewAttachmentCommonAncestor(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - const docID = "doc1" - ctx := base.TestCtx(t) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { + docID := t.Name() rt := NewRestTester(t, &rtConfig) defer rt.Close() @@ -318,7 +317,7 @@ func TestBlipPushPullNewAttachmentCommonAncestor(t *testing.T) { // Wait for the documents to be replicated at SG btc.pushReplication.WaitForMessage(2) - collection := rt.GetSingleTestDatabaseCollection() + collection, ctx := rt.GetSingleTestDatabaseCollection() doc, err := collection.GetDocument(ctx, docID, db.DocUnmarshalNoHistory) require.NoError(t, err) @@ -369,10 +368,9 @@ func TestBlipPushPullNewAttachmentNoCommonAncestor(t *testing.T) { GuestEnabled: true, } - const docID = "doc1" btcRunner := NewBlipTesterClientRunner(t) - ctx := base.TestCtx(t) + const docID = "doc1" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, &rtConfig) defer rt.Close() @@ -406,7 +404,7 @@ func TestBlipPushPullNewAttachmentNoCommonAncestor(t *testing.T) { // Wait for the document to be replicated at SG btc.pushReplication.WaitForMessage(2) - collection := rt.GetSingleTestDatabaseCollection() + collection, ctx := rt.GetSingleTestDatabaseCollection() doc, err := collection.GetDocument(ctx, docID, db.DocUnmarshalNoHistory) require.NoError(t, err) @@ -552,6 +550,7 @@ func TestBlipAttachNameChange(t *testing.T) { rt := NewRestTester(t, rtConfig) defer rt.Close() + docID := "doc" opts := &BlipTesterClientOpts{SupportedBLIPProtocols: SupportedBLIPProtocols} client1 := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts) defer client1.Close() @@ -561,20 +560,20 @@ func TestBlipAttachNameChange(t *testing.T) { digest := db.Sha1DigestKey(attachmentA) // Push initial attachment data - version, err := btcRunner.PushRev(client1.id, "doc", EmptyDocVersion(), []byte(`{"key":"val","_attachments":{"attachment": {"data":"`+attachmentAData+`"}}}`)) + version, err := btcRunner.PushRev(client1.id, docID, EmptyDocVersion(), []byte(`{"key":"val","_attachments":{"attachment": {"data":"`+attachmentAData+`"}}}`)) require.NoError(t, err) // Confirm attachment is in the bucket - attachmentAKey := db.MakeAttachmentKey(2, "doc", digest) + attachmentAKey := db.MakeAttachmentKey(2, docID, digest) bucketAttachmentA, _, err := client1.rt.GetSingleDataStore().GetRaw(attachmentAKey) require.NoError(t, err) require.EqualValues(t, bucketAttachmentA, attachmentA) // Simulate changing only the attachment name over CBL // Use revpos 2 to simulate revpos bug in CBL 2.8 - 3.0.0 - version, err = btcRunner.PushRev(client1.id, "doc", version, []byte(`{"key":"val","_attachments":{"attach":{"revpos":2,"content_type":"","length":11,"stub":true,"digest":"`+digest+`"}}}`)) + version, err = btcRunner.PushRev(client1.id, docID, version, []byte(`{"key":"val","_attachments":{"attach":{"revpos":2,"content_type":"","length":11,"stub":true,"digest":"`+digest+`"}}}`)) require.NoError(t, err) - err = client1.rt.WaitForVersion("doc", version) + err = client1.rt.WaitForVersion(docID, version) require.NoError(t, err) // Check if attachment is still in bucket @@ -582,7 +581,7 @@ func TestBlipAttachNameChange(t *testing.T) { assert.NoError(t, err) assert.Equal(t, bucketAttachmentA, attachmentA) - resp := client1.rt.SendAdminRequest("GET", "/{{.keyspace}}/doc/attach", "") + resp := client1.rt.SendAdminRequest("GET", "/{{.keyspace}}/"+docID+"/attach", "") RequireStatus(t, resp, http.StatusOK) assert.Equal(t, attachmentA, resp.BodyBytes()) }) diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index a3a07a633b..961a1c431b 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -1738,15 +1738,17 @@ func TestPutRevV4(t *testing.T) { defer bt.Close() collection, _ := bt.restTester.GetSingleTestDatabaseCollection() + docID := t.Name() + // 1. Send rev with history history := "1@def, 2@abc" - sent, _, resp, err := bt.SendRev("foo", db.EncodeTestVersion("3@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(history)}) + sent, _, resp, err := bt.SendRev(docID, db.EncodeTestVersion("3@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(history)}) assert.True(t, sent) require.NoError(t, err) assert.Equal(t, "", resp.Properties["Error-Code"]) // Validate against the bucket doc's HLV - doc, _, err := collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + doc, _, err := collection.GetDocWithXattrs(base.TestCtx(t), docID, db.DocUnmarshalNoHistory) require.NoError(t, err) pv, _ := db.ParseTestHistory(t, history) db.RequireCVEqual(t, doc.HLV, "3@efg") @@ -1754,13 +1756,13 @@ func TestPutRevV4(t *testing.T) { assert.True(t, reflect.DeepEqual(pv, doc.HLV.PreviousVersions)) // 2. Update the document with a non-conflicting revision, where only cv is updated - sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("4@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(history)}) + sent, _, resp, err = bt.SendRev(docID, db.EncodeTestVersion("4@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(history)}) assert.True(t, sent) require.NoError(t, err) assert.Equal(t, "", resp.Properties["Error-Code"]) // Validate against the bucket doc's HLV - doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + doc, _, err = collection.GetDocWithXattrs(base.TestCtx(t), docID, db.DocUnmarshalNoHistory) require.NoError(t, err) db.RequireCVEqual(t, doc.HLV, "4@efg") assert.Equal(t, db.EncodeValue(doc.Cas), doc.HLV.CurrentVersionCAS) @@ -1768,13 +1770,13 @@ func TestPutRevV4(t *testing.T) { // 3. Update the document again with a non-conflicting revision from a different source (previous cv moved to pv) updatedHistory := "1@def, 2@abc, 4@efg" - sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("1@jkl"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) + sent, _, resp, err = bt.SendRev(docID, db.EncodeTestVersion("1@jkl"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) assert.True(t, sent) require.NoError(t, err) assert.Equal(t, "", resp.Properties["Error-Code"]) // Validate against the bucket doc's HLV - doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + doc, _, err = collection.GetDocWithXattrs(base.TestCtx(t), docID, db.DocUnmarshalNoHistory) require.NoError(t, err) pv, _ = db.ParseTestHistory(t, updatedHistory) db.RequireCVEqual(t, doc.HLV, "1@jkl") @@ -1783,13 +1785,13 @@ func TestPutRevV4(t *testing.T) { // 4. Update the document again with a non-conflicting revision from a different source, and additional sources in history (previous cv moved to pv, and pv expanded) updatedHistory = "1@def, 2@abc, 4@efg, 1@jkl, 1@mmm" - sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("1@nnn"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) + sent, _, resp, err = bt.SendRev(docID, db.EncodeTestVersion("1@nnn"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) assert.True(t, sent) require.NoError(t, err) assert.Equal(t, "", resp.Properties["Error-Code"]) // Validate against the bucket doc's HLV - doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + doc, _, err = collection.GetDocWithXattrs(base.TestCtx(t), docID, db.DocUnmarshalNoHistory) require.NoError(t, err) pv, _ = db.ParseTestHistory(t, updatedHistory) db.RequireCVEqual(t, doc.HLV, "1@nnn") @@ -1797,20 +1799,21 @@ func TestPutRevV4(t *testing.T) { assert.True(t, reflect.DeepEqual(pv, doc.HLV.PreviousVersions)) // 5. Attempt to update the document again with a conflicting revision from a different source (previous cv not in pv), expect conflict - sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("1@pqr"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) + sent, _, resp, err = bt.SendRev(docID, db.EncodeTestVersion("1@pqr"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) assert.True(t, sent) require.Error(t, err) assert.Equal(t, "409", resp.Properties["Error-Code"]) // 6. Test sending rev with merge versions included in history (note new key) + newDocID := t.Name() + "_2" mvHistory := "3@def, 3@abc; 1@def, 2@abc" - sent, _, resp, err = bt.SendRev("boo", db.EncodeTestVersion("3@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(mvHistory)}) + sent, _, resp, err = bt.SendRev(newDocID, db.EncodeTestVersion("3@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(mvHistory)}) assert.True(t, sent) require.NoError(t, err) assert.Equal(t, "", resp.Properties["Error-Code"]) // assert on bucket doc - doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "boo", db.DocUnmarshalNoHistory) + doc, _, err = collection.GetDocWithXattrs(base.TestCtx(t), newDocID, db.DocUnmarshalNoHistory) require.NoError(t, err) pv, mv := db.ParseTestHistory(t, mvHistory) @@ -2168,13 +2171,13 @@ func TestPullReplicationUpdateOnOtherHLVAwarePeer(t *testing.T) { const docID = "doc1" otherSource := "otherSource" - hlvHelper := db.NewHLVAgent(t, rt.GetSingleDataStore(), otherSource, "_sync") + hlvHelper := db.NewHLVAgent(t, rt.GetSingleDataStore(), otherSource, "_vv") existingHLVKey := "doc1" cas := hlvHelper.InsertWithHLV(ctx, existingHLVKey) // force import of this write _, _ = rt.GetDoc(docID) - bucketDoc, _, err := collection.GetDocWithXattr(ctx, docID, db.DocUnmarshalAll) + bucketDoc, _, err := collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalAll) require.NoError(t, err) // create doc version of the above doc write diff --git a/rest/changes_test.go b/rest/changes_test.go index ed8c6292a1..9b8e912aad 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -430,7 +430,7 @@ func TestCVPopulationOnChangesViaAPI(t *testing.T) { changes, err := rt.WaitForChanges(1, "/{{.keyspace}}/_changes", "", true) require.NoError(t, err) - fetchedDoc, _, err := collection.GetDocWithXattr(ctx, DocID, db.DocUnmarshalCAS) + fetchedDoc, _, err := collection.GetDocWithXattrs(ctx, DocID, db.DocUnmarshalCAS) require.NoError(t, err) assert.Equal(t, "doc1", changes.Results[0].ID) @@ -461,7 +461,7 @@ func TestCVPopulationOnDocIDChanges(t *testing.T) { changes, err := rt.WaitForChanges(1, fmt.Sprintf(`/{{.keyspace}}/_changes?filter=_doc_ids&doc_ids=%s`, DocID), "", true) require.NoError(t, err) - fetchedDoc, _, err := collection.GetDocWithXattr(ctx, DocID, db.DocUnmarshalCAS) + fetchedDoc, _, err := collection.GetDocWithXattrs(ctx, DocID, db.DocUnmarshalCAS) require.NoError(t, err) assert.Equal(t, "doc1", changes.Results[0].ID) diff --git a/rest/changestest/changes_api_test.go b/rest/changestest/changes_api_test.go index 574293cfba..8e248a22b0 100644 --- a/rest/changestest/changes_api_test.go +++ b/rest/changestest/changes_api_test.go @@ -805,7 +805,7 @@ func TestChangesFromCompoundSinceViaDocGrant(t *testing.T) { // Write another doc _ = rt.PutDoc("mix-1", `{"channel":["ABC", "PBS", "HBO"]}`) - fetchedDoc, _, err := collection.GetDocWithXattr(ctx, "mix-1", db.DocUnmarshalSync) + fetchedDoc, _, err := collection.GetDocWithXattrs(ctx, "mix-1", db.DocUnmarshalSync) require.NoError(t, err) mixSource, mixVersion := fetchedDoc.HLV.GetCurrentVersion()