diff --git a/base/constants.go b/base/constants.go index 5639c9a0a4..49530a20ac 100644 --- a/base/constants.go +++ b/base/constants.go @@ -135,8 +135,9 @@ const ( // SyncPropertyName is used when storing sync data inline in a document. SyncPropertyName = "_sync" // SyncXattrName is used when storing sync data in a document's xattrs. - SyncXattrName = "_sync" - VvXattrName = "_vv" + SyncXattrName = "_sync" + VvXattrName = "_vv" + GlobalXattrName = "_globalSync" // MouXattrName is used when storing metadata-only update information in a document's xattrs. MouXattrName = "_mou" diff --git a/db/change_cache_test.go b/db/change_cache_test.go index 41b63a7186..e2ad7fb667 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -1457,7 +1457,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) { } var doc1DCPBytes []byte if base.TestUseXattrs() { - body, syncXattr, _, _, err := doc1.MarshalWithXattrs() + body, syncXattr, _, _, _, err := doc1.MarshalWithXattrs() require.NoError(t, err) doc1DCPBytes = sgbucket.EncodeValueWithXattrs(body, sgbucket.Xattr{Name: base.SyncXattrName, Value: syncXattr}) } else { @@ -1482,7 +1482,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) { var dataType sgbucket.FeedDataType = base.MemcachedDataTypeJSON if base.TestUseXattrs() { dataType |= base.MemcachedDataTypeXattr - body, syncXattr, _, _, err := doc2.MarshalWithXattrs() + body, syncXattr, _, _, _, err := doc2.MarshalWithXattrs() require.NoError(t, err) doc2DCPBytes = sgbucket.EncodeValueWithXattrs(body, sgbucket.Xattr{Name: base.SyncXattrName, Value: syncXattr}) } else { diff --git a/db/crud.go b/db/crud.go index a4bc1a2dae..3dc283111d 100644 --- a/db/crud.go +++ b/db/crud.go @@ -117,7 +117,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin func (c *DatabaseCollection) GetDocWithXattrs(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) { rawBucketDoc = &sgbucket.BucketDocument{} var getErr error - rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncAndUserXattrKeys()) + rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys()) if getErr != nil { return nil, nil, getErr } @@ -143,7 +143,7 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) ( if c.UseXattrs() { // Retrieve doc and xattr from bucket, unmarshal only xattr. // Triggers on-demand import when document xattr doesn't match cas. 
- rawDoc, xattrs, cas, getErr := c.dataStore.GetWithXattrs(ctx, key, c.syncAndUserXattrKeys()) + rawDoc, xattrs, cas, getErr := c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys()) if getErr != nil { return emptySyncData, getErr } @@ -192,7 +192,7 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) ( // unmarshalDocumentWithXattrs populates individual xattrs on unmarshalDocumentWithXattrs from a provided xattrs map func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) { - return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.userXattrKey()], xattrs[base.VirtualXattrRevSeqNo], cas, unmarshalLevel) + return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.userXattrKey()], xattrs[base.VirtualXattrRevSeqNo], xattrs[base.GlobalXattrName], cas, unmarshalLevel) } @@ -2239,7 +2239,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do if expiry != nil { initialExpiry = *expiry } - casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncMouRevSeqNoAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) { + casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) { // Be careful: this block can be invoked multiple times if there are races! if doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll); err != nil { return @@ -2295,8 +2295,8 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do // Return the new raw document value for the bucket to store. 
doc.SetCrc32cUserXattrHash() - var rawSyncXattr, rawMouXattr, rawVvXattr, rawDocBody []byte - rawDocBody, rawSyncXattr, rawVvXattr, rawMouXattr, err = doc.MarshalWithXattrs() + var rawSyncXattr, rawMouXattr, rawVvXattr, rawGlobalSync, rawDocBody []byte + rawDocBody, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalSync, err = doc.MarshalWithXattrs() if err != nil { return updatedDoc, err } @@ -2310,6 +2310,13 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do if rawMouXattr != nil && db.useMou() { updatedDoc.Xattrs[base.MouXattrName] = rawMouXattr } + if rawGlobalSync != nil { + updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalSync + } else { + if currentXattrs[base.GlobalXattrName] != nil && !isNewDocCreation { + updatedDoc.XattrsToDelete = append(updatedDoc.XattrsToDelete, base.GlobalXattrName) + } + } // Warn when sync data is larger than a configured threshold if db.unsupportedOptions() != nil && db.unsupportedOptions().WarningThresholds != nil { diff --git a/db/database.go b/db/database.go index c48b5ca6c3..c2541b19c4 100644 --- a/db/database.go +++ b/db/database.go @@ -1877,7 +1877,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, doc.metadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.metadataOnlyUpdate) } - _, rawSyncXattr, rawVvXattr, rawMouXattr, err := updatedDoc.MarshalWithXattrs() + _, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs() updatedDoc := sgbucket.UpdatedDoc{ Doc: nil, // Resync does not require document body update Xattrs: map[string][]byte{ @@ -1892,13 +1892,15 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(xattrMouCasPath(), sgbucket.MacroCas)) } } - + if rawGlobalXattr != nil { + updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalXattr + } return updatedDoc, err } opts := &sgbucket.MutateInOptions{ MacroExpansion: macroExpandSpec(base.SyncXattrName), } - _, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncMouRevSeqNoAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc) + _, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc) } else { _, err = db.dataStore.Update(key, 0, func(currentValue []byte) ([]byte, *uint32, bool, error) { // Be careful: this block can be invoked multiple times if there are races! diff --git a/db/database_collection.go b/db/database_collection.go index ab8862f3a4..8cc316c95f 100644 --- a/db/database_collection.go +++ b/db/database_collection.go @@ -237,9 +237,9 @@ func (c *DatabaseCollection) unsupportedOptions() *UnsupportedOptions { return c.dbCtx.Options.UnsupportedOptions } -// syncAndUserXattrKeys returns the xattr keys for the user and sync xattrs. -func (c *DatabaseCollection) syncAndUserXattrKeys() []string { - xattrKeys := []string{base.SyncXattrName, base.VvXattrName} +// syncGlobalSyncAndUserXattrKeys returns the xattr keys for the user and sync xattrs. 
+func (c *DatabaseCollection) syncGlobalSyncAndUserXattrKeys() []string {
+	xattrKeys := []string{base.SyncXattrName, base.VvXattrName, base.GlobalXattrName}
 	userXattrKey := c.userXattrKey()
 	if userXattrKey != "" {
 		xattrKeys = append(xattrKeys, userXattrKey)
@@ -247,11 +247,11 @@ func (c *DatabaseCollection) syncAndUserXattrKeys() []string {
 	return xattrKeys
 }
 
-// syncMouRevSeqNoAndUserXattrKeys returns the xattr keys for the user, mou, revSeqNo and sync xattrs.
-func (c *DatabaseCollection) syncMouRevSeqNoAndUserXattrKeys() []string {
+// syncGlobalSyncMouRevSeqNoAndUserXattrKeys returns the xattr keys for the sync, vv, mou, revSeqNo, global sync and user xattrs.
+func (c *DatabaseCollection) syncGlobalSyncMouRevSeqNoAndUserXattrKeys() []string {
 	xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
 	if c.useMou() {
-		xattrKeys = append(xattrKeys, base.MouXattrName, base.VirtualXattrRevSeqNo)
+		xattrKeys = append(xattrKeys, base.MouXattrName, base.VirtualXattrRevSeqNo, base.GlobalXattrName)
 	}
 	userXattrKey := c.userXattrKey()
 	if userXattrKey != "" {
diff --git a/db/document.go b/db/document.go
index 59f0da6904..73df2323b0 100644
--- a/db/document.go
+++ b/db/document.go
@@ -188,6 +188,7 @@ func (sd *SyncData) HashRedact(salt string) SyncData {
 // Document doesn't do any locking - document instances aren't intended to be shared across multiple goroutines.
 type Document struct {
 	SyncData       // Sync metadata
+	GlobalSyncData // Global sync metadata; holds non-cluster-specific sync metadata to be copied by XDCR
 	_body    Body   // Marshalled document body. Unmarshalled lazily - should be accessed using Body()
 	_rawBody []byte // Raw document body, as retrieved from the bucket. Marshaled lazily - should be accessed using BodyBytes()
 	ID       string `json:"-"` // Doc id. (We're already using a custom MarshalJSON for *document that's based on body, so the json:"-" probably isn't needed here)
@@ -203,6 +204,10 @@ type Document struct {
 	RevSeqNo uint64 // Server rev seq no for a document
 }
 
+type GlobalSyncData struct {
+	GlobalAttachments AttachmentsMeta `json:"attachments_meta,omitempty"`
+}
+
 type historyOnlySyncData struct {
 	revOnlySyncData
 	History RevTree `json:"history"`
@@ -410,14 +415,14 @@ func unmarshalDocument(docid string, data []byte) (*Document, error) {
 	return doc, nil
 }
 
-func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXattrData, hlvXattrData, mouXattrData, userXattrData, documentXattr []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
+func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXattrData, hlvXattrData, mouXattrData, userXattrData, virtualXattr []byte, globalSyncData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
 
 	if len(syncXattrData) == 0 && len(hlvXattrData) == 0 {
 		// If no xattr data, unmarshal as standard doc
 		doc, err = unmarshalDocument(docid, data)
 	} else {
 		doc = NewDocument(docid)
-		err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, documentXattr, unmarshalLevel)
+		err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, virtualXattr, globalSyncData, unmarshalLevel)
 	}
 	if err != nil {
 		return nil, err
@@ -466,7 +471,7 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey
 	var xattrValues map[string][]byte
 	var hlv *HybridLogicalVector
 	if dataType&base.MemcachedDataTypeXattr != 0 {
-		xattrKeys := []string{base.SyncXattrName, base.MouXattrName, base.VvXattrName}
+		xattrKeys := 
[]string{base.SyncXattrName, base.MouXattrName, base.VvXattrName, base.GlobalXattrName} if userXattrKey != "" { xattrKeys = append(xattrKeys, userXattrKey) } @@ -532,7 +537,7 @@ func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, da if err != nil { return nil, err } - return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], xattrs[base.VirtualXattrRevSeqNo], cas, DocUnmarshalAll) + return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], xattrs[base.VirtualXattrRevSeqNo], nil, cas, DocUnmarshalAll) } func (doc *SyncData) HasValidSyncData() bool { @@ -1095,7 +1100,7 @@ func (doc *Document) MarshalJSON() (data []byte, err error) { // unmarshalLevel is anything less than the full document + metadata, the raw data is retained for subsequent // lazy unmarshalling as needed. // Must handle cases where document body and hlvXattrData are present without syncXattrData for all DocumentUnmarshalLevel -func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData, documentXattr []byte, unmarshalLevel DocumentUnmarshalLevel) error { +func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData, virtualXattr []byte, globalSyncData []byte, unmarshalLevel DocumentUnmarshalLevel) error { if doc.ID == "" { base.WarnfCtx(ctx, "Attempted to unmarshal document without ID set") return errors.New("Document was unmarshalled without ID set") @@ -1117,9 +1122,9 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err)) } } - if documentXattr != nil { + if virtualXattr != nil { var revSeqNo string - err := base.JSONUnmarshal(documentXattr, &revSeqNo) + err := base.JSONUnmarshal(virtualXattr, &revSeqNo) if err != nil { return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal doc virtual revSeqNo xattr during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err)) } @@ -1131,6 +1136,12 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat doc.RevSeqNo = revNo } } + if len(globalSyncData) > 0 { + if err := base.JSONUnmarshal(globalSyncData, &doc.GlobalSyncData); err != nil { + base.WarnfCtx(ctx, "Failed to unmarshal globalSync xattr for key %v, globalSync will be ignored. Err: %v globalSync:%s", base.UD(doc.ID), err, globalSyncData) + } + doc.SyncData.Attachments = doc.GlobalSyncData.GlobalAttachments + } doc._rawBody = data // Unmarshal body if requested and present if unmarshalLevel == DocUnmarshalAll && len(data) > 0 { @@ -1151,6 +1162,12 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalNoHistory). Error: %v", base.UD(doc.ID), err)) } } + if len(globalSyncData) > 0 { + if err := base.JSONUnmarshal(globalSyncData, &doc.GlobalSyncData); err != nil { + base.WarnfCtx(ctx, "Failed to unmarshal globalSync xattr for key %v, globalSync will be ignored. 
Err: %v globalSync:%s", base.UD(doc.ID), err, globalSyncData) + } + doc.SyncData.Attachments = doc.GlobalSyncData.GlobalAttachments + } doc._rawBody = data case DocUnmarshalHistory: if syncXattrData != nil { @@ -1211,7 +1228,7 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat } // MarshalWithXattrs marshals the Document into body, and sync, vv and mou xattrs for persistence. -func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr, vvXattr, mouXattr []byte, err error) { +func (doc *Document) MarshalWithXattrs() (data, syncXattr, vvXattr, mouXattr, globalXattr []byte, err error) { // Grab the rawBody if it's already marshalled, otherwise unmarshal the body if doc._rawBody != nil { if !doc.IsDeleted() { @@ -1228,7 +1245,7 @@ func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr, vvXattr, mouXa if !deleted { data, err = base.JSONMarshal(body) if err != nil { - return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc body with id: %s. Error: %v", base.UD(doc.ID), err)) + return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc body with id: %s. Error: %v", base.UD(doc.ID), err)) } } } @@ -1236,23 +1253,37 @@ func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr, vvXattr, mouXa if doc.SyncData.HLV != nil { vvXattr, err = base.JSONMarshal(&doc.SyncData.HLV) if err != nil { - return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc vv with id: %s. Error: %v", base.UD(doc.ID), err)) + return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc vv with id: %s. Error: %v", base.UD(doc.ID), err)) } } + // assign any attachments we have stored in document sync data to global sync data + // then nil the sync data attachments to prevent marshalling of it + doc.GlobalSyncData.GlobalAttachments = doc.Attachments + doc.Attachments = nil syncXattr, err = base.JSONMarshal(doc.SyncData) if err != nil { - return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err)) + return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err)) } if doc.metadataOnlyUpdate != nil { mouXattr, err = base.JSONMarshal(doc.metadataOnlyUpdate) if err != nil { - return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc MouData with id: %s. Error: %v", base.UD(doc.ID), err)) + return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc MouData with id: %s. Error: %v", base.UD(doc.ID), err)) + } + } + // marshal global xattrs if there are attachments defined + if len(doc.GlobalSyncData.GlobalAttachments) > 0 { + globalXattr, err = base.JSONMarshal(doc.GlobalSyncData) + if err != nil { + return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc GlobalXattr with id: %s. 
Error: %v", base.UD(doc.ID), err)) } + // restore attachment meta to sync data post global xattr construction + doc.Attachments = make(AttachmentsMeta) + doc.Attachments = doc.GlobalSyncData.GlobalAttachments } - return data, syncXattr, vvXattr, mouXattr, nil + return data, syncXattr, vvXattr, mouXattr, globalXattr, nil } // computeMetadataOnlyUpdate computes a new metadataOnlyUpdate based on the existing document's CAS and metadataOnlyUpdate diff --git a/db/document_test.go b/db/document_test.go index 254407f46f..61e41389a2 100644 --- a/db/document_test.go +++ b/db/document_test.go @@ -137,7 +137,7 @@ func BenchmarkDocUnmarshal(b *testing.B) { b.Run(bm.name, func(b *testing.B) { ctx := base.TestCtx(b) for i := 0; i < b.N; i++ { - _, _ = unmarshalDocumentWithXattrs(ctx, "doc_1k", doc1k_body, doc1k_meta, nil, nil, nil, nil, 1, bm.unmarshalLevel) + _, _ = unmarshalDocumentWithXattrs(ctx, "doc_1k", doc1k_body, doc1k_meta, nil, nil, nil, nil, nil, 1, bm.unmarshalLevel) } }) } @@ -263,7 +263,7 @@ func TestParseVersionVectorSyncData(t *testing.T) { sync_meta := []byte(doc_meta_no_vv) vv_meta := []byte(doc_meta_vv) - doc, err := unmarshalDocumentWithXattrs(ctx, "doc_1k", nil, sync_meta, vv_meta, nil, nil, nil, 1, DocUnmarshalNoHistory) + doc, err := unmarshalDocumentWithXattrs(ctx, "doc_1k", nil, sync_meta, vv_meta, nil, nil, nil, nil, 1, DocUnmarshalNoHistory) require.NoError(t, err) strCAS := string(base.Uint64CASToLittleEndianHex(123456)) @@ -274,7 +274,7 @@ func TestParseVersionVectorSyncData(t *testing.T) { assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) - doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, nil, nil, 1, DocUnmarshalAll) + doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, nil, nil, nil, 1, DocUnmarshalAll) require.NoError(t, err) // assert on doc version vector values @@ -284,7 +284,7 @@ func TestParseVersionVectorSyncData(t *testing.T) { assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) - doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, nil, nil, 1, DocUnmarshalNoHistory) + doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, nil, nil, nil, 1, DocUnmarshalNoHistory) require.NoError(t, err) // assert on doc version vector values @@ -358,11 +358,11 @@ func TestRevAndVersion(t *testing.T) { Version: test.version, } - marshalledDoc, marshalledXattr, marshalledVvXattr, _, err := document.MarshalWithXattrs() + marshalledDoc, marshalledXattr, marshalledVvXattr, _, _, err := document.MarshalWithXattrs() require.NoError(t, err) newDocument := NewDocument("docID") - err = newDocument.UnmarshalWithXattrs(ctx, marshalledDoc, marshalledXattr, marshalledVvXattr, nil, DocUnmarshalAll) + err = newDocument.UnmarshalWithXattrs(ctx, marshalledDoc, marshalledXattr, marshalledVvXattr, nil, nil, DocUnmarshalAll) require.NoError(t, err) require.Equal(t, test.revTreeID, newDocument.CurrentRev) require.Equal(t, expectedSequence, newDocument.Sequence) @@ -538,3 +538,128 @@ func getSingleXattrDCPBytes() []byte { dcpBody = append(dcpBody, body...) 
return dcpBody } + +const syncDataWithAttachment = `{ + "attachments": { + "bye.txt": { + "digest": "sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc=", + "length": 19, + "revpos": 1, + "stub": true, + "ver": 2 + }, + "hello.txt": { + "digest": "sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=", + "length": 11, + "revpos": 1, + "stub": true, + "ver": 2 + } + }, + "cas": "0x0000d2ba4104f217", + "channel_set": [ + { + "name": "sg_test_0", + "start": 1 + } + ], + "channel_set_history": null, + "channels": { + "sg_test_0": null + }, + "cluster_uuid": "6eca6cdd1ffcd7b2b7ea07039e68a774", + "history": { + "channels": [ + [ + "sg_test_0" + ] + ], + "parents": [ + -1 + ], + "revs": [ + "1-ca9ad22802b66f662ff171f226211d5c" + ] + }, + "recent_sequences": [ + 1 + ], + "rev": { + "rev": "1-ca9ad22802b66f662ff171f226211d5c", + "src": "RS1pdSMRlrNr0Ns0oOfc8A", + "ver": "0x0000d2ba4104f217" + }, + "sequence": 1, + "time_saved": "2024-09-04T11:38:05.093225+01:00", + "value_crc32c": "0x297bd0aa" + }` + +const globalXattr = `{ + "attachments_meta": { + "bye.txt": { + "digest": "sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc=", + "length": 19, + "revpos": 1, + "stub": true, + "ver": 2 + }, + "hello.txt": { + "digest": "sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=", + "length": 11, + "revpos": 1, + "stub": true, + "ver": 2 + } + } + }` + +// TestAttachmentReadStoredInXattr tests reads legacy format for attachments being stored in sync data xattr as well as +// testing the new location for attachments in global xattr +func TestAttachmentReadStoredInXattr(t *testing.T) { + ctx := base.TestCtx(t) + + // unmarshal attachments on sync data + testSync := []byte(syncDataWithAttachment) + doc, err := unmarshalDocumentWithXattrs(ctx, "doc1", nil, testSync, nil, nil, nil, nil, nil, 1, DocUnmarshalSync) + require.NoError(t, err) + + // assert on attachments + atts := doc.Attachments + assert.Len(t, atts, 2) + hello := atts["hello.txt"].(map[string]interface{}) + assert.Equal(t, "sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=", hello["digest"]) + assert.Equal(t, float64(11), hello["length"]) + assert.Equal(t, float64(1), hello["revpos"]) + assert.Equal(t, float64(2), hello["ver"]) + assert.True(t, hello["stub"].(bool)) + + bye := atts["bye.txt"].(map[string]interface{}) + assert.Equal(t, "sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc=", bye["digest"]) + assert.Equal(t, float64(19), bye["length"]) + assert.Equal(t, float64(1), bye["revpos"]) + assert.Equal(t, float64(2), bye["ver"]) + assert.True(t, bye["stub"].(bool)) + + // unmarshal attachments on global data + testGlobal := []byte(globalXattr) + sync_meta_no_attachments := []byte(doc_meta_no_vv) + doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta_no_attachments, nil, nil, nil, nil, testGlobal, 1, DocUnmarshalSync) + require.NoError(t, err) + + // assert on attachments + atts = doc.Attachments + assert.Len(t, atts, 2) + hello = atts["hello.txt"].(map[string]interface{}) + assert.Equal(t, "sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=", hello["digest"]) + assert.Equal(t, float64(11), hello["length"]) + assert.Equal(t, float64(1), hello["revpos"]) + assert.Equal(t, float64(2), hello["ver"]) + assert.True(t, hello["stub"].(bool)) + + bye = atts["bye.txt"].(map[string]interface{}) + assert.Equal(t, "sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc=", bye["digest"]) + assert.Equal(t, float64(19), bye["length"]) + assert.Equal(t, float64(1), bye["revpos"]) + assert.Equal(t, float64(2), bye["ver"]) + assert.True(t, bye["stub"].(bool)) +} diff --git a/db/import.go b/db/import.go index 452ded5ab5..c0c7bb2569 100644 --- a/db/import.go +++ b/db/import.go @@ -97,7 +97,7 @@ 
func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid strin
 			existingBucketDoc.Xattrs[base.MouXattrName], err = base.JSONMarshal(existingDoc.metadataOnlyUpdate)
 			}
 		} else {
-			existingBucketDoc.Body, existingBucketDoc.Xattrs[base.SyncXattrName], existingBucketDoc.Xattrs[base.VvXattrName], existingBucketDoc.Xattrs[base.MouXattrName], err = existingDoc.MarshalWithXattrs()
+			existingBucketDoc.Body, existingBucketDoc.Xattrs[base.SyncXattrName], existingBucketDoc.Xattrs[base.VvXattrName], existingBucketDoc.Xattrs[base.MouXattrName], _, err = existingDoc.MarshalWithXattrs()
 		}
 	}
 
@@ -404,7 +404,7 @@ func (db *DatabaseCollectionWithUser) migrateMetadata(ctx context.Context, docid
 	}
 
 	// Persist the document in xattr format
-	value, syncXattr, vvXattr, _, marshalErr := doc.MarshalWithXattrs()
+	value, syncXattr, vvXattr, _, globalXattr, marshalErr := doc.MarshalWithXattrs()
 	if marshalErr != nil {
 		return nil, false, marshalErr
 	}
@@ -415,6 +415,9 @@ func (db *DatabaseCollectionWithUser) migrateMetadata(ctx context.Context, docid
 	if vvXattr != nil {
 		xattrs[base.VvXattrName] = vvXattr
 	}
+	if globalXattr != nil {
+		xattrs[base.GlobalXattrName] = globalXattr
+	}
 
 	var casOut uint64
 	var writeErr error
diff --git a/rest/attachment_test.go b/rest/attachment_test.go
index 0feb56c428..d05095ca42 100644
--- a/rest/attachment_test.go
+++ b/rest/attachment_test.go
@@ -2680,6 +2680,22 @@ func CreateDocWithLegacyAttachment(t *testing.T, rt *RestTester, docID string, r
 	require.Len(t, attachments, 1)
 }
 
+// CreateDocWithLegacyAttachmentNoMigration creates a doc with a legacy (v1) attachment defined and does not attempt to migrate that attachment to v2
+func CreateDocWithLegacyAttachmentNoMigration(t *testing.T, rt *RestTester, docID string, rawDoc []byte, attKey string, attBody []byte) {
+	// Write attachment directly to the datastore.
+	dataStore := rt.GetSingleDataStore()
+	_, err := dataStore.Add(attKey, 0, attBody)
+	require.NoError(t, err)
+
+	body := db.Body{}
+	err = body.Unmarshal(rawDoc)
+	require.NoError(t, err, "Error unmarshalling body")
+
+	// Write raw document to the datastore.
+	_, err = dataStore.Add(docID, 0, rawDoc)
+	require.NoError(t, err)
+}
+
 func retrieveAttachmentMeta(t *testing.T, rt *RestTester, docID string) (attMeta map[string]interface{}) {
 	body := rt.GetDocBody(docID)
 	attachments, ok := body["_attachments"].(map[string]interface{})
@@ -2759,3 +2775,109 @@ func (rt *RestTester) storeAttachmentWithIfMatch(docID string, version DocVersio
 	require.True(rt.TB(), body["ok"].(bool))
 	return DocVersionFromPutResponse(rt.TB(), response)
 }
+
+// TestLegacyAttachmentMigrationToGlobalXattrOnImport:
+// - Create legacy attachment and perform a read to migrate the attachment to xattr
+// - Assert that this migrated attachment is moved to the global xattr, not the sync data xattr
+// - Add a new doc with a legacy attachment but do not attempt to migrate after write
+// - Trigger on-demand import for write and assert that the attachment is moved to the global xattr
+func TestLegacyAttachmentMigrationToGlobalXattrOnImport(t *testing.T) {
+	rt := NewRestTester(t, nil)
+	defer rt.Close()
+	collection, ctx := rt.GetSingleTestDatabaseCollectionWithUser()
+
+	docID := "foo16"
+	attBody := []byte(`hi`)
+	digest := db.Sha1DigestKey(attBody)
+	attKey := db.MakeAttachmentKey(db.AttVersion1, docID, digest)
+	rawDoc := rawDocWithAttachmentAndSyncMeta()
+
+	// Create a document with legacy attachment.
+	CreateDocWithLegacyAttachment(t, rt, docID, rawDoc, attKey, attBody)
+
+	// get global xattr and assert the attachment is there
+	xattrs, _, err := collection.GetCollectionDatastore().GetXattrs(ctx, docID, []string{base.GlobalXattrName})
+	require.NoError(t, err)
+	require.Contains(t, xattrs, base.GlobalXattrName)
+	var globalXattr db.GlobalSyncData
+	require.NoError(t, base.JSONUnmarshal(xattrs[base.GlobalXattrName], &globalXattr))
+	hi := globalXattr.GlobalAttachments["hi.txt"].(map[string]interface{})
+
+	assert.Len(t, globalXattr.GlobalAttachments, 1)
+	assert.Equal(t, float64(2), hi["length"])
+
+	// Create a document with legacy attachment but do not attempt to migrate
+	docID = "baa16"
+	CreateDocWithLegacyAttachmentNoMigration(t, rt, docID, rawDoc, attKey, attBody)
+
+	// Trigger on-demand import for write
+	resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/baa16", `{}`)
+	RequireStatus(t, resp, http.StatusConflict)
+
+	// get xattrs of the new doc that triggered the conflict, and assert that the attachment metadata has been moved to the global xattr
+	xattrs, _, err = collection.GetCollectionDatastore().GetXattrs(ctx, docID, []string{base.GlobalXattrName})
+	require.NoError(t, err)
+	require.Contains(t, xattrs, base.GlobalXattrName)
+	globalXattr = db.GlobalSyncData{}
+	require.NoError(t, base.JSONUnmarshal(xattrs[base.GlobalXattrName], &globalXattr))
+	newatt := globalXattr.GlobalAttachments["hi.txt"].(map[string]interface{})
+
+	assert.Len(t, globalXattr.GlobalAttachments, 1)
+	assert.Equal(t, float64(2), newatt["length"])
+}
+
+// TestAttachmentMigrationToGlobalXattrOnUpdate:
+// - Create doc with attachment defined
+// - Set doc in bucket to move attachment from global xattr to old location in sync data
+// - Update this doc through Sync Gateway
+// - Assert that the attachment metadata is moved from sync data to the global xattr on update
+func TestAttachmentMigrationToGlobalXattrOnUpdate(t *testing.T) {
+	rt := NewRestTester(t, nil)
+	defer rt.Close()
+	collection, ctx := rt.GetSingleTestDatabaseCollectionWithUser()
+
+	docID := "baa"
+
+	body := `{"test":"doc","_attachments":{"camera.txt":{"data":"Q2Fub24gRU9TIDVEIE1hcmsgSVY="}}}`
+	vrs := rt.PutDoc(docID, body)
+
+	// get xattrs, remove the global xattr and move attachments back to sync data in the bucket
+	xattrs, cas, err := collection.GetCollectionDatastore().GetXattrs(ctx, docID, []string{base.SyncXattrName, base.GlobalXattrName})
+	require.NoError(t, err)
+	require.Contains(t, xattrs, base.GlobalXattrName)
+	require.Contains(t, xattrs, base.SyncXattrName)
+
+	var bucketSyncData db.SyncData
+	require.NoError(t, base.JSONUnmarshal(xattrs[base.SyncXattrName], &bucketSyncData))
+	var globalXattr db.GlobalSyncData
+	require.NoError(t, base.JSONUnmarshal(xattrs[base.GlobalXattrName], &globalXattr))
+
+	bucketSyncData.Attachments = globalXattr.GlobalAttachments
+	syncBytes := base.MustJSONMarshal(t, bucketSyncData)
+	xattrBytes := map[string][]byte{
+		base.SyncXattrName: syncBytes,
+	}
+	// write the updated sync data and remove the global xattr from the doc
+	_, err = collection.GetCollectionDatastore().WriteWithXattrs(ctx, docID, 0, cas, []byte(`{"test":"doc"}`), xattrBytes, []string{base.GlobalXattrName}, nil)
+	require.NoError(t, err)
+
+	// update doc
+	body = `{"some":"update","_attachments":{"camera.txt":{"data":"Q2Fub24gRU9TIDVEIE1hcmsgSVY="}}}`
+	_ = rt.UpdateDoc(docID, vrs, body)
+
+	// assert that the attachments moved to the global xattr after doc update
+	xattrs, _, err = collection.GetCollectionDatastore().GetXattrs(ctx, docID, []string{base.SyncXattrName, base.GlobalXattrName})
+	require.NoError(t, err)
+	require.Contains(t, xattrs, base.GlobalXattrName)
+	require.Contains(t, xattrs, base.SyncXattrName)
+
+	bucketSyncData = db.SyncData{}
+	globalXattr = db.GlobalSyncData{}
+	require.NoError(t, base.JSONUnmarshal(xattrs[base.SyncXattrName], &bucketSyncData))
+	require.NoError(t, base.JSONUnmarshal(xattrs[base.GlobalXattrName], &globalXattr))
+
+	assert.Nil(t, bucketSyncData.Attachments)
+	assert.NotNil(t, globalXattr.GlobalAttachments)
+	attMeta := globalXattr.GlobalAttachments["camera.txt"].(map[string]interface{})
+	assert.Equal(t, float64(20), attMeta["length"])
+}
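A minimal standalone sketch (not part of the patch above) of the xattr split this change introduces: attachment metadata that previously lived under "attachments" inside the _sync xattr is now marshalled from GlobalSyncData into the separate _globalSync xattr. The mirror types below are copied from db/document.go in this diff; the map-based attachmentsMeta alias and the sample values are illustrative assumptions, and only encoding/json is used so the snippet runs without the Sync Gateway module.

package main

import (
	"encoding/json"
	"fmt"
)

// attachmentsMeta stands in for db.AttachmentsMeta (assumed to be a map of
// attachment name -> metadata for the purposes of this sketch).
type attachmentsMeta map[string]interface{}

// globalSyncData mirrors the GlobalSyncData struct added in db/document.go.
type globalSyncData struct {
	GlobalAttachments attachmentsMeta `json:"attachments_meta,omitempty"`
}

func main() {
	g := globalSyncData{GlobalAttachments: attachmentsMeta{
		"hello.txt": map[string]interface{}{
			"digest": "sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=",
			"length": 11,
			"revpos": 1,
			"stub":   true,
			"ver":    2,
		},
	}}
	out, err := json.Marshal(g)
	if err != nil {
		panic(err)
	}
	// Prints the JSON shape that MarshalWithXattrs would store under _globalSync,
	// e.g. {"attachments_meta":{"hello.txt":{"digest":"sha1-...","length":11,...}}}
	fmt.Println(string(out))
}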