
Commit 864b8a2
CBG-3715: populate pRev on mou (#7099)
gregns1 authored and adamcfraser committed Nov 27, 2024
1 parent 20a687e commit 864b8a2
Showing 13 changed files with 204 additions and 42 deletions.
base/bucket_gocb_test.go (2 additions & 2 deletions)
@@ -414,11 +414,11 @@ func TestXattrWriteCasSimple(t *testing.T) {
assert.Equal(t, Crc32cHashString(valBytes), macroBodyHashString)

// Validate against $document.value_crc32c
- _, xattrs, _, err = dataStore.GetWithXattrs(ctx, key, []string{"$document"})
+ _, xattrs, _, err = dataStore.GetWithXattrs(ctx, key, []string{VirtualDocumentXattr})
require.NoError(t, err)

var retrievedVxattr map[string]interface{}
- require.NoError(t, json.Unmarshal(xattrs["$document"], &retrievedVxattr))
+ require.NoError(t, json.Unmarshal(xattrs[VirtualDocumentXattr], &retrievedVxattr))

vxattrCrc32c, ok := retrievedVxattr["value_crc32c"].(string)
assert.True(t, ok, "Unable to retrieve virtual xattr crc32c as string")
base/constants.go (5 additions & 0 deletions)
@@ -144,6 +144,11 @@ const (
// Intended to be used in Meta Map and related tests
MetaMapXattrsKey = "xattrs"

+ // VirtualXattrRevSeqNo is used to fetch rev seq no from documents virtual xattr
+ VirtualXattrRevSeqNo = "$document.revid"
+ // VirtualDocumentXattr is used to fetch the documents virtual xattr
+ VirtualDocumentXattr = "$document"

// Prefix for transaction metadata documents
TxnPrefix = "_txn:"

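For orientation (not part of the commit): the two new constants are virtual xattr paths evaluated by Couchbase Server rather than stored xattrs. Below is a minimal sketch of how they might be requested on a plain KV read, assuming base.DataStore is the usual data store handle and that the caller supplies ctx and key; the helper name fetchRevSeqNoXattr is hypothetical.

package example

import (
	"context"
	"fmt"

	"github.com/couchbase/sync_gateway/base"
)

// fetchRevSeqNoXattr is a hypothetical helper illustrating the new constants:
// base.VirtualDocumentXattr ("$document") names the whole virtual xattr, while
// base.VirtualXattrRevSeqNo ("$document.revid") names just the rev seq no path.
func fetchRevSeqNoXattr(ctx context.Context, dataStore base.DataStore, key string) ([]byte, error) {
	_, xattrs, _, err := dataStore.GetWithXattrs(ctx, key, []string{base.VirtualXattrRevSeqNo})
	if err != nil {
		return nil, err
	}
	// The returned value is a JSON-encoded decimal string, e.g. "3".
	fmt.Printf("raw %s for %s: %s\n", base.VirtualXattrRevSeqNo, key, xattrs[base.VirtualXattrRevSeqNo])
	return xattrs[base.VirtualXattrRevSeqNo], nil
}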
db/crud.go (18 additions & 4 deletions)
@@ -192,7 +192,7 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (

// unmarshalDocumentWithXattrs populates individual xattrs on unmarshalDocumentWithXattrs from a provided xattrs map
func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
- return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.userXattrKey()], cas, unmarshalLevel)
+ return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.userXattrKey()], xattrs[base.VirtualXattrRevSeqNo], cas, unmarshalLevel)

}

@@ -245,7 +245,15 @@ func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid str
importDb := DatabaseCollectionWithUser{DatabaseCollection: c, user: nil}
var importErr error

- docOut, importErr = importDb.ImportDocRaw(ctx, docid, rawDoc, xattrs, isDelete, cas, nil, ImportOnDemand)
+ importOpts := importDocOptions{
+ isDelete: isDelete,
+ mode: ImportOnDemand,
+ revSeqNo: 0, // pending work in CBG-4203
+ expiry: nil,
+ }
+
+ // RevSeqNo is 0 here pending work in CBG-4203
+ docOut, importErr = importDb.ImportDocRaw(ctx, docid, rawDoc, xattrs, importOpts, cas)

if importErr == base.ErrImportCancelledFilter {
// If the import was cancelled due to filter, treat as 404 not imported
@@ -868,7 +876,13 @@ func (db *DatabaseCollectionWithUser) OnDemandImportForWrite(ctx context.Context
// Use an admin-scoped database for import
importDb := DatabaseCollectionWithUser{DatabaseCollection: db.DatabaseCollection, user: nil}

- importedDoc, importErr := importDb.ImportDoc(ctx, docid, doc, isDelete, nil, ImportOnDemand) // nolint:staticcheck
+ importOpts := importDocOptions{
+ expiry: nil,
+ mode: ImportOnDemand,
+ isDelete: isDelete,
+ revSeqNo: 0, // pending work in CBG-4203
+ }
+ importedDoc, importErr := importDb.ImportDoc(ctx, docid, doc, importOpts) // nolint:staticcheck

if importErr == base.ErrImportCancelledFilter {
// Document exists, but existing doc wasn't imported based on import filter. Treat write as insert
@@ -2225,7 +2239,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
if expiry != nil {
initialExpiry = *expiry
}
- casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncMouAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) {
+ casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncMouRevSeqNoAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) {
// Be careful: this block can be invoked multiple times if there are races!
if doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll); err != nil {
return
db/database.go (2 additions & 2 deletions)
@@ -1874,7 +1874,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,

// Update metadataOnlyUpdate based on previous Cas, metadataOnlyUpdate
if db.useMou() {
- doc.metadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.metadataOnlyUpdate)
+ doc.metadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.metadataOnlyUpdate)
}

_, rawSyncXattr, rawVvXattr, rawMouXattr, err := updatedDoc.MarshalWithXattrs()
@@ -1898,7 +1898,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
opts := &sgbucket.MutateInOptions{
MacroExpansion: macroExpandSpec(base.SyncXattrName),
}
- _, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncMouAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc)
+ _, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncMouRevSeqNoAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc)
} else {
_, err = db.dataStore.Update(key, 0, func(currentValue []byte) ([]byte, *uint32, bool, error) {
// Be careful: this block can be invoked multiple times if there are races!
db/database_collection.go (3 additions & 3 deletions)
@@ -247,11 +247,11 @@ func (c *DatabaseCollection) syncAndUserXattrKeys() []string {
return xattrKeys
}

- // syncMouAndUserXattrKeys returns the xattr keys for the user, mou and sync xattrs.
- func (c *DatabaseCollection) syncMouAndUserXattrKeys() []string {
+ // syncMouRevSeqNoAndUserXattrKeys returns the xattr keys for the user, mou, revSeqNo and sync xattrs.
+ func (c *DatabaseCollection) syncMouRevSeqNoAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
if c.useMou() {
- xattrKeys = append(xattrKeys, base.MouXattrName)
+ xattrKeys = append(xattrKeys, base.MouXattrName, base.VirtualXattrRevSeqNo)
}
userXattrKey := c.userXattrKey()
if userXattrKey != "" {
db/document.go (27 additions & 9 deletions)
@@ -16,6 +16,7 @@ import (
"errors"
"fmt"
"math"
"strconv"
"time"

sgbucket "github.com/couchbase/sg-bucket"
@@ -63,8 +64,9 @@ type ChannelSetEntry struct {
}

type MetadataOnlyUpdate struct {
- CAS string `json:"cas,omitempty"`
- PreviousCAS string `json:"pCas,omitempty"`
+ CAS string `json:"cas,omitempty"`
+ PreviousCAS string `json:"pCas,omitempty"`
+ PreviousRevSeqNo uint64 `json:"pRev,omitempty"`
}

// The sync-gateway metadata stored in the "_sync" property of a Couchbase document.
@@ -198,6 +200,7 @@ type Document struct {
RevID string
DocAttachments AttachmentsMeta
inlineSyncData bool
+ RevSeqNo uint64 // Server rev seq no for a document
}

type historyOnlySyncData struct {
@@ -407,14 +410,14 @@ func unmarshalDocument(docid string, data []byte) (*Document, error) {
return doc, nil
}

- func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, syncXattrData, hlvXattrData, mouXattrData, userXattrData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
+ func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXattrData, hlvXattrData, mouXattrData, userXattrData, documentXattr []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {

if len(syncXattrData) == 0 && len(hlvXattrData) == 0 {
// If no xattr data, unmarshal as standard doc
doc, err = unmarshalDocument(docid, data)
} else {
doc = NewDocument(docid)
- err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, unmarshalLevel)
+ err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, documentXattr, unmarshalLevel)
}
if err != nil {
return nil, err
@@ -529,7 +532,7 @@ func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, da
if err != nil {
return nil, err
}
- return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], cas, DocUnmarshalAll)
+ return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], xattrs[base.VirtualXattrRevSeqNo], cas, DocUnmarshalAll)
}

func (doc *SyncData) HasValidSyncData() bool {
@@ -1092,7 +1095,7 @@ func (doc *Document) MarshalJSON() (data []byte, err error) {
// unmarshalLevel is anything less than the full document + metadata, the raw data is retained for subsequent
// lazy unmarshalling as needed.
// Must handle cases where document body and hlvXattrData are present without syncXattrData for all DocumentUnmarshalLevel
- func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData []byte, unmarshalLevel DocumentUnmarshalLevel) error {
+ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData, documentXattr []byte, unmarshalLevel DocumentUnmarshalLevel) error {
if doc.ID == "" {
base.WarnfCtx(ctx, "Attempted to unmarshal document without ID set")
return errors.New("Document was unmarshalled without ID set")
@@ -1114,6 +1117,20 @@
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
}
}
+ if documentXattr != nil {
+ var revSeqNo string
+ err := base.JSONUnmarshal(documentXattr, &revSeqNo)
+ if err != nil {
+ return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal doc virtual revSeqNo xattr during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
+ }
+ if revSeqNo != "" {
+ revNo, err := strconv.ParseUint(revSeqNo, 10, 64)
+ if err != nil {
+ return pkgerrors.WithStack(base.RedactErrorf("Failed convert rev seq number %q during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", revSeqNo, base.UD(doc.ID), err))
+ }
+ doc.RevSeqNo = revNo
+ }
+ }
doc._rawBody = data
// Unmarshal body if requested and present
if unmarshalLevel == DocUnmarshalAll && len(data) > 0 {
@@ -1239,7 +1256,7 @@ func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr, vvXattr, mouXa
}

// computeMetadataOnlyUpdate computes a new metadataOnlyUpdate based on the existing document's CAS and metadataOnlyUpdate
- func computeMetadataOnlyUpdate(currentCas uint64, currentMou *MetadataOnlyUpdate) *MetadataOnlyUpdate {
+ func computeMetadataOnlyUpdate(currentCas uint64, revNo uint64, currentMou *MetadataOnlyUpdate) *MetadataOnlyUpdate {
var prevCas string
currentCasString := base.CasToString(currentCas)
if currentMou != nil && currentCasString == currentMou.CAS {
@@ -1249,8 +1266,9 @@
}

metadataOnlyUpdate := &MetadataOnlyUpdate{
- CAS: expandMacroCASValue, // when non-empty, this is replaced with cas macro expansion
- PreviousCAS: prevCas,
+ CAS: expandMacroCASValue, // when non-empty, this is replaced with cas macro expansion
+ PreviousCAS: prevCas,
+ PreviousRevSeqNo: revNo,
}
return metadataOnlyUpdate
}
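To make the new pRev field concrete, here is a small, self-contained sketch (not part of the commit) of how a MetadataOnlyUpdate now serializes, mirroring the struct tags above; the CAS strings are placeholder values, since in Sync Gateway the cas field is normally filled in by macro expansion at write time.

package main

import (
	"encoding/json"
	"fmt"
)

// Local mirror of the MetadataOnlyUpdate struct from this commit, copied here
// so the example compiles on its own.
type MetadataOnlyUpdate struct {
	CAS              string `json:"cas,omitempty"`
	PreviousCAS      string `json:"pCas,omitempty"`
	PreviousRevSeqNo uint64 `json:"pRev,omitempty"`
}

func main() {
	mou := MetadataOnlyUpdate{
		CAS:              "0x0000f6962d6e1716", // placeholder; written via cas macro expansion in practice
		PreviousCAS:      "0x0000a2c18d6e1716", // placeholder previous cas
		PreviousRevSeqNo: 3,                    // the document's rev seq no before the metadata-only write
	}
	out, _ := json.Marshal(mou)
	fmt.Println(string(out))
	// Output: {"cas":"0x0000f6962d6e1716","pCas":"0x0000a2c18d6e1716","pRev":3}
}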
db/document_test.go (5 additions & 5 deletions)
@@ -137,7 +137,7 @@ func BenchmarkDocUnmarshal(b *testing.B) {
b.Run(bm.name, func(b *testing.B) {
ctx := base.TestCtx(b)
for i := 0; i < b.N; i++ {
_, _ = unmarshalDocumentWithXattrs(ctx, "doc_1k", doc1k_body, doc1k_meta, nil, nil, nil, 1, bm.unmarshalLevel)
_, _ = unmarshalDocumentWithXattrs(ctx, "doc_1k", doc1k_body, doc1k_meta, nil, nil, nil, nil, 1, bm.unmarshalLevel)
}
})
}
@@ -263,7 +263,7 @@ func TestParseVersionVectorSyncData(t *testing.T) {

sync_meta := []byte(doc_meta_no_vv)
vv_meta := []byte(doc_meta_vv)
doc, err := unmarshalDocumentWithXattrs(ctx, "doc_1k", nil, sync_meta, vv_meta, nil, nil, 1, DocUnmarshalNoHistory)
doc, err := unmarshalDocumentWithXattrs(ctx, "doc_1k", nil, sync_meta, vv_meta, nil, nil, nil, 1, DocUnmarshalNoHistory)
require.NoError(t, err)

strCAS := string(base.Uint64CASToLittleEndianHex(123456))
@@ -274,7 +274,7 @@
assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions))
assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions))

doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, nil, 1, DocUnmarshalAll)
doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, nil, nil, 1, DocUnmarshalAll)
require.NoError(t, err)

// assert on doc version vector values
@@ -284,7 +284,7 @@
assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions))
assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions))

doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, nil, 1, DocUnmarshalNoHistory)
doc, err = unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, nil, nil, 1, DocUnmarshalNoHistory)
require.NoError(t, err)

// assert on doc version vector values
@@ -362,7 +362,7 @@ func TestRevAndVersion(t *testing.T) {
require.NoError(t, err)

newDocument := NewDocument("docID")
- err = newDocument.UnmarshalWithXattrs(ctx, marshalledDoc, marshalledXattr, marshalledVvXattr, DocUnmarshalAll)
+ err = newDocument.UnmarshalWithXattrs(ctx, marshalledDoc, marshalledXattr, marshalledVvXattr, nil, DocUnmarshalAll)
require.NoError(t, err)
require.Equal(t, test.revTreeID, newDocument.CurrentRev)
require.Equal(t, expectedSequence, newDocument.Sequence)
db/hybrid_logical_vector_test.go (18 additions & 3 deletions)
@@ -278,7 +278,13 @@ func TestHLVImport(t *testing.T) {
standardImportBody := []byte(`{"prop":"value"}`)
cas, err := collection.dataStore.WriteCas(standardImportKey, 0, 0, standardImportBody, sgbucket.Raw)
require.NoError(t, err, "write error")
- _, err = collection.ImportDocRaw(ctx, standardImportKey, standardImportBody, nil, false, cas, nil, ImportFromFeed)
+ importOpts := importDocOptions{
+ isDelete: false,
+ expiry: nil,
+ mode: ImportFromFeed,
+ revSeqNo: 1,
+ }
+ _, err = collection.ImportDocRaw(ctx, standardImportKey, standardImportBody, nil, importOpts, cas)
require.NoError(t, err, "import error")
importedDoc, _, err := collection.GetDocWithXattrs(ctx, standardImportKey, DocUnmarshalAll)
@@ -296,11 +302,20 @@
existingHLVKey := "existingHLV_" + t.Name()
_ = hlvHelper.insertWithHLV(ctx, existingHLVKey)
- existingBody, existingXattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, existingHLVKey, []string{base.SyncXattrName, base.VvXattrName})
+ existingBody, existingXattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, existingHLVKey, []string{base.SyncXattrName, base.VvXattrName, base.VirtualXattrRevSeqNo})
require.NoError(t, err)
encodedCAS = EncodeValue(cas)
- _, err = collection.ImportDocRaw(ctx, existingHLVKey, existingBody, existingXattrs, false, cas, nil, ImportFromFeed)
+ docxattr, _ := existingXattrs[base.VirtualXattrRevSeqNo]
+ revSeqNo := RetrieveDocRevSeqNo(t, docxattr)
+ importOpts = importDocOptions{
+ isDelete: false,
+ expiry: nil,
+ mode: ImportFromFeed,
+ revSeqNo: revSeqNo,
+ }
+ _, err = collection.ImportDocRaw(ctx, existingHLVKey, existingBody, existingXattrs, importOpts, cas)
require.NoError(t, err, "import error")
importedDoc, _, err = collection.GetDocWithXattrs(ctx, existingHLVKey, DocUnmarshalAll)
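The updated test relies on a RetrieveDocRevSeqNo helper that is not shown in this view. A plausible sketch of such a helper (an assumption, not the commit's implementation), assuming the virtual xattr payload is the JSON-encoded decimal string that UnmarshalWithXattrs parses in db/document.go above:

package db

import (
	"strconv"
	"testing"

	"github.com/couchbase/sync_gateway/base"
	"github.com/stretchr/testify/require"
)

// retrieveDocRevSeqNo is a hypothetical stand-in for the RetrieveDocRevSeqNo
// test helper: it decodes raw $document.revid xattr bytes (e.g. []byte(`"3"`))
// into a uint64 rev seq no.
func retrieveDocRevSeqNo(t *testing.T, docxattr []byte) uint64 {
	var revSeqNoString string
	require.NoError(t, base.JSONUnmarshal(docxattr, &revSeqNoString))
	revSeqNo, err := strconv.ParseUint(revSeqNoString, 10, 64)
	require.NoError(t, err)
	return revSeqNo
}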
db/import.go (14 additions & 7 deletions)
@@ -31,11 +31,18 @@ const (
ImportOnDemand // On-demand import. Reattempt import on cas write failure of the imported doc until either the import succeeds, or existing doc is an SG write.
)

+ type importDocOptions struct {
+ expiry *uint32
+ isDelete bool
+ revSeqNo uint64
+ mode ImportMode
+ }

// Imports a document that was written by someone other than sync gateway, given the existing state of the doc in raw bytes
- func (db *DatabaseCollectionWithUser) ImportDocRaw(ctx context.Context, docid string, value []byte, xattrs map[string][]byte, isDelete bool, cas uint64, expiry *uint32, mode ImportMode) (docOut *Document, err error) {
+ func (db *DatabaseCollectionWithUser) ImportDocRaw(ctx context.Context, docid string, value []byte, xattrs map[string][]byte, importOpts importDocOptions, cas uint64) (docOut *Document, err error) {

var body Body
- if isDelete {
+ if importOpts.isDelete {
body = Body{}
} else {
err := body.Unmarshal(value)
Expand All @@ -58,11 +65,11 @@ func (db *DatabaseCollectionWithUser) ImportDocRaw(ctx context.Context, docid st
Cas: cas,
}

- return db.importDoc(ctx, docid, body, expiry, isDelete, existingBucketDoc, mode)
+ return db.importDoc(ctx, docid, body, importOpts.expiry, importOpts.isDelete, importOpts.revSeqNo, existingBucketDoc, importOpts.mode)
}

// Import a document, given the existing state of the doc in *document format.
- func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid string, existingDoc *Document, isDelete bool, expiry *uint32, mode ImportMode) (docOut *Document, err error) {
+ func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid string, existingDoc *Document, importOpts importDocOptions) (docOut *Document, err error) {

if existingDoc == nil {
return nil, base.RedactErrorf("No existing doc present when attempting to import %s", base.UD(docid))
@@ -98,7 +105,7 @@ func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid strin
return nil, err
}

- return db.importDoc(ctx, docid, existingDoc.Body(ctx), expiry, isDelete, existingBucketDoc, mode)
+ return db.importDoc(ctx, docid, existingDoc.Body(ctx), importOpts.expiry, importOpts.isDelete, importOpts.revSeqNo, existingBucketDoc, importOpts.mode)
}

// Import document
Expand All @@ -108,7 +115,7 @@ func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid strin
// isDelete - whether the document to be imported is a delete
// existingDoc - bytes/cas/expiry of the document to be imported (including xattr when available)
// mode - ImportMode - ImportFromFeed or ImportOnDemand
- func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid string, body Body, expiry *uint32, isDelete bool, existingDoc *sgbucket.BucketDocument, mode ImportMode) (docOut *Document, err error) {
+ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid string, body Body, expiry *uint32, isDelete bool, revNo uint64, existingDoc *sgbucket.BucketDocument, mode ImportMode) (docOut *Document, err error) {

base.DebugfCtx(ctx, base.KeyImport, "Attempting to import doc %q...", base.UD(docid))
importStartTime := time.Now()
@@ -330,7 +337,7 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin

// If this is a metadata-only update, set metadataOnlyUpdate based on old doc's cas and mou
if metadataOnlyUpdate && db.useMou() {
- newDoc.metadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.metadataOnlyUpdate)
+ newDoc.metadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, revNo, doc.metadataOnlyUpdate)
}

return newDoc, nil, !shouldGenerateNewRev, updatedExpiry, nil
db/import_listener.go (7 additions & 1 deletion)
@@ -203,8 +203,14 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab
return
default:
}
+ importOpts := importDocOptions{
+ isDelete: isDelete,
+ mode: ImportFromFeed,
+ expiry: &event.Expiry,
+ revSeqNo: event.RevNo,
+ }

- _, err := collection.ImportDocRaw(ctx, docID, rawBody, rawXattrs, isDelete, event.Cas, &event.Expiry, ImportFromFeed)
+ _, err := collection.ImportDocRaw(ctx, docID, rawBody, rawXattrs, importOpts, event.Cas)
if err != nil {
if err == base.ErrImportCasFailure {
base.DebugfCtx(ctx, base.KeyImport, "Not importing mutation - document %s has been subsequently updated and will be imported based on that mutation.", base.UD(docID))
