From c5b4ee47772adc0a6b8c3ca8bf8d68fed3dc58c4 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Fri, 24 May 2024 17:14:53 -0700 Subject: [PATCH] CBG-3877 Persist HLV to _vv xattr (#6843) Modifies document marshal and unmarshal to support a set of xattrs (_sync, _vv), and does the same for parsing DCP stream events (including user xattr). --- base/constants.go | 1 + db/change_cache.go | 3 +- db/crud.go | 59 ++++---- db/crud_test.go | 2 +- db/database.go | 7 +- db/database_collection.go | 2 +- db/document.go | 189 ++++++++++++++---------- db/document_test.go | 70 ++++----- db/hybrid_logical_vector.go | 4 +- db/hybrid_logical_vector_test.go | 13 +- db/import.go | 24 ++- db/import_listener.go | 6 +- db/import_test.go | 76 +++++++++- db/util_testing.go | 12 +- db/utilities_hlv_testing.go | 9 +- rest/blip_api_attachment_test.go | 15 +- rest/blip_api_crud_test.go | 29 ++-- rest/changes_test.go | 4 +- rest/changestest/changes_api_test.go | 2 +- rest/importuserxattrtest/import_test.go | 18 +-- 20 files changed, 329 insertions(+), 216 deletions(-) diff --git a/base/constants.go b/base/constants.go index d59998f3cd..a5b48b9b8a 100644 --- a/base/constants.go +++ b/base/constants.go @@ -136,6 +136,7 @@ const ( SyncPropertyName = "_sync" // SyncXattrName is used when storing sync data in a document's xattrs. SyncXattrName = "_sync" + VvXattrName = "_vv" // Intended to be used in Meta Map and related tests MetaMapXattrsKey = "xattrs" diff --git a/db/change_cache.go b/db/change_cache.go index 0426d43320..9e0b945b37 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -396,7 +396,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) { } // First unmarshal the doc (just its metadata, to save time/memory): - syncData, rawBody, _, rawUserXattr, err := UnmarshalDocumentSyncDataFromFeed(docJSON, event.DataType, collection.userXattrKey(), false) + syncData, rawBody, rawXattrs, err := UnmarshalDocumentSyncDataFromFeed(docJSON, event.DataType, collection.userXattrKey(), false) if err != nil { // Avoid log noise related to failed unmarshaling of binary documents. if event.DataType != base.MemcachedDataTypeRaw { @@ -409,6 +409,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) { } // If using xattrs and this isn't an SG write, we shouldn't attempt to cache. + rawUserXattr := rawXattrs[collection.userXattrKey()] if collection.UseXattrs() { if syncData == nil { return diff --git a/db/crud.go b/db/crud.go index bd6098731b..0e7e34763e 100644 --- a/db/crud.go +++ b/db/crud.go @@ -60,7 +60,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin return nil, nil, base.HTTPErrorf(400, "Invalid doc ID") } if c.UseXattrs() { - doc, rawBucketDoc, err = c.GetDocWithXattr(ctx, key, unmarshalLevel) + doc, rawBucketDoc, err = c.GetDocWithXattrs(ctx, key, unmarshalLevel) if err != nil { return nil, nil, err } @@ -73,7 +73,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin // If existing doc wasn't an SG Write, import the doc. 
 	if !isSgWrite {
 		var importErr error
-		doc, importErr = c.OnDemandImportForGet(ctx, docid, rawBucketDoc.Body, rawBucketDoc.Xattrs[base.SyncXattrName], rawBucketDoc.Xattrs[c.userXattrKey()], rawBucketDoc.Cas)
+		doc, importErr = c.OnDemandImportForGet(ctx, docid, rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas)
 		if importErr != nil {
 			return nil, nil, importErr
 		}
@@ -114,7 +114,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
 	return doc, rawBucketDoc, nil
 }
 
-func (c *DatabaseCollection) GetDocWithXattr(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
+func (c *DatabaseCollection) GetDocWithXattrs(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
 	rawBucketDoc = &sgbucket.BucketDocument{}
 	var getErr error
 	rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncAndUserXattrKeys())
@@ -123,7 +123,7 @@ func (c *DatabaseCollection) GetDocWithXattr(ctx context.Context, key string, un
 	}
 
 	var unmarshalErr error
-	doc, unmarshalErr = unmarshalDocumentWithXattr(ctx, key, rawBucketDoc.Body, rawBucketDoc.Xattrs[base.SyncXattrName], rawBucketDoc.Xattrs[c.userXattrKey()], rawBucketDoc.Cas, unmarshalLevel)
+	doc, unmarshalErr = c.unmarshalDocumentWithXattrs(ctx, key, rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, unmarshalLevel)
 	if unmarshalErr != nil {
 		return nil, nil, unmarshalErr
 	}
@@ -148,11 +148,8 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (
 		return emptySyncData, getErr
 	}
 
-	rawXattr := xattrs[base.SyncXattrName]
-	rawUserXattr := xattrs[c.userXattrKey()]
-
 	// Unmarshal xattr only
-	doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, docid, nil, rawXattr, rawUserXattr, cas, DocUnmarshalSync)
+	doc, unmarshalErr := c.unmarshalDocumentWithXattrs(ctx, docid, nil, xattrs, cas, DocUnmarshalSync)
 	if unmarshalErr != nil {
 		return emptySyncData, unmarshalErr
 	}
@@ -166,7 +163,7 @@
 
 	if !isSgWrite {
 		var importErr error
-		doc, importErr = c.OnDemandImportForGet(ctx, docid, rawDoc, rawXattr, rawUserXattr, cas)
+		doc, importErr = c.OnDemandImportForGet(ctx, docid, rawDoc, xattrs, cas)
 		if importErr != nil {
 			return emptySyncData, importErr
 		}
@@ -193,6 +190,12 @@
 
 }
 
+// unmarshalDocumentWithXattrs unmarshals a document from the provided body and xattrs map, fanning the map out into the individual sync (_sync), HLV (_vv) and user xattrs
+func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
+	return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[db.userXattrKey()], cas, unmarshalLevel)
+
+}
+
 // This gets *just* the Sync Metadata (_sync field) rather than the entire doc, for efficiency
 // reasons. Unlike GetDocSyncData it does not check for on-demand import; this means it does not
 // need to read the doc body from the bucket.
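To make the fan-out in the new helper concrete: a minimal, self-contained sketch of looking the _vv xattr up in an xattrs map (the shape returned by GetWithXattrs / DecodeValueWithXattrs) and unmarshalling it. The hlv type here is a simplified stand-in for db.HybridLogicalVector, with field names taken from the doc_meta_vv fixture later in this patch.

package main

import (
	"encoding/json"
	"fmt"
)

// hlv is a simplified stand-in for db.HybridLogicalVector; the json field
// names match the persisted _vv layout exercised by doc_meta_vv below.
type hlv struct {
	CurrentVersionCAS string            `json:"cvCas"`
	SourceID          string            `json:"src"`
	Version           string            `json:"ver"`
	MergeVersions     map[string]string `json:"mv,omitempty"`
	PreviousVersions  map[string]string `json:"pv,omitempty"`
}

func main() {
	// Shape of the xattrs map handed to unmarshalDocumentWithXattrs.
	xattrs := map[string][]byte{
		"_sync": []byte(`{"sequence":7}`),
		"_vv":   []byte(`{"cvCas":"0x40e2010000000000","src":"cb06dc003846116d9b66d2ab23887a96","ver":"0x40e2010000000000"}`),
	}

	// A missing key yields a nil slice, so a doc that was never written by an
	// HLV-aware peer simply skips HLV unmarshalling.
	if vv := xattrs["_vv"]; vv != nil {
		var v hlv
		if err := json.Unmarshal(vv, &v); err != nil {
			panic(err)
		}
		fmt.Printf("cv=%s@%s cvCas=%s\n", v.Version, v.SourceID, v.CurrentVersionCAS)
	}
}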
@@ -200,10 +203,10 @@ func (db *DatabaseCollection) GetDocSyncDataNoImport(ctx context.Context, docid if db.UseXattrs() { var xattrs map[string][]byte var cas uint64 - xattrs, cas, err = db.dataStore.GetXattrs(ctx, docid, []string{base.SyncXattrName}) + xattrs, cas, err = db.dataStore.GetXattrs(ctx, docid, []string{base.SyncXattrName, base.VvXattrName}) if err == nil { var doc *Document - doc, err = unmarshalDocumentWithXattr(ctx, docid, nil, xattrs[base.SyncXattrName], nil, cas, level) + doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, nil, xattrs, cas, level) if err == nil { syncData = doc.SyncData } @@ -235,14 +238,14 @@ func (db *DatabaseCollection) GetDocSyncDataNoImport(ctx context.Context, docid return } -// OnDemandImportForGet. Attempts to import the doc based on the provided id, contents and cas. ImportDocRaw does cas retry handling +// OnDemandImportForGet. Attempts to import the doc based on the provided id, contents and cas. ImportDocRaw does cas retry handling // if the document gets updated after the initial retrieval attempt that triggered this. -func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid string, rawDoc []byte, rawXattr []byte, rawUserXattr []byte, cas uint64) (docOut *Document, err error) { +func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid string, rawDoc []byte, xattrs map[string][]byte, cas uint64) (docOut *Document, err error) { isDelete := rawDoc == nil importDb := DatabaseCollectionWithUser{DatabaseCollection: c, user: nil} var importErr error - docOut, importErr = importDb.ImportDocRaw(ctx, docid, rawDoc, rawXattr, rawUserXattr, isDelete, cas, nil, ImportOnDemand) + docOut, importErr = importDb.ImportDocRaw(ctx, docid, rawDoc, xattrs, isDelete, cas, nil, ImportOnDemand) if importErr == base.ErrImportCancelledFilter { // If the import was cancelled due to filter, treat as not found return nil, base.HTTPErrorf(404, "Not imported") @@ -1076,7 +1079,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) { var matchRev string if existingDoc != nil { - doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs[base.SyncXattrName], existingDoc.Xattrs[db.userXattrKey()], existingDoc.Cas, DocUnmarshalRev) + doc, unmarshalErr := db.unmarshalDocumentWithXattrs(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs, existingDoc.Cas, DocUnmarshalRev) if unmarshalErr != nil { return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling existing doc") } @@ -2204,15 +2207,15 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do } casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) { // Be careful: this block can be invoked multiple times if there are races! 
- currentXattr := currentXattrs[base.SyncXattrName] - currentUserXattr := currentXattrs[db.userXattrKey()] - if doc, err = unmarshalDocumentWithXattr(ctx, docid, currentValue, currentXattr, currentUserXattr, cas, DocUnmarshalAll); err != nil { + + if doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll); err != nil { return } prevCurrentRev = doc.CurrentRev // Check whether Sync Data originated in body - if currentXattr == nil && doc.Sequence > 0 { + currentSyncXattr := currentXattrs[base.SyncXattrName] + if currentSyncXattr == nil && doc.Sequence > 0 { doc.inlineSyncData = true } @@ -2249,18 +2252,22 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do // Return the new raw document value for the bucket to store. doc.SetCrc32cUserXattrHash() - var rawXattr, rawDocBody []byte - rawDocBody, rawXattr, err = doc.MarshalWithXattr() + + var rawSyncXattr, rawVvXattr, rawDocBody []byte + rawDocBody, rawSyncXattr, rawVvXattr, err = doc.MarshalWithXattrs() + if err != nil { + return updatedDoc, err + } if !isImport { updatedDoc.Doc = rawDocBody docBytes = len(updatedDoc.Doc) } - updatedDoc.Xattrs = map[string][]byte{base.SyncXattrName: rawXattr} + updatedDoc.Xattrs = map[string][]byte{base.SyncXattrName: rawSyncXattr, base.VvXattrName: rawVvXattr} // Warn when sync data is larger than a configured threshold if db.unsupportedOptions() != nil && db.unsupportedOptions().WarningThresholds != nil { if xattrBytesThreshold := db.unsupportedOptions().WarningThresholds.XattrSize; xattrBytesThreshold != nil { - xattrBytes = len(rawXattr) + xattrBytes = len(rawSyncXattr) if uint32(xattrBytes) >= *xattrBytesThreshold { db.dbStats().Database().WarnXattrSizeCount.Add(1) base.WarnfCtx(ctx, "Doc id: %v sync metadata size: %d bytes exceeds %d bytes for sync metadata warning threshold", base.UD(doc.ID), xattrBytes, *xattrBytesThreshold) @@ -2815,7 +2822,7 @@ func (c *DatabaseCollection) checkForUpgrade(ctx context.Context, key string, un return nil, nil } - doc, rawDocument, err := c.GetDocWithXattr(ctx, key, unmarshalLevel) + doc, rawDocument, err := c.GetDocWithXattrs(ctx, key, unmarshalLevel) if err != nil || doc == nil || !doc.HasValidSyncData() { return nil, nil } @@ -2968,8 +2975,8 @@ const ( xattrMacroCas = "cas" // SyncData.Cas xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c xattrMacroCurrentRevVersion = "rev.ver" // SyncDataJSON.RevAndVersion.CurrentVersion - versionVectorVrsMacro = "_vv.ver" // PersistedHybridLogicalVector.Version - versionVectorCVCASMacro = "_vv.cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS + versionVectorVrsMacro = "ver" // PersistedHybridLogicalVector.Version + versionVectorCVCASMacro = "cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS ) func macroExpandSpec(xattrName string) []sgbucket.MacroExpansionSpec { diff --git a/db/crud_test.go b/db/crud_test.go index 97d5211120..fed350e0c2 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -245,7 +245,7 @@ func TestHasAttachmentsFlagForLegacyAttachments(t *testing.T) { require.NoError(t, err) // Get the existing bucket doc - _, existingBucketDoc, err := collection.GetDocWithXattr(ctx, docID, DocUnmarshalAll) + _, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, docID, DocUnmarshalAll) require.NoError(t, err) // Migrate document metadata from document body to system xattr. 
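A note on the macro constants changed above: versionVectorVrsMacro and versionVectorCVCASMacro drop their "_vv." prefix because the HLV now lives in its own xattr, and the full Sub-Document path is built by prefixing the xattr name. A small sketch of the resulting paths; xattrPath is a hypothetical stand-in for the xattrCurrentVersionPath / xattrCurrentVersionCASPath helpers, which this patch does not show.

package main

import "fmt"

// xattrPath is a hypothetical stand-in for the path-building helpers:
// join an xattr name with a field path inside that xattr.
func xattrPath(xattrName, field string) string {
	return xattrName + "." + field
}

func main() {
	// Before: the HLV was nested inside _sync, so CAS macro expansion
	// targeted paths like these.
	fmt.Println(xattrPath("_sync", "_vv.ver"))   // _sync._vv.ver
	fmt.Println(xattrPath("_sync", "_vv.cvCas")) // _sync._vv.cvCas

	// After: the HLV is persisted to its own _vv xattr, so the same macros
	// target paths relative to _vv.
	fmt.Println(xattrPath("_vv", "ver"))   // _vv.ver
	fmt.Println(xattrPath("_vv", "cvCas")) // _vv.cvCas
}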
diff --git a/db/database.go b/db/database.go index baf6bcb155..b43ec5e071 100644 --- a/db/database.go +++ b/db/database.go @@ -1792,7 +1792,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, if currentValue == nil || len(currentValue) == 0 { return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel } - doc, err := unmarshalDocumentWithXattr(ctx, docid, currentValue, currentXattrs[base.SyncXattrName], currentXattrs[db.userXattrKey()], cas, DocUnmarshalAll) + doc, err := db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll) if err != nil { return sgbucket.UpdatedDoc{}, err } @@ -1808,11 +1808,12 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, updatedDoc.UpdateExpiry(*updatedExpiry) } doc.SetCrc32cUserXattrHash() - _, rawXattr, err := updatedDoc.MarshalWithXattr() + _, rawSyncXattr, rawVvXattr, err := updatedDoc.MarshalWithXattrs() return sgbucket.UpdatedDoc{ Doc: nil, // Resync does not require document body update Xattrs: map[string][]byte{ - base.SyncXattrName: rawXattr, + base.SyncXattrName: rawSyncXattr, + base.VvXattrName: rawVvXattr, }, Expiry: updatedExpiry, }, err diff --git a/db/database_collection.go b/db/database_collection.go index 760c1a44b4..81e7c628f8 100644 --- a/db/database_collection.go +++ b/db/database_collection.go @@ -253,7 +253,7 @@ func (c *DatabaseCollection) unsupportedOptions() *UnsupportedOptions { // syncAndUserXattrKeys returns the xattr keys for the user and sync xattrs. func (c *DatabaseCollection) syncAndUserXattrKeys() []string { - xattrKeys := []string{base.SyncXattrName} + xattrKeys := []string{base.SyncXattrName, base.VvXattrName} userXattrKey := c.userXattrKey() if userXattrKey != "" { xattrKeys = append(xattrKeys, userXattrKey) diff --git a/db/document.go b/db/document.go index e61d1ca10f..d67cfe2c01 100644 --- a/db/document.go +++ b/db/document.go @@ -34,11 +34,11 @@ const DocumentHistoryMaxEntriesPerChannel = 5 type DocumentUnmarshalLevel uint8 const ( - DocUnmarshalAll = DocumentUnmarshalLevel(iota) // Unmarshals sync metadata and body - DocUnmarshalSync // Unmarshals all sync metadata - DocUnmarshalNoHistory // Unmarshals sync metadata excluding revtree history + DocUnmarshalAll = DocumentUnmarshalLevel(iota) // Unmarshals metadata and body + DocUnmarshalSync // Unmarshals metadata + DocUnmarshalNoHistory // Unmarshals metadata excluding revtree history DocUnmarshalHistory // Unmarshals revtree history + rev + CAS only - DocUnmarshalRev // Unmarshals rev + CAS only + DocUnmarshalRev // Unmarshals revTreeID + CAS only (no HLV) DocUnmarshalCAS // Unmarshals CAS (for import check) only DocUnmarshalNone // No unmarshalling (skips import/upgrade check) ) @@ -81,7 +81,7 @@ type SyncData struct { Attachments AttachmentsMeta `json:"attachments,omitempty"` ChannelSet []ChannelSetEntry `json:"channel_set"` ChannelSetHistory []ChannelSetEntry `json:"channel_set_history"` - HLV *HybridLogicalVector `json:"_vv,omitempty"` + HLV *HybridLogicalVector `json:"-"` // Marshalled/Unmarshalled separately from SyncData for storage in _vv, see MarshalWithXattrs/UnmarshalWithXattrs // Only used for performance metrics: TimeSaved time.Time `json:"time_saved,omitempty"` // Timestamp of save. 
@@ -401,14 +401,14 @@ func unmarshalDocument(docid string, data []byte) (*Document, error) { return doc, nil } -func unmarshalDocumentWithXattr(ctx context.Context, docid string, data []byte, xattrData []byte, userXattrData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) { +func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, syncXattrData []byte, hlvXattrData []byte, userXattrData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) { - if xattrData == nil || len(xattrData) == 0 { + if len(syncXattrData) == 0 && len(hlvXattrData) == 0 { // If no xattr data, unmarshal as standard doc doc, err = unmarshalDocument(docid, data) } else { doc = NewDocument(docid) - err = doc.UnmarshalWithXattr(ctx, data, xattrData, unmarshalLevel) + err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, unmarshalLevel) } if err != nil { return nil, err @@ -444,37 +444,47 @@ func UnmarshalDocumentSyncData(data []byte, needHistory bool) (*SyncData, error) // Returns the raw body, in case it's needed for import. // TODO: Using a pool of unmarshal workers may help prevent memory spikes under load -func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey string, needHistory bool) (result *SyncData, rawBody []byte, rawSyncXattr []byte, rawUserXattr []byte, err error) { +func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey string, needHistory bool) (result *SyncData, rawBody []byte, rawXattrs map[string][]byte, err error) { var body []byte // If xattr datatype flag is set, data includes both xattrs and document body. Check for presence of sync xattr. // Note that there could be a non-sync xattr present + var xattrValues map[string][]byte + var hlv *HybridLogicalVector if dataType&base.MemcachedDataTypeXattr != 0 { - var xattrs map[string][]byte - xattrKeys := []string{base.SyncXattrName} - if userXattrKey != "" { - xattrKeys = append(xattrKeys, userXattrKey) - } - body, xattrs, err = sgbucket.DecodeValueWithXattrs(xattrKeys, data) + xattrKeys := []string{base.SyncXattrName, base.VvXattrName, userXattrKey} + body, xattrValues, err = sgbucket.DecodeValueWithXattrs(xattrKeys, data) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, err } - rawSyncXattr = xattrs[base.SyncXattrName] - rawUserXattr = xattrs[userXattrKey] // If the sync xattr is present, use that to build SyncData - if len(rawSyncXattr) > 0 { + syncXattr, ok := xattrValues[base.SyncXattrName] + + if vvXattr, ok := xattrValues[base.VvXattrName]; ok { + err = base.JSONUnmarshal(vvXattr, &hlv) + if err != nil { + return nil, nil, nil, fmt.Errorf("error unmarshalling HLV: %w", err) + } + } + + if ok && len(syncXattr) > 0 { result = &SyncData{} if needHistory { result.History = make(RevTree) } - err = base.JSONUnmarshal(rawSyncXattr, result) + err = base.JSONUnmarshal(syncXattr, result) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("Found _sync xattr (%q), but could not unmarshal: %w", syncXattr, err) + return nil, nil, nil, fmt.Errorf("Found _sync xattr (%q), but could not unmarshal: %w", string(syncXattr), err) + } + + if hlv != nil { + result.HLV = hlv } - return result, body, rawSyncXattr, rawUserXattr, nil + return result, body, xattrValues, nil } + } else { // Xattr flag not set - data is just the document body body = data @@ -482,22 +492,16 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey // Non-xattr data, or sync xattr not 
present. Attempt to retrieve sync metadata from document body result, err = UnmarshalDocumentSyncData(body, needHistory) - return result, body, nil, rawUserXattr, err -} -func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, data []byte, dataType uint8, userXattrKey string) (doc *Document, err error) { - if dataType&base.MemcachedDataTypeXattr == 0 { - return unmarshalDocument(docid, data) - } - xattrKeys := []string{base.SyncXattrName} - if userXattrKey != "" { - xattrKeys = append(xattrKeys, userXattrKey) + // If no sync data was found but HLV was present, initialize empty sync data + if result == nil && hlv != nil { + result = &SyncData{} } - body, xattrs, err := sgbucket.DecodeValueWithXattrs(xattrKeys, data) - if err != nil { - return nil, err + // If HLV was found, add to sync data + if hlv != nil { + result.HLV = hlv } - return unmarshalDocumentWithXattr(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[userXattrKey], cas, DocUnmarshalAll) + return result, body, xattrValues, err } func (doc *SyncData) HasValidSyncData() bool { @@ -1055,11 +1059,12 @@ func (doc *Document) MarshalJSON() (data []byte, err error) { return data, err } -// UnmarshalWithXattr unmarshals the provided raw document and xattr bytes. The provided DocumentUnmarshalLevel +// UnmarshalWithXattrs unmarshals the provided raw document and xattr bytes when present. The provided DocumentUnmarshalLevel // (unmarshalLevel) specifies how much of the provided document/xattr needs to be initially unmarshalled. If // unmarshalLevel is anything less than the full document + metadata, the raw data is retained for subsequent // lazy unmarshalling as needed. -func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata []byte, unmarshalLevel DocumentUnmarshalLevel) error { +// Must handle cases where document body and hlvXattrData are present without syncXattrData for all DocumentUnmarshalLevel +func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData []byte, unmarshalLevel DocumentUnmarshalLevel) error { if doc.ID == "" { base.WarnfCtx(ctx, "Attempted to unmarshal document without ID set") return errors.New("Document was unmarshalled without ID set") @@ -1067,11 +1072,19 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata switch unmarshalLevel { case DocUnmarshalAll, DocUnmarshalSync: - // Unmarshal full document and/or sync metadata + // Unmarshal full document and/or sync metadata. Documents written by XDCR may have HLV but no sync data doc.SyncData = SyncData{History: make(RevTree)} - unmarshalErr := base.JSONUnmarshal(xdata, &doc.SyncData) - if unmarshalErr != nil { - return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), unmarshalErr)) + if syncXattrData != nil { + unmarshalErr := base.JSONUnmarshal(syncXattrData, &doc.SyncData) + if unmarshalErr != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), unmarshalErr)) + } + } + if hlvXattrData != nil { + err := base.JSONUnmarshal(hlvXattrData, &doc.SyncData.HLV) + if err != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). 
Error: %v", base.UD(doc.ID), err)) + } } doc._rawBody = data // Unmarshal body if requested and present @@ -1081,50 +1094,70 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata case DocUnmarshalNoHistory: // Unmarshal sync metadata only, excluding history doc.SyncData = SyncData{} - unmarshalErr := base.JSONUnmarshal(xdata, &doc.SyncData) - if unmarshalErr != nil { - return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalNoHistory). Error: %v", base.UD(doc.ID), unmarshalErr)) + if syncXattrData != nil { + unmarshalErr := base.JSONUnmarshal(syncXattrData, &doc.SyncData) + if unmarshalErr != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattrs() doc with id: %s (DocUnmarshalNoHistory). Error: %v", base.UD(doc.ID), unmarshalErr)) + } + } + if hlvXattrData != nil { + err := base.JSONUnmarshal(hlvXattrData, &doc.SyncData.HLV) + if err != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalNoHistory). Error: %v", base.UD(doc.ID), err)) + } } doc._rawBody = data case DocUnmarshalHistory: - historyOnlyMeta := historyOnlySyncData{History: make(RevTree)} - unmarshalErr := base.JSONUnmarshal(xdata, &historyOnlyMeta) - if unmarshalErr != nil { - return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalHistory). Error: %v", base.UD(doc.ID), unmarshalErr)) - } - doc.SyncData = SyncData{ - CurrentRev: historyOnlyMeta.CurrentRev.RevTreeID, - History: historyOnlyMeta.History, - Cas: historyOnlyMeta.Cas, + if syncXattrData != nil { + historyOnlyMeta := historyOnlySyncData{History: make(RevTree)} + unmarshalErr := base.JSONUnmarshal(syncXattrData, &historyOnlyMeta) + if unmarshalErr != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattrs() doc with id: %s (DocUnmarshalHistory). Error: %v", base.UD(doc.ID), unmarshalErr)) + } + doc.SyncData = SyncData{ + CurrentRev: historyOnlyMeta.CurrentRev.RevTreeID, + History: historyOnlyMeta.History, + Cas: historyOnlyMeta.Cas, + } + } else { + doc.SyncData = SyncData{} } doc._rawBody = data case DocUnmarshalRev: // Unmarshal only rev and cas from sync metadata - var revOnlyMeta revOnlySyncData - unmarshalErr := base.JSONUnmarshal(xdata, &revOnlyMeta) - if unmarshalErr != nil { - return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalRev). Error: %v", base.UD(doc.ID), unmarshalErr)) - } - doc.SyncData = SyncData{ - CurrentRev: revOnlyMeta.CurrentRev.RevTreeID, - Cas: revOnlyMeta.Cas, + if syncXattrData != nil { + var revOnlyMeta revOnlySyncData + unmarshalErr := base.JSONUnmarshal(syncXattrData, &revOnlyMeta) + if unmarshalErr != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattrs() doc with id: %s (DocUnmarshalRev). Error: %v", base.UD(doc.ID), unmarshalErr)) + } + doc.SyncData = SyncData{ + CurrentRev: revOnlyMeta.CurrentRev.RevTreeID, + Cas: revOnlyMeta.Cas, + } + } else { + doc.SyncData = SyncData{} } doc._rawBody = data case DocUnmarshalCAS: // Unmarshal only cas from sync metadata - var casOnlyMeta casOnlySyncData - unmarshalErr := base.JSONUnmarshal(xdata, &casOnlyMeta) - if unmarshalErr != nil { - return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalCAS). 
Error: %v", base.UD(doc.ID), unmarshalErr)) - } - doc.SyncData = SyncData{ - Cas: casOnlyMeta.Cas, + if syncXattrData != nil { + var casOnlyMeta casOnlySyncData + unmarshalErr := base.JSONUnmarshal(syncXattrData, &casOnlyMeta) + if unmarshalErr != nil { + return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattrs() doc with id: %s (DocUnmarshalCAS). Error: %v", base.UD(doc.ID), unmarshalErr)) + } + doc.SyncData = SyncData{ + Cas: casOnlyMeta.Cas, + } + } else { + doc.SyncData = SyncData{} } doc._rawBody = data } // If there's no body, but there is an xattr, set deleted flag and initialize an empty body - if len(data) == 0 && len(xdata) > 0 { + if len(data) == 0 && len(syncXattrData) > 0 { doc._body = Body{} doc._rawBody = []byte(base.EmptyDocument) doc.Deleted = true @@ -1132,7 +1165,8 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata return nil } -func (doc *Document) MarshalWithXattr() (data []byte, xdata []byte, err error) { +// MarshalWithXattrs marshals the Document into body, and sync and vv xattrs for persistence. +func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr []byte, vvXattr []byte, err error) { // Grab the rawBody if it's already marshalled, otherwise unmarshal the body if doc._rawBody != nil { if !doc.IsDeleted() { @@ -1149,18 +1183,25 @@ func (doc *Document) MarshalWithXattr() (data []byte, xdata []byte, err error) { if !deleted { data, err = base.JSONMarshal(body) if err != nil { - return nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattr() doc body with id: %s. Error: %v", base.UD(doc.ID), err)) + return nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc body with id: %s. Error: %v", base.UD(doc.ID), err)) } } } } - xdata, err = base.JSONMarshal(&doc.SyncData) + if doc.SyncData.HLV != nil { + vvXattr, err = base.JSONMarshal(&doc.SyncData.HLV) + if err != nil { + return nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc vv with id: %s. Error: %v", base.UD(doc.ID), err)) + } + } + + syncXattr, err = base.JSONMarshal(&doc.SyncData) if err != nil { - return nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattr() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err)) + return nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc SyncData with id: %s. 
Error: %v", base.UD(doc.ID), err)) } - return data, xdata, nil + return data, syncXattr, vvXattr, nil } // HasCurrentVersion Compares the specified CV with the fetched documents CV, returns error on mismatch between the two diff --git a/db/document_test.go b/db/document_test.go index 4a2b0312d0..7da06d479b 100644 --- a/db/document_test.go +++ b/db/document_test.go @@ -137,7 +137,7 @@ func BenchmarkDocUnmarshal(b *testing.B) { b.Run(bm.name, func(b *testing.B) { ctx := base.TestCtx(b) for i := 0; i < b.N; i++ { - _, _ = unmarshalDocumentWithXattr(ctx, "doc_1k", doc1k_body, doc1k_meta, nil, 1, bm.unmarshalLevel) + _, _ = unmarshalDocumentWithXattrs(ctx, "doc_1k", doc1k_body, doc1k_meta, nil, nil, 1, bm.unmarshalLevel) } }) } @@ -192,7 +192,7 @@ func BenchmarkUnmarshalBody(b *testing.B) { } } -const doc_meta_with_vv = `{ +const doc_meta_no_vv = `{ "rev": "3-89758294abc63157354c2b08547c2d21", "sequence": 7, "recent_sequences": [ @@ -235,22 +235,23 @@ const doc_meta_with_vv = `{ }, "GHI": null }, - "_vv":{ - "cvCas":"0x40e2010000000000", - "src":"cb06dc003846116d9b66d2ab23887a96", - "ver":"0x40e2010000000000", - "mv":{ - "s_LhRPsa7CpjEvP5zeXTXEBA":"c0ff05d7ac059a16", - "s_NqiIe0LekFPLeX4JvTO6Iw":"1c008cd6ac059a16" - }, - "pv":{ - "s_YZvBpEaztom9z5V/hDoeIw":"f0ff44d6ac059a16" - } - }, "cas": "", "time_saved": "2017-10-25T12:45:29.622450174-07:00" }` +const doc_meta_vv = `{ + "cvCas":"0x40e2010000000000", + "src":"cb06dc003846116d9b66d2ab23887a96", + "ver":"0x40e2010000000000", + "mv":{ + "s_LhRPsa7CpjEvP5zeXTXEBA":"c0ff05d7ac059a16", + "s_NqiIe0LekFPLeX4JvTO6Iw":"1c008cd6ac059a16" + }, + "pv":{ + "s_YZvBpEaztom9z5V/hDoeIw":"f0ff44d6ac059a16" + } + }` + func TestParseVersionVectorSyncData(t *testing.T) { mv := make(map[string]string) pv := make(map[string]string) @@ -260,8 +261,9 @@ func TestParseVersionVectorSyncData(t *testing.T) { ctx := base.TestCtx(t) - doc_meta := []byte(doc_meta_with_vv) - doc, err := unmarshalDocumentWithXattr(ctx, "doc_1k", nil, doc_meta, nil, 1, DocUnmarshalNoHistory) + sync_meta := []byte(doc_meta_no_vv) + vv_meta := []byte(doc_meta_vv) + doc, err := unmarshalDocumentWithXattrs(ctx, "doc_1k", nil, sync_meta, vv_meta, nil, 1, DocUnmarshalNoHistory) require.NoError(t, err) strCAS := string(base.Uint64CASToLittleEndianHex(123456)) @@ -272,7 +274,7 @@ func TestParseVersionVectorSyncData(t *testing.T) { assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) - doc, err = unmarshalDocumentWithXattr(ctx, "doc1", nil, doc_meta, nil, 1, DocUnmarshalAll) + doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, 1, DocUnmarshalAll) require.NoError(t, err) // assert on doc version vector values @@ -282,7 +284,7 @@ func TestParseVersionVectorSyncData(t *testing.T) { assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) - doc, err = unmarshalDocumentWithXattr(ctx, "doc1", nil, doc_meta, nil, 1, DocUnmarshalNoHistory) + doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, 1, DocUnmarshalNoHistory) require.NoError(t, err) // assert on doc version vector values @@ -347,32 +349,22 @@ func TestRevAndVersion(t *testing.T) { require.NoError(t, err) log.Printf("marshalled:%s", marshalledSyncData) - var newSyncData SyncData - err = base.JSONUnmarshal(marshalledSyncData, &newSyncData) - require.NoError(t, err) - require.Equal(t, test.revTreeID, 
newSyncData.CurrentRev) - require.Equal(t, expectedSequence, newSyncData.Sequence) - if test.source != "" { - require.NotNil(t, newSyncData.HLV) - require.Equal(t, test.source, newSyncData.HLV.SourceID) - require.Equal(t, test.version, newSyncData.HLV.Version) - } - // Document test document := NewDocument("docID") document.SyncData.CurrentRev = test.revTreeID + document.SyncData.Sequence = expectedSequence document.SyncData.HLV = &HybridLogicalVector{ SourceID: test.source, Version: test.version, } - marshalledDoc, marshalledXattr, err := document.MarshalWithXattr() + marshalledDoc, marshalledXattr, marshalledVvXattr, err := document.MarshalWithXattrs() require.NoError(t, err) newDocument := NewDocument("docID") - err = newDocument.UnmarshalWithXattr(ctx, marshalledDoc, marshalledXattr, DocUnmarshalAll) + err = newDocument.UnmarshalWithXattrs(ctx, marshalledDoc, marshalledXattr, marshalledVvXattr, DocUnmarshalAll) require.NoError(t, err) require.Equal(t, test.revTreeID, newDocument.CurrentRev) - require.Equal(t, expectedSequence, newSyncData.Sequence) + require.Equal(t, expectedSequence, newDocument.Sequence) if test.source != "" { require.NotNil(t, newDocument.HLV) require.Equal(t, test.source, newDocument.HLV.SourceID) @@ -493,17 +485,15 @@ func TestDCPDecodeValue(t *testing.T) { require.Nil(t, xattrs) } // UnmarshalDocumentSyncData wraps DecodeValueWithXattrs - result, rawBody, rawXattr, rawUserXattr, err := UnmarshalDocumentSyncDataFromFeed(test.body, base.MemcachedDataTypeXattr, "", false) + result, rawBody, rawXattrs, err := UnmarshalDocumentSyncDataFromFeed(test.body, base.MemcachedDataTypeXattr, "", false) require.ErrorIs(t, err, test.expectedErr) if test.expectedSyncXattr != nil { require.NotNil(t, result) + require.Equal(t, test.expectedSyncXattr, rawXattrs[base.SyncXattrName]) } else { require.Nil(t, result) } require.Equal(t, test.expectedBody, rawBody) - require.Equal(t, test.expectedSyncXattr, rawXattr) - require.Nil(t, rawUserXattr) - }) } } @@ -514,19 +504,17 @@ func TestInvalidXattrStreamEmptyBody(t *testing.T) { emptyBody := []byte{} // DecodeValueWithXattrs is the underlying function - body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{"_sync"}, inputStream) + body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{base.SyncXattrName}, inputStream) require.NoError(t, err) require.Equal(t, emptyBody, body) require.Empty(t, xattrs) // UnmarshalDocumentSyncData wraps DecodeValueWithXattrs - result, rawBody, rawXattr, rawUserXattr, err := UnmarshalDocumentSyncDataFromFeed(inputStream, base.MemcachedDataTypeXattr, "", false) + result, rawBody, rawXattrs, err := UnmarshalDocumentSyncDataFromFeed(inputStream, base.MemcachedDataTypeXattr, "", false) require.Error(t, err) // unexpected end of JSON input require.Nil(t, result) require.Equal(t, emptyBody, rawBody) - require.Nil(t, rawXattr) - require.Nil(t, rawUserXattr) - + require.Nil(t, rawXattrs[base.SyncXattrName]) } // getSingleXattrDCPBytes returns a DCP body with a single xattr pair and body diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 9c242b7e59..15f57288f2 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -377,14 +377,14 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector HybridLogicalVector func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansionSpec { var outputSpec []sgbucket.MacroExpansionSpec if hlv.Version == hlvExpandMacroCASValue { - spec := 
sgbucket.NewMacroExpansionSpec(xattrCurrentVersionPath(base.SyncXattrName), sgbucket.MacroCas) + spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionPath(base.VvXattrName), sgbucket.MacroCas) outputSpec = append(outputSpec, spec) // If version is being expanded, we need to also specify the macro expansion for the expanded rev property currentRevSpec := sgbucket.NewMacroExpansionSpec(xattrCurrentRevVersionPath(base.SyncXattrName), sgbucket.MacroCas) outputSpec = append(outputSpec, currentRevSpec) } if hlv.CurrentVersionCAS == hlvExpandMacroCASValue { - spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(base.SyncXattrName), sgbucket.MacroCas) + spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(base.VvXattrName), sgbucket.MacroCas) outputSpec = append(outputSpec, spec) } return outputSpec diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go index c006f6989d..63304bd171 100644 --- a/db/hybrid_logical_vector_test.go +++ b/db/hybrid_logical_vector_test.go @@ -279,10 +279,10 @@ func TestHLVImport(t *testing.T) { standardImportBody := []byte(`{"prop":"value"}`) cas, err := collection.dataStore.WriteCas(standardImportKey, 0, 0, standardImportBody, sgbucket.Raw) require.NoError(t, err, "write error") - _, err = collection.ImportDocRaw(ctx, standardImportKey, standardImportBody, nil, nil, false, cas, nil, ImportFromFeed) + _, err = collection.ImportDocRaw(ctx, standardImportKey, standardImportBody, nil, false, cas, nil, ImportFromFeed) require.NoError(t, err, "import error") - importedDoc, _, err := collection.GetDocWithXattr(ctx, standardImportKey, DocUnmarshalAll) + importedDoc, _, err := collection.GetDocWithXattrs(ctx, standardImportKey, DocUnmarshalAll) require.NoError(t, err) importedHLV := importedDoc.HLV encodedCAS := string(base.Uint64CASToLittleEndianHex(cas)) @@ -293,19 +293,18 @@ func TestHLVImport(t *testing.T) { // 2. Test import of write by HLV-aware peer (HLV is already updated, sync metadata is not). 
otherSource := "otherSource" - hlvHelper := NewHLVAgent(t, collection.dataStore, otherSource, "_sync") + hlvHelper := NewHLVAgent(t, collection.dataStore, otherSource, "_vv") existingHLVKey := "existingHLV_" + t.Name() _ = hlvHelper.InsertWithHLV(ctx, existingHLVKey) - existingBody, existingXattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, existingHLVKey, []string{base.SyncXattrName}) + existingBody, existingXattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, existingHLVKey, []string{base.SyncXattrName, base.VvXattrName}) require.NoError(t, err) - existingXattr := existingXattrs[base.SyncXattrName] encodedCAS = EncodeValue(cas) - _, err = collection.ImportDocRaw(ctx, existingHLVKey, existingBody, existingXattr, nil, false, cas, nil, ImportFromFeed) + _, err = collection.ImportDocRaw(ctx, existingHLVKey, existingBody, existingXattrs, false, cas, nil, ImportFromFeed) require.NoError(t, err, "import error") - importedDoc, _, err = collection.GetDocWithXattr(ctx, existingHLVKey, DocUnmarshalAll) + importedDoc, _, err = collection.GetDocWithXattrs(ctx, existingHLVKey, DocUnmarshalAll) require.NoError(t, err) importedHLV = importedDoc.HLV // cas in the HLV's current version and cvCAS should not have changed, and should match importCAS diff --git a/db/import.go b/db/import.go index db7556b37b..df615cfb51 100644 --- a/db/import.go +++ b/db/import.go @@ -32,7 +32,7 @@ const ( ) // Imports a document that was written by someone other than sync gateway, given the existing state of the doc in raw bytes -func (db *DatabaseCollectionWithUser) ImportDocRaw(ctx context.Context, docid string, value []byte, xattrValue []byte, userXattrValue []byte, isDelete bool, cas uint64, expiry *uint32, mode ImportMode) (docOut *Document, err error) { +func (db *DatabaseCollectionWithUser) ImportDocRaw(ctx context.Context, docid string, value []byte, xattrs map[string][]byte, isDelete bool, cas uint64, expiry *uint32, mode ImportMode) (docOut *Document, err error) { var body Body if isDelete { @@ -53,14 +53,9 @@ func (db *DatabaseCollectionWithUser) ImportDocRaw(ctx context.Context, docid st } existingBucketDoc := &sgbucket.BucketDocument{ - Body: value, - Xattrs: map[string][]byte{ - base.SyncXattrName: xattrValue, - }, - Cas: cas, - } - if db.userXattrKey() != "" { - existingBucketDoc.Xattrs[db.userXattrKey()] = userXattrValue + Body: value, + Xattrs: xattrs, + Cas: cas, } return db.importDoc(ctx, docid, body, expiry, isDelete, existingBucketDoc, mode) @@ -92,7 +87,7 @@ func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid strin if existingDoc.Deleted { existingBucketDoc.Xattrs[base.SyncXattrName], err = base.JSONMarshal(existingDoc.SyncData) } else { - existingBucketDoc.Body, existingBucketDoc.Xattrs[base.SyncXattrName], err = existingDoc.MarshalWithXattr() + existingBucketDoc.Body, existingBucketDoc.Xattrs[base.SyncXattrName], existingBucketDoc.Xattrs[base.VvXattrName], err = existingDoc.MarshalWithXattrs() } } @@ -389,15 +384,18 @@ func (db *DatabaseCollectionWithUser) migrateMetadata(ctx context.Context, docid } // Persist the document in xattr format - value, xattrValue, marshalErr := doc.MarshalWithXattr() + value, syncXattr, vvXattr, marshalErr := doc.MarshalWithXattrs() if marshalErr != nil { return nil, false, marshalErr } - // Use WriteWithXattr to handle both normal migration and tombstone migration (xattr creation, body delete) xattrs := map[string][]byte{ - base.SyncXattrName: xattrValue, + base.SyncXattrName: syncXattr, } + if vvXattr != nil { + 
xattrs[base.VvXattrName] = vvXattr + } + var casOut uint64 var writeErr error if doc.hasFlag(channels.Deleted) { diff --git a/db/import_listener.go b/db/import_listener.go index 403710ac3c..d6d00baa8e 100644 --- a/db/import_listener.go +++ b/db/import_listener.go @@ -168,7 +168,7 @@ func (il *importListener) ProcessFeedEvent(event sgbucket.FeedEvent) (shouldPers } func (il *importListener) ImportFeedEvent(ctx context.Context, collection *DatabaseCollectionWithUser, event sgbucket.FeedEvent) { - syncData, rawBody, rawXattr, rawUserXattr, err := UnmarshalDocumentSyncDataFromFeed(event.Value, event.DataType, collection.userXattrKey(), false) + syncData, rawBody, rawXattrs, err := UnmarshalDocumentSyncDataFromFeed(event.Value, event.DataType, collection.userXattrKey(), false) if err != nil { if errors.Is(err, sgbucket.ErrEmptyMetadata) { base.WarnfCtx(ctx, "Unexpected empty metadata when processing feed event. docid: %s opcode: %v datatype:%v", base.UD(event.Key), event.Opcode, event.DataType) @@ -182,7 +182,7 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab var isSGWrite bool var crc32Match bool if syncData != nil { - isSGWrite, crc32Match, _ = syncData.IsSGWrite(event.Cas, rawBody, rawUserXattr) + isSGWrite, crc32Match, _ = syncData.IsSGWrite(event.Cas, rawBody, rawXattrs[collection.userXattrKey()]) if crc32Match { il.dbStats.Crc32MatchCount.Add(1) } @@ -204,7 +204,7 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab default: } - _, err := collection.ImportDocRaw(ctx, docID, rawBody, rawXattr, rawUserXattr, isDelete, event.Cas, &event.Expiry, ImportFromFeed) + _, err := collection.ImportDocRaw(ctx, docID, rawBody, rawXattrs, isDelete, event.Cas, &event.Expiry, ImportFromFeed) if err != nil { if err == base.ErrImportCasFailure { base.DebugfCtx(ctx, base.KeyImport, "Not importing mutation - document %s has been subsequently updated and will be imported based on that mutation.", base.UD(docID)) diff --git a/db/import_test.go b/db/import_test.go index b02c804c2c..fd7aa7709d 100644 --- a/db/import_test.go +++ b/db/import_test.go @@ -60,7 +60,7 @@ func TestMigrateMetadata(t *testing.T) { assert.NoError(t, err, "Error writing doc w/ expiry") // Get the existing bucket doc - _, existingBucketDoc, err := collection.GetDocWithXattr(ctx, key, DocUnmarshalAll) + _, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, key, DocUnmarshalAll) require.NoError(t, err) // Set the expiry value to a stale value (it's about to be stale, since below it will get updated to a later value) existingBucketDoc.Expiry = uint32(syncMetaExpiry.Unix()) @@ -94,6 +94,70 @@ func TestMigrateMetadata(t *testing.T) { } +// Tests metadata migration where a document with inline sync data has been replicated by XDCR, so also has an +// existing HLV. 
Migration should preserve the existing HLV while moving doc._sync to sync xattr +func TestMigrateMetadataWithHLV(t *testing.T) { + + if !base.TestUseXattrs() { + t.Skip("This test only works with XATTRS enabled") + } + + base.SetUpTestLogging(t, base.LevelInfo, base.KeyMigrate, base.KeyImport) + + db, ctx := setupTestDB(t) + defer db.Close(ctx) + + collection := GetSingleDatabaseCollectionWithUser(t, db) + + key := "TestMigrateMetadata" + bodyBytes := rawDocWithSyncMeta() + body := Body{} + err := body.Unmarshal(bodyBytes) + assert.NoError(t, err, "Error unmarshalling body") + + hlv := &HybridLogicalVector{} + require.NoError(t, hlv.AddVersion(CreateVersion("source123", base.CasToString(100)))) + hlv.CurrentVersionCAS = base.CasToString(100) + hlvBytes := base.MustJSONMarshal(t, hlv) + xattrBytes := map[string][]byte{ + base.VvXattrName: hlvBytes, + } + + // Create via the SDK with inline sync metadata and an existing _vv xattr + _, err = collection.dataStore.WriteWithXattrs(ctx, key, 0, 0, bodyBytes, xattrBytes, nil) + require.NoError(t, err) + + // Get the existing bucket doc + _, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, key, DocUnmarshalAll) + require.NoError(t, err) + + // Migrate metadata + _, _, err = collection.migrateMetadata( + ctx, + key, + body, + existingBucketDoc, + &sgbucket.MutateInOptions{PreserveExpiry: false}, + ) + require.NoError(t, err) + + // Fetch the existing doc, ensure _vv is preserved + var migratedHLV *HybridLogicalVector + _, migratedBucketDoc, err := collection.GetDocWithXattrs(ctx, key, DocUnmarshalAll) + require.NoError(t, err) + migratedHLVBytes, ok := migratedBucketDoc.Xattrs[base.VvXattrName] + require.True(t, ok) + require.NoError(t, base.JSONUnmarshal(migratedHLVBytes, &migratedHLV)) + require.Equal(t, hlv.Version, migratedHLV.Version) + require.Equal(t, hlv.SourceID, migratedHLV.SourceID) + require.Equal(t, hlv.CurrentVersionCAS, migratedHLV.CurrentVersionCAS) + + migratedSyncXattrBytes, ok := migratedBucketDoc.Xattrs[base.SyncXattrName] + require.True(t, ok) + require.NotZero(t, len(migratedSyncXattrBytes)) + +} + // This invokes db.importDoc() with two different scenarios: // // Scenario 1: normal import @@ -153,7 +217,7 @@ func TestImportWithStaleBucketDocCorrectExpiry(t *testing.T) { assert.NoError(t, err, "Error writing doc w/ expiry") // Get the existing bucket doc - _, existingBucketDoc, err := collection.GetDocWithXattr(ctx, key, DocUnmarshalAll) + _, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, key, DocUnmarshalAll) assert.NoError(t, err, fmt.Sprintf("Error retrieving doc w/ xattr: %v", err)) body = Body{} @@ -321,7 +385,7 @@ func TestImportWithCasFailureUpdate(t *testing.T) { assert.NoError(t, err) // Get the existing bucket doc - _, existingBucketDoc, err = collection.GetDocWithXattr(ctx, testcase.docname, DocUnmarshalAll) + _, existingBucketDoc, err = collection.GetDocWithXattrs(ctx, testcase.docname, DocUnmarshalAll) assert.NoError(t, err, fmt.Sprintf("Error retrieving doc w/ xattr: %v", err)) importD := `{"new":"Val"}` @@ -418,8 +482,10 @@ func TestImportNullDocRaw(t *testing.T) { // Feed import of null doc exp := uint32(0) - - importedDoc, err := collection.ImportDocRaw(ctx, "TestImportNullDoc", []byte("null"), []byte("{}"), nil, false, 1, &exp, ImportFromFeed) + xattrs := map[string][]byte{ + base.SyncXattrName: []byte("{}"), + } + importedDoc, err := collection.ImportDocRaw(ctx, "TestImportNullDoc", []byte("null"), xattrs, false, 1, &exp, ImportFromFeed) assert.Equal(t, base.ErrEmptyDocument, err) 
assert.True(t, importedDoc == nil, "Expected no imported doc") } diff --git a/db/util_testing.go b/db/util_testing.go index a5d2f16ec6..b524c0488f 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -260,7 +260,17 @@ func purgeWithDCPFeed(ctx context.Context, dataStore sgbucket.DataStore, tbp *ba key := string(event.Key) if base.TestUseXattrs() { - purgeErr = dataStore.DeleteWithXattrs(ctx, key, []string{base.SyncXattrName}) + systemXattrNames, decodeErr := sgbucket.DecodeXattrNames(event.Value, true) + if decodeErr != nil { + purgeErrors = purgeErrors.Append(decodeErr) + tbp.Logf(ctx, "Error decoding DCP event xattrs for key %s. %v", key, decodeErr) + return false + } + if len(systemXattrNames) > 0 { + purgeErr = dataStore.DeleteWithXattrs(ctx, key, systemXattrNames) + } else { + purgeErr = dataStore.Delete(key) + } } else { purgeErr = dataStore.Delete(key) } diff --git a/db/utilities_hlv_testing.go b/db/utilities_hlv_testing.go index 854899897b..f1bd2e3081 100644 --- a/db/utilities_hlv_testing.go +++ b/db/utilities_hlv_testing.go @@ -41,24 +41,21 @@ func NewHLVAgent(t *testing.T, datastore base.DataStore, source string, xattrNam } // InsertWithHLV inserts a new document into the bucket with a populated HLV (matching a write from -// a different HLV-aware peer) +// a different, non-SGW HLV-aware peer) func (h *HLVAgent) InsertWithHLV(ctx context.Context, key string) (casOut uint64) { hlv := &HybridLogicalVector{} err := hlv.AddVersion(CreateVersion(h.Source, hlvExpandMacroCASValue)) require.NoError(h.t, err) hlv.CurrentVersionCAS = hlvExpandMacroCASValue - syncData := &SyncData{HLV: hlv} - syncDataBytes, err := base.JSONMarshal(syncData) - require.NoError(h.t, err) - + vvDataBytes := base.MustJSONMarshal(h.t, hlv) mutateInOpts := &sgbucket.MutateInOptions{ MacroExpansion: hlv.computeMacroExpansions(), } docBody := base.MustJSONMarshal(h.t, defaultHelperBody) xattrData := map[string][]byte{ - h.xattrName: syncDataBytes, + h.xattrName: vvDataBytes, } cas, err := h.datastore.WriteWithXattrs(ctx, key, 0, 0, docBody, xattrData, mutateInOpts) diff --git a/rest/blip_api_attachment_test.go b/rest/blip_api_attachment_test.go index 9449ae5d3a..2e564401e5 100644 --- a/rest/blip_api_attachment_test.go +++ b/rest/blip_api_attachment_test.go @@ -290,10 +290,10 @@ func TestBlipPushPullNewAttachmentCommonAncestor(t *testing.T) { } btcRunner := NewBlipTesterClientRunner(t) - const docID = "doc1" ctx := base.TestCtx(t) btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { + docID := t.Name() rt := NewRestTester(t, &rtConfig) defer rt.Close() @@ -372,10 +372,10 @@ func TestBlipPushPullNewAttachmentNoCommonAncestor(t *testing.T) { GuestEnabled: true, } - const docID = "doc1" btcRunner := NewBlipTesterClientRunner(t) ctx := base.TestCtx(t) + const docID = "doc1" btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, &rtConfig) defer rt.Close() @@ -555,6 +555,7 @@ func TestBlipAttachNameChange(t *testing.T) { rt := NewRestTester(t, rtConfig) defer rt.Close() + docID := "doc" opts := &BlipTesterClientOpts{SupportedBLIPProtocols: SupportedBLIPProtocols} client1 := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts) defer client1.Close() @@ -564,20 +565,20 @@ func TestBlipAttachNameChange(t *testing.T) { digest := db.Sha1DigestKey(attachmentA) // Push initial attachment data - version, err := btcRunner.PushRev(client1.id, "doc", EmptyDocVersion(), []byte(`{"key":"val","_attachments":{"attachment": {"data":"`+attachmentAData+`"}}}`)) + version, err := 
btcRunner.PushRev(client1.id, docID, EmptyDocVersion(), []byte(`{"key":"val","_attachments":{"attachment": {"data":"`+attachmentAData+`"}}}`)) require.NoError(t, err) // Confirm attachment is in the bucket - attachmentAKey := db.MakeAttachmentKey(2, "doc", digest) + attachmentAKey := db.MakeAttachmentKey(2, docID, digest) bucketAttachmentA, _, err := client1.rt.GetSingleDataStore().GetRaw(attachmentAKey) require.NoError(t, err) require.EqualValues(t, bucketAttachmentA, attachmentA) // Simulate changing only the attachment name over CBL // Use revpos 2 to simulate revpos bug in CBL 2.8 - 3.0.0 - version, err = btcRunner.PushRev(client1.id, "doc", version, []byte(`{"key":"val","_attachments":{"attach":{"revpos":2,"content_type":"","length":11,"stub":true,"digest":"`+digest+`"}}}`)) + version, err = btcRunner.PushRev(client1.id, docID, version, []byte(`{"key":"val","_attachments":{"attach":{"revpos":2,"content_type":"","length":11,"stub":true,"digest":"`+digest+`"}}}`)) require.NoError(t, err) - err = client1.rt.WaitForVersion("doc", version) + err = client1.rt.WaitForVersion(docID, version) require.NoError(t, err) // Check if attachment is still in bucket @@ -585,7 +586,7 @@ func TestBlipAttachNameChange(t *testing.T) { assert.NoError(t, err) assert.Equal(t, bucketAttachmentA, attachmentA) - resp := client1.rt.SendAdminRequest("GET", "/{{.keyspace}}/doc/attach", "") + resp := client1.rt.SendAdminRequest("GET", "/{{.keyspace}}/"+docID+"/attach", "") RequireStatus(t, resp, http.StatusOK) assert.Equal(t, attachmentA, resp.BodyBytes()) }) diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index d768a3284d..df009c186d 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -1739,15 +1739,17 @@ func TestPutRevV4(t *testing.T) { defer bt.Close() collection := bt.restTester.GetSingleTestDatabaseCollection() + docID := t.Name() + // 1. Send rev with history history := "1@def, 2@abc" - sent, _, resp, err := bt.SendRev("foo", db.EncodeTestVersion("3@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(history)}) + sent, _, resp, err := bt.SendRev(docID, db.EncodeTestVersion("3@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(history)}) assert.True(t, sent) require.NoError(t, err) assert.Equal(t, "", resp.Properties["Error-Code"]) // Validate against the bucket doc's HLV - doc, _, err := collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + doc, _, err := collection.GetDocWithXattrs(base.TestCtx(t), docID, db.DocUnmarshalNoHistory) require.NoError(t, err) pv, _ := db.ParseTestHistory(t, history) db.RequireCVEqual(t, doc.HLV, "3@efg") @@ -1755,13 +1757,13 @@ func TestPutRevV4(t *testing.T) { assert.True(t, reflect.DeepEqual(pv, doc.HLV.PreviousVersions)) // 2. 
Update the document with a non-conflicting revision, where only cv is updated - sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("4@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(history)}) + sent, _, resp, err = bt.SendRev(docID, db.EncodeTestVersion("4@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(history)}) assert.True(t, sent) require.NoError(t, err) assert.Equal(t, "", resp.Properties["Error-Code"]) // Validate against the bucket doc's HLV - doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + doc, _, err = collection.GetDocWithXattrs(base.TestCtx(t), docID, db.DocUnmarshalNoHistory) require.NoError(t, err) db.RequireCVEqual(t, doc.HLV, "4@efg") assert.Equal(t, db.EncodeValue(doc.Cas), doc.HLV.CurrentVersionCAS) @@ -1769,13 +1771,13 @@ func TestPutRevV4(t *testing.T) { // 3. Update the document again with a non-conflicting revision from a different source (previous cv moved to pv) updatedHistory := "1@def, 2@abc, 4@efg" - sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("1@jkl"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) + sent, _, resp, err = bt.SendRev(docID, db.EncodeTestVersion("1@jkl"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) assert.True(t, sent) require.NoError(t, err) assert.Equal(t, "", resp.Properties["Error-Code"]) // Validate against the bucket doc's HLV - doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + doc, _, err = collection.GetDocWithXattrs(base.TestCtx(t), docID, db.DocUnmarshalNoHistory) require.NoError(t, err) pv, _ = db.ParseTestHistory(t, updatedHistory) db.RequireCVEqual(t, doc.HLV, "1@jkl") @@ -1784,13 +1786,13 @@ func TestPutRevV4(t *testing.T) { // 4. Update the document again with a non-conflicting revision from a different source, and additional sources in history (previous cv moved to pv, and pv expanded) updatedHistory = "1@def, 2@abc, 4@efg, 1@jkl, 1@mmm" - sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("1@nnn"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) + sent, _, resp, err = bt.SendRev(docID, db.EncodeTestVersion("1@nnn"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) assert.True(t, sent) require.NoError(t, err) assert.Equal(t, "", resp.Properties["Error-Code"]) // Validate against the bucket doc's HLV - doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "foo", db.DocUnmarshalNoHistory) + doc, _, err = collection.GetDocWithXattrs(base.TestCtx(t), docID, db.DocUnmarshalNoHistory) require.NoError(t, err) pv, _ = db.ParseTestHistory(t, updatedHistory) db.RequireCVEqual(t, doc.HLV, "1@nnn") @@ -1798,20 +1800,21 @@ func TestPutRevV4(t *testing.T) { assert.True(t, reflect.DeepEqual(pv, doc.HLV.PreviousVersions)) // 5. Attempt to update the document again with a conflicting revision from a different source (previous cv not in pv), expect conflict - sent, _, resp, err = bt.SendRev("foo", db.EncodeTestVersion("1@pqr"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) + sent, _, resp, err = bt.SendRev(docID, db.EncodeTestVersion("1@pqr"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) assert.True(t, sent) require.Error(t, err) assert.Equal(t, "409", resp.Properties["Error-Code"]) // 6. 
Test sending rev with merge versions included in history (note new key) + newDocID := t.Name() + "_2" mvHistory := "3@def, 3@abc; 1@def, 2@abc" - sent, _, resp, err = bt.SendRev("boo", db.EncodeTestVersion("3@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(mvHistory)}) + sent, _, resp, err = bt.SendRev(newDocID, db.EncodeTestVersion("3@efg"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(mvHistory)}) assert.True(t, sent) require.NoError(t, err) assert.Equal(t, "", resp.Properties["Error-Code"]) // assert on bucket doc - doc, _, err = collection.GetDocWithXattr(base.TestCtx(t), "boo", db.DocUnmarshalNoHistory) + doc, _, err = collection.GetDocWithXattrs(base.TestCtx(t), newDocID, db.DocUnmarshalNoHistory) require.NoError(t, err) pv, mv := db.ParseTestHistory(t, mvHistory) @@ -2043,13 +2046,13 @@ func TestPullReplicationUpdateOnOtherHLVAwarePeer(t *testing.T) { const docID = "doc1" otherSource := "otherSource" - hlvHelper := db.NewHLVAgent(t, rt.GetSingleDataStore(), otherSource, "_sync") + hlvHelper := db.NewHLVAgent(t, rt.GetSingleDataStore(), otherSource, "_vv") existingHLVKey := "doc1" cas := hlvHelper.InsertWithHLV(ctx, existingHLVKey) // force import of this write _, _ = rt.GetDoc(docID) - bucketDoc, _, err := collection.GetDocWithXattr(ctx, docID, db.DocUnmarshalAll) + bucketDoc, _, err := collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalAll) require.NoError(t, err) // create doc version of the above doc write diff --git a/rest/changes_test.go b/rest/changes_test.go index f1806ee6e4..157d1f15ca 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -298,7 +298,7 @@ func TestCVPopulationOnChangesViaAPI(t *testing.T) { changes, err := rt.WaitForChanges(1, "/{{.keyspace}}/_changes", "", true) require.NoError(t, err) - fetchedDoc, _, err := collection.GetDocWithXattr(ctx, DocID, db.DocUnmarshalCAS) + fetchedDoc, _, err := collection.GetDocWithXattrs(ctx, DocID, db.DocUnmarshalCAS) require.NoError(t, err) assert.Equal(t, "doc1", changes.Results[0].ID) @@ -330,7 +330,7 @@ func TestCVPopulationOnDocIDChanges(t *testing.T) { changes, err := rt.WaitForChanges(1, fmt.Sprintf(`/{{.keyspace}}/_changes?filter=_doc_ids&doc_ids=%s`, DocID), "", true) require.NoError(t, err) - fetchedDoc, _, err := collection.GetDocWithXattr(ctx, DocID, db.DocUnmarshalCAS) + fetchedDoc, _, err := collection.GetDocWithXattrs(ctx, DocID, db.DocUnmarshalCAS) require.NoError(t, err) assert.Equal(t, "doc1", changes.Results[0].ID) diff --git a/rest/changestest/changes_api_test.go b/rest/changestest/changes_api_test.go index d91dd1325f..b39a95c455 100644 --- a/rest/changestest/changes_api_test.go +++ b/rest/changestest/changes_api_test.go @@ -924,7 +924,7 @@ func TestChangesFromCompoundSinceViaDocGrant(t *testing.T) { // Write another doc _ = rt.PutDoc("mix-1", `{"channel":["ABC", "PBS", "HBO"]}`) - fetchedDoc, _, err := collection.GetDocWithXattr(ctx, "mix-1", db.DocUnmarshalSync) + fetchedDoc, _, err := collection.GetDocWithXattrs(ctx, "mix-1", db.DocUnmarshalSync) require.NoError(t, err) mixSource, mixVersion := fetchedDoc.HLV.GetCurrentVersion() diff --git a/rest/importuserxattrtest/import_test.go b/rest/importuserxattrtest/import_test.go index 86bdc5af70..f8f4547ff5 100644 --- a/rest/importuserxattrtest/import_test.go +++ b/rest/importuserxattrtest/import_test.go @@ -410,11 +410,11 @@ func TestUnmarshalDocFromImportFeed(t *testing.T) { } value := sgbucket.EncodeValueWithXattrs(body, xattrs...) 
- syncData, rawBody, rawXattr, rawUserXattr, err := db.UnmarshalDocumentSyncDataFromFeed(value, 5, userXattrKey, false) + syncData, rawBody, rawXattrs, err := db.UnmarshalDocumentSyncDataFromFeed(value, 5, userXattrKey, false) require.NoError(t, err) - assert.Equal(t, syncXattr, string(rawXattr)) + assert.Equal(t, syncXattr, string(rawXattrs[base.SyncXattrName])) assert.Equal(t, uint64(200), syncData.Sequence) - assert.Equal(t, channelName, string(rawUserXattr)) + assert.Equal(t, channelName, string(rawXattrs[userXattrKey])) assert.Equal(t, body, rawBody) // construct data into dcp format with just user xattr defined @@ -423,21 +423,21 @@ func TestUnmarshalDocFromImportFeed(t *testing.T) { } value = sgbucket.EncodeValueWithXattrs(body, xattrs...) - syncData, rawBody, rawXattr, rawUserXattr, err = db.UnmarshalDocumentSyncDataFromFeed(value, 5, userXattrKey, false) + syncData, rawBody, rawXattrs, err = db.UnmarshalDocumentSyncDataFromFeed(value, 5, userXattrKey, false) require.NoError(t, err) assert.Nil(t, syncData) - assert.Nil(t, rawXattr) - assert.Equal(t, channelName, string(rawUserXattr)) + assert.Nil(t, rawXattrs[base.SyncXattrName]) + assert.Equal(t, channelName, string(rawXattrs[userXattrKey])) assert.Equal(t, body, rawBody) // construct data into dcp format with no xattr defined xattrs = []sgbucket.Xattr{} value = sgbucket.EncodeValueWithXattrs(body, xattrs...) - syncData, rawBody, rawXattr, rawUserXattr, err = db.UnmarshalDocumentSyncDataFromFeed(value, 5, userXattrKey, false) + syncData, rawBody, rawXattrs, err = db.UnmarshalDocumentSyncDataFromFeed(value, 5, userXattrKey, false) require.NoError(t, err) assert.Nil(t, syncData) - assert.Nil(t, rawXattr) - assert.Nil(t, rawUserXattr) + assert.Nil(t, rawXattrs[base.SyncXattrName]) + assert.Nil(t, rawXattrs[userXattrKey]) assert.Equal(t, body, rawBody) }
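Taken together, the write path now persists two system xattrs per document. A minimal sketch of the MarshalWithXattrs split, using simplified stand-ins for db.SyncData and db.HybridLogicalVector (the real SyncData, as changed in db/document.go above, tags its HLV field `json:"-"` so it never serializes into _sync):

package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-ins for db.SyncData and db.HybridLogicalVector;
// the real types carry many more fields.
type syncData struct {
	Sequence uint64 `json:"sequence"`
	HLV      *hlv   `json:"-"` // marshalled separately into _vv, never into _sync
}

type hlv struct {
	CurrentVersionCAS string `json:"cvCas"`
	SourceID          string `json:"src"`
	Version           string `json:"ver"`
}

func main() {
	sd := syncData{
		Sequence: 7,
		HLV:      &hlv{CurrentVersionCAS: "0x40e2010000000000", SourceID: "cb06dc003846116d9b66d2ab23887a96", Version: "0x40e2010000000000"},
	}

	// MarshalWithXattrs-style split: _sync and _vv are produced independently,
	// and _vv is only written when an HLV is present.
	rawSync, err := json.Marshal(sd)
	if err != nil {
		panic(err)
	}
	xattrs := map[string][]byte{"_sync": rawSync}
	if sd.HLV != nil {
		rawVv, err := json.Marshal(sd.HLV)
		if err != nil {
			panic(err)
		}
		xattrs["_vv"] = rawVv
	}
	fmt.Printf("_sync -> %s\n_vv -> %s\n", xattrs["_sync"], xattrs["_vv"])
}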