Skip to content

Commit

Permalink
CBG-4206: read/write attachments to global sync xattr (#7107)
Browse files Browse the repository at this point in the history
* CBG-4206: read/write attachments to global sync xattr

* add comment to doc struct for global sync

* changes after rebase

* updates to add new test + test case

* fix misspelling
  • Loading branch information
gregns1 authored and adamcfraser committed Nov 27, 2024
1 parent 864b8a2 commit c016a4a
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 40 deletions.
5 changes: 3 additions & 2 deletions base/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ const (
// SyncPropertyName is used when storing sync data inline in a document.
SyncPropertyName = "_sync"
// SyncXattrName is used when storing sync data in a document's xattrs.
SyncXattrName = "_sync"
VvXattrName = "_vv"
SyncXattrName = "_sync"
VvXattrName = "_vv"
GlobalXattrName = "_globalSync"

// MouXattrName is used when storing metadata-only update information in a document's xattrs.
MouXattrName = "_mou"
Expand Down
4 changes: 2 additions & 2 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
}
var doc1DCPBytes []byte
if base.TestUseXattrs() {
body, syncXattr, _, _, err := doc1.MarshalWithXattrs()
body, syncXattr, _, _, _, err := doc1.MarshalWithXattrs()
require.NoError(t, err)
doc1DCPBytes = sgbucket.EncodeValueWithXattrs(body, sgbucket.Xattr{Name: base.SyncXattrName, Value: syncXattr})
} else {
Expand All @@ -1482,7 +1482,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
var dataType sgbucket.FeedDataType = base.MemcachedDataTypeJSON
if base.TestUseXattrs() {
dataType |= base.MemcachedDataTypeXattr
body, syncXattr, _, _, err := doc2.MarshalWithXattrs()
body, syncXattr, _, _, _, err := doc2.MarshalWithXattrs()
require.NoError(t, err)
doc2DCPBytes = sgbucket.EncodeValueWithXattrs(body, sgbucket.Xattr{Name: base.SyncXattrName, Value: syncXattr})
} else {
Expand Down
19 changes: 13 additions & 6 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
func (c *DatabaseCollection) GetDocWithXattrs(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
rawBucketDoc = &sgbucket.BucketDocument{}
var getErr error
rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncAndUserXattrKeys())
rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys())
if getErr != nil {
return nil, nil, getErr
}
Expand All @@ -143,7 +143,7 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (
if c.UseXattrs() {
// Retrieve doc and xattr from bucket, unmarshal only xattr.
// Triggers on-demand import when document xattr doesn't match cas.
rawDoc, xattrs, cas, getErr := c.dataStore.GetWithXattrs(ctx, key, c.syncAndUserXattrKeys())
rawDoc, xattrs, cas, getErr := c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys())
if getErr != nil {
return emptySyncData, getErr
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (

// unmarshalDocumentWithXattrs extracts the individual xattr values from the provided xattrs map and passes them to the package-level unmarshalDocumentWithXattrs function.
func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.userXattrKey()], xattrs[base.VirtualXattrRevSeqNo], cas, unmarshalLevel)
return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.userXattrKey()], xattrs[base.VirtualXattrRevSeqNo], xattrs[base.GlobalXattrName], cas, unmarshalLevel)

}

Expand Down Expand Up @@ -2239,7 +2239,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
if expiry != nil {
initialExpiry = *expiry
}
casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncMouRevSeqNoAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) {
casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) {
// Be careful: this block can be invoked multiple times if there are races!
if doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll); err != nil {
return
Expand Down Expand Up @@ -2295,8 +2295,8 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
// Return the new raw document value for the bucket to store.
doc.SetCrc32cUserXattrHash()

var rawSyncXattr, rawMouXattr, rawVvXattr, rawDocBody []byte
rawDocBody, rawSyncXattr, rawVvXattr, rawMouXattr, err = doc.MarshalWithXattrs()
var rawSyncXattr, rawMouXattr, rawVvXattr, rawGlobalSync, rawDocBody []byte
rawDocBody, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalSync, err = doc.MarshalWithXattrs()
if err != nil {
return updatedDoc, err
}
Expand All @@ -2310,6 +2310,13 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
if rawMouXattr != nil && db.useMou() {
updatedDoc.Xattrs[base.MouXattrName] = rawMouXattr
}
if rawGlobalSync != nil {
updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalSync
} else {
if currentXattrs[base.GlobalXattrName] != nil && !isNewDocCreation {
updatedDoc.XattrsToDelete = append(updatedDoc.XattrsToDelete, base.GlobalXattrName)
}
}

// Warn when sync data is larger than a configured threshold
if db.unsupportedOptions() != nil && db.unsupportedOptions().WarningThresholds != nil {
Expand Down
8 changes: 5 additions & 3 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1877,7 +1877,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
doc.metadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.metadataOnlyUpdate)
}

_, rawSyncXattr, rawVvXattr, rawMouXattr, err := updatedDoc.MarshalWithXattrs()
_, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs()
updatedDoc := sgbucket.UpdatedDoc{
Doc: nil, // Resync does not require document body update
Xattrs: map[string][]byte{
Expand All @@ -1892,13 +1892,15 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(xattrMouCasPath(), sgbucket.MacroCas))
}
}

if rawGlobalXattr != nil {
updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalXattr
}
return updatedDoc, err
}
opts := &sgbucket.MutateInOptions{
MacroExpansion: macroExpandSpec(base.SyncXattrName),
}
_, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncMouRevSeqNoAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc)
_, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc)
} else {
_, err = db.dataStore.Update(key, 0, func(currentValue []byte) ([]byte, *uint32, bool, error) {
// Be careful: this block can be invoked multiple times if there are races!
Expand Down
12 changes: 6 additions & 6 deletions db/database_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,21 +237,21 @@ func (c *DatabaseCollection) unsupportedOptions() *UnsupportedOptions {
return c.dbCtx.Options.UnsupportedOptions
}

// syncAndUserXattrKeys returns the xattr keys for the user and sync xattrs.
func (c *DatabaseCollection) syncAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
// syncGlobalSyncAndUserXattrKeys lists the xattr keys to request for a document: the sync, vv and
// global sync xattrs, followed by the user xattr key when the collection has a non-empty one.
func (c *DatabaseCollection) syncGlobalSyncAndUserXattrKeys() []string {
	keys := []string{base.SyncXattrName, base.VvXattrName, base.GlobalXattrName}
	if userKey := c.userXattrKey(); userKey != "" {
		return append(keys, userKey)
	}
	return keys
}

// syncMouRevSeqNoAndUserXattrKeys returns the xattr keys for the user, mou, revSeqNo and sync xattrs.
func (c *DatabaseCollection) syncMouRevSeqNoAndUserXattrKeys() []string {
// syncGlobalSyncMouRevSeqNoAndUserXattrKeys returns the xattr keys for the sync and vv xattrs, the mou, revSeqNo and global sync xattrs when mou is in use, and the user xattr when one is configured.
func (c *DatabaseCollection) syncGlobalSyncMouRevSeqNoAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
if c.useMou() {
xattrKeys = append(xattrKeys, base.MouXattrName, base.VirtualXattrRevSeqNo)
xattrKeys = append(xattrKeys, base.MouXattrName, base.VirtualXattrRevSeqNo, base.GlobalXattrName)
}
userXattrKey := c.userXattrKey()
if userXattrKey != "" {
Expand Down
57 changes: 44 additions & 13 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (sd *SyncData) HashRedact(salt string) SyncData {
// Document doesn't do any locking - document instances aren't intended to be shared across multiple goroutines.
type Document struct {
SyncData // Sync metadata
GlobalSyncData // Global sync metadata, this will hold non cluster specific sync metadata to be copied by XDCR
_body Body // Marshalled document body. Unmarshalled lazily - should be accessed using Body()
_rawBody []byte // Raw document body, as retrieved from the bucket. Marshaled lazily - should be accessed using BodyBytes()
ID string `json:"-"` // Doc id. (We're already using a custom MarshalJSON for *document that's based on body, so the json:"-" probably isn't needed here)
Expand All @@ -203,6 +204,10 @@ type Document struct {
RevSeqNo uint64 // Server rev seq no for a document
}

// GlobalSyncData is the structure marshalled to and unmarshalled from the global sync xattr
// (base.GlobalXattrName). It currently carries only the document's attachment metadata.
type GlobalSyncData struct {
// GlobalAttachments is the attachment metadata; omitted from the JSON when empty.
GlobalAttachments AttachmentsMeta `json:"attachments_meta,omitempty"`
}

type historyOnlySyncData struct {
revOnlySyncData
History RevTree `json:"history"`
Expand Down Expand Up @@ -410,14 +415,14 @@ func unmarshalDocument(docid string, data []byte) (*Document, error) {
return doc, nil
}

func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXattrData, hlvXattrData, mouXattrData, userXattrData, documentXattr []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXattrData, hlvXattrData, mouXattrData, userXattrData, virtualXattr []byte, globalSyncData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {

if len(syncXattrData) == 0 && len(hlvXattrData) == 0 {
// If no xattr data, unmarshal as standard doc
doc, err = unmarshalDocument(docid, data)
} else {
doc = NewDocument(docid)
err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, documentXattr, unmarshalLevel)
err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, virtualXattr, globalSyncData, unmarshalLevel)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -466,7 +471,7 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey
var xattrValues map[string][]byte
var hlv *HybridLogicalVector
if dataType&base.MemcachedDataTypeXattr != 0 {
xattrKeys := []string{base.SyncXattrName, base.MouXattrName, base.VvXattrName}
xattrKeys := []string{base.SyncXattrName, base.MouXattrName, base.VvXattrName, base.GlobalXattrName}
if userXattrKey != "" {
xattrKeys = append(xattrKeys, userXattrKey)
}
Expand Down Expand Up @@ -532,7 +537,7 @@ func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, da
if err != nil {
return nil, err
}
return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], xattrs[base.VirtualXattrRevSeqNo], cas, DocUnmarshalAll)
return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], xattrs[base.VirtualXattrRevSeqNo], nil, cas, DocUnmarshalAll)
}

func (doc *SyncData) HasValidSyncData() bool {
Expand Down Expand Up @@ -1095,7 +1100,7 @@ func (doc *Document) MarshalJSON() (data []byte, err error) {
// unmarshalLevel is anything less than the full document + metadata, the raw data is retained for subsequent
// lazy unmarshalling as needed.
// Must handle cases where document body and hlvXattrData are present without syncXattrData for all DocumentUnmarshalLevel
func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData, documentXattr []byte, unmarshalLevel DocumentUnmarshalLevel) error {
func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData, virtualXattr []byte, globalSyncData []byte, unmarshalLevel DocumentUnmarshalLevel) error {
if doc.ID == "" {
base.WarnfCtx(ctx, "Attempted to unmarshal document without ID set")
return errors.New("Document was unmarshalled without ID set")
Expand All @@ -1117,9 +1122,9 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
}
}
if documentXattr != nil {
if virtualXattr != nil {
var revSeqNo string
err := base.JSONUnmarshal(documentXattr, &revSeqNo)
err := base.JSONUnmarshal(virtualXattr, &revSeqNo)
if err != nil {
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal doc virtual revSeqNo xattr during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
}
Expand All @@ -1131,6 +1136,12 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat
doc.RevSeqNo = revNo
}
}
if len(globalSyncData) > 0 {
if err := base.JSONUnmarshal(globalSyncData, &doc.GlobalSyncData); err != nil {
base.WarnfCtx(ctx, "Failed to unmarshal globalSync xattr for key %v, globalSync will be ignored. Err: %v globalSync:%s", base.UD(doc.ID), err, globalSyncData)
}
doc.SyncData.Attachments = doc.GlobalSyncData.GlobalAttachments
}
doc._rawBody = data
// Unmarshal body if requested and present
if unmarshalLevel == DocUnmarshalAll && len(data) > 0 {
Expand All @@ -1151,6 +1162,12 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalNoHistory). Error: %v", base.UD(doc.ID), err))
}
}
if len(globalSyncData) > 0 {
if err := base.JSONUnmarshal(globalSyncData, &doc.GlobalSyncData); err != nil {
base.WarnfCtx(ctx, "Failed to unmarshal globalSync xattr for key %v, globalSync will be ignored. Err: %v globalSync:%s", base.UD(doc.ID), err, globalSyncData)
}
doc.SyncData.Attachments = doc.GlobalSyncData.GlobalAttachments
}
doc._rawBody = data
case DocUnmarshalHistory:
if syncXattrData != nil {
Expand Down Expand Up @@ -1211,7 +1228,7 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat
}

// MarshalWithXattrs marshals the Document into body, and sync, vv, mou and global sync xattrs for persistence.
func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr, vvXattr, mouXattr []byte, err error) {
func (doc *Document) MarshalWithXattrs() (data, syncXattr, vvXattr, mouXattr, globalXattr []byte, err error) {
// Grab the rawBody if it's already marshalled, otherwise unmarshal the body
if doc._rawBody != nil {
if !doc.IsDeleted() {
Expand All @@ -1228,31 +1245,45 @@ func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr, vvXattr, mouXa
if !deleted {
data, err = base.JSONMarshal(body)
if err != nil {
return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc body with id: %s. Error: %v", base.UD(doc.ID), err))
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc body with id: %s. Error: %v", base.UD(doc.ID), err))
}
}
}
}
if doc.SyncData.HLV != nil {
vvXattr, err = base.JSONMarshal(&doc.SyncData.HLV)
if err != nil {
return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc vv with id: %s. Error: %v", base.UD(doc.ID), err))
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc vv with id: %s. Error: %v", base.UD(doc.ID), err))
}
}
// assign any attachments we have stored in document sync data to global sync data
// then nil the sync data attachments to prevent marshalling of it
doc.GlobalSyncData.GlobalAttachments = doc.Attachments
doc.Attachments = nil

syncXattr, err = base.JSONMarshal(doc.SyncData)
if err != nil {
return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err))
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err))
}

if doc.metadataOnlyUpdate != nil {
mouXattr, err = base.JSONMarshal(doc.metadataOnlyUpdate)
if err != nil {
return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc MouData with id: %s. Error: %v", base.UD(doc.ID), err))
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc MouData with id: %s. Error: %v", base.UD(doc.ID), err))
}
}
// marshal global xattrs if there are attachments defined
if len(doc.GlobalSyncData.GlobalAttachments) > 0 {
globalXattr, err = base.JSONMarshal(doc.GlobalSyncData)
if err != nil {
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc GlobalXattr with id: %s. Error: %v", base.UD(doc.ID), err))
}
// restore attachment meta to sync data post global xattr construction
doc.Attachments = make(AttachmentsMeta)
doc.Attachments = doc.GlobalSyncData.GlobalAttachments
}

return data, syncXattr, vvXattr, mouXattr, nil
return data, syncXattr, vvXattr, mouXattr, globalXattr, nil
}

// computeMetadataOnlyUpdate computes a new metadataOnlyUpdate based on the existing document's CAS and metadataOnlyUpdate
Expand Down
Loading

0 comments on commit c016a4a

Please sign in to comment.