Skip to content

Commit

Permalink
address commnets
Browse files Browse the repository at this point in the history
  • Loading branch information
gregns1 committed Oct 31, 2023
1 parent b65c7b8 commit 5c022b7
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 97 deletions.
77 changes: 35 additions & 42 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,16 +1044,24 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
return newRevID, doc, err
}

func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, docHistory []string, docHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *CurrentVersionVector, newRevID string, err error) {
newRev := docHistory[0]
generation, _ := ParseRevID(ctx, newRev)
func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, docHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *CurrentVersionVector, newRevID string, err error) {
var matchRev string
if existingDoc != nil {
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattr, existingDoc.UserXattr, existingDoc.Cas, DocUnmarshalRev)
if unmarshalErr != nil {
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling exsiting doc")
}
matchRev = doc.CurrentRev
}
generation, _ := ParseRevID(ctx, matchRev)
if generation < 0 {
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Invalid revision ID")
}
generation++

docUpdateEvent := ExistingVersion
allowImport := db.UseXattrs()
doc, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, func(doc *Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, func(doc *Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// (Be careful: this block can be invoked multiple times if there are races!)

var isSgWrite bool
Expand All @@ -1075,55 +1083,40 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
}
}

// Find the point where this doc's history branches from the current rev:
currentRevIndex := len(docHistory)
parent := ""
for i, revid := range docHistory {
if doc.History.contains(revid) {
currentRevIndex = i
parent = revid
break
}
}
if currentRevIndex == 0 {
base.DebugfCtx(ctx, base.KeyCRUD, "PutExistingRevWithBody(%q): No new revisions to add", base.UD(newDoc.ID))
newDoc.RevID = newRev
return nil, nil, false, nil, base.ErrUpdateCancel // No new revisions to add
}

// Conflict check here
// if doc has no HLV defined this is a new doc we haven't seen before, skip conflict check
if doc.HLV == nil {
doc.HLV = &HybridLogicalVector{}
}
if !docHLV.IsInConflict(*doc.HLV) {
// update hlv for all newer incoming source version pairs
doc.HLV.AddNewerVersions(docHLV)
} else {
base.InfofCtx(ctx, base.KeyCRUD, "conflict detected between the two HLV's for doc %s", base.UD(doc.ID))
// cancel rest of update, HLV needs to be sent back to client with merge versions populated
return nil, nil, false, nil, base.HTTPErrorf(http.StatusConflict, "Document revision conflict")
}

// Add all the new-to-me revisions to the rev tree:
for i := currentRevIndex - 1; i >= 0; i-- {
err := doc.History.addRevision(newDoc.ID,
RevInfo{
ID: docHistory[i],
Parent: parent,
Deleted: i == 0 && newDoc.Deleted})

if err != nil {
return nil, nil, false, nil, err
if !docHLV.IsInConflict(*doc.HLV) {
// update hlv for all newer incoming source version pairs
doc.HLV.AddNewerVersions(docHLV)
} else {
base.InfofCtx(ctx, base.KeyCRUD, "conflict detected between the two HLV's for doc %s", base.UD(doc.ID))
// cancel rest of update, HLV needs to be sent back to client with merge versions populated
return nil, nil, false, nil, base.HTTPErrorf(http.StatusConflict, "Document revision conflict")
}
parent = docHistory[i]
}

parentRevID := doc.History[newRev].Parent
// Process the attachments, replacing bodies with digests.
newAttachments, err := db.storeAttachments(ctx, doc, newDoc.DocAttachments, generation, parentRevID, docHistory)
newAttachments, err := db.storeAttachments(ctx, doc, newDoc.DocAttachments, generation, matchRev, nil)
if err != nil {
return nil, nil, false, nil, err
}

// generate rev id for new arriving doc
strippedBody, _ := stripInternalProperties(newDoc._body)
encoding, err := base.JSONMarshalCanonical(strippedBody)
if err != nil {
return nil, nil, false, nil, err
}
newRev := CreateRevIDWithBytes(generation, matchRev, encoding)

if err := doc.History.addRevision(newDoc.ID, RevInfo{ID: newRev, Parent: matchRev, Deleted: newDoc.Deleted}); err != nil {
base.InfofCtx(ctx, base.KeyCRUD, "Failed to add revision ID: %s, for doc: %s, error: %v", newRev, base.UD(newDoc.ID), err)
return nil, nil, false, nil, base.ErrRevTreeAddRevFailure
}

newDoc.RevID = newRev

Expand All @@ -1139,7 +1132,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
cv.VersionCAS = version
}

return doc, cv, newRev, err
return doc, cv, newRevID, err
}

// Adds an existing revision to a document along with its history (list of rev IDs.)
Expand Down
58 changes: 14 additions & 44 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1664,7 +1664,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {
// PUT an update to the above doc
body = Body{"key1": "value11"}
body[BodyRev] = rev
rev2, _, err := collection.Put(ctx, key, body)
_, _, err = collection.Put(ctx, key, body)
require.NoError(t, err)

// grab the new version for the above update to assert against later in test
Expand Down Expand Up @@ -1693,7 +1693,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {
_, rawDoc, err := collection.GetDocumentWithRaw(ctx, key, DocUnmarshalSync)
require.NoError(t, err)

doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, []string{"3-c", rev2, rev}, incomingHLV, rawDoc)
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, rawDoc)
require.NoError(t, err)
// assert on returned CV
assert.Equal(t, "test", cv.SourceID)
Expand All @@ -1713,7 +1713,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {
// update the pv map so we can assert we have correct pv map in HLV
pv[bucketUUID] = docUpdateVersion
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
assert.Equal(t, "3-c", syncData.CurrentRev)
assert.Equal(t, "3-60b024c44c283b369116c2c2570e8088", syncData.CurrentRev)
}

// TestPutExistingCurrentVersionWithConflict:
Expand All @@ -1735,7 +1735,7 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
key := "doc1"
body := Body{"key1": "value1"}

rev, _, err := collection.Put(ctx, key, body)
_, _, err := collection.Put(ctx, key, body)
require.NoError(t, err)

// assert on the HLV values after the above creation of the doc
Expand All @@ -1759,7 +1759,7 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
require.NoError(t, err)

// assert that a conflict is correctly identified and the resulting doc and cv are nil
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, []string{"2-b", rev}, incomingHLV, rawDoc)
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, rawDoc)
require.Error(t, err)
assert.ErrorContains(t, err, "Document revision conflict")
assert.Nil(t, cv)
Expand All @@ -1783,52 +1783,23 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
bucketUUID := db.BucketUUID
collection := GetSingleDatabaseCollectionWithUser(t, db)

// create a new doc
key := "doc1"
body := Body{"key1": "value1"}

rev, _, err := collection.Put(ctx, key, body)
require.NoError(t, err)

// assert on the HLV values after the above creation of the doc
syncData, err := collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)

// store the cas version allocated to the above doc creation for creation of incoming HLV later in test
originalDocVersion := syncData.HLV.Version

// PUT an update to the above doc
body = Body{"key1": "value11"}
body[BodyRev] = rev
rev2, _, err := collection.Put(ctx, key, body)
require.NoError(t, err)

// grab the new version for the above update to assert against later in test
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
docUpdateVersion := syncData.HLV.Version

// construct a mock doc update coming over a replicator
body = Body{"key1": "value2"}
newDoc := constructDocumentFromBody(key, body)
body := Body{"key1": "value2"}
newDoc := constructDocumentFromBody("doc2", body)

// construct a HLV that simulates a doc update happening on a client
// this means moving the current source version pair to PV and adding new sourceID and version pair to CV
pv := make(map[string]uint64)
pv[bucketUUID] = originalDocVersion
pv[bucketUUID] = 2
// create a version larger than the allocated version above
incomingVersion := docUpdateVersion + 10
incomingVersion := uint64(2 + 10)
incomingHLV := HybridLogicalVector{
SourceID: "test",
Version: incomingVersion,
PreviousVersions: pv,
}
// call PutExistingCurrentVersion with nil existing doc
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, []string{"3-c", rev2, rev}, incomingHLV, nil)
// call PutExistingCurrentVersion with empty existing doc
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, &sgbucket.BucketDocument{})
require.NoError(t, err)
assert.NotNil(t, doc)
// assert on returned CV value
Expand All @@ -1839,14 +1810,13 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
// assert on the sync data from the above update to the doc
// CV should be equal to CV of update on client but the cvCAS should be updated with the new update and
// PV should contain the old CV pair
syncData, err = collection.GetDocSyncData(ctx, "doc1")
syncData, err := collection.GetDocSyncData(ctx, "doc2")
assert.NoError(t, err)
uintCAS = base.HexCasToUint64(syncData.Cas)
uintCAS := base.HexCasToUint64(syncData.Cas)
assert.Equal(t, "test", syncData.HLV.SourceID)
assert.Equal(t, incomingVersion, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
// update the pv map so we can assert we have correct pv map in HLV
pv[bucketUUID] = docUpdateVersion
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
assert.Equal(t, "3-c", syncData.CurrentRev)
assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev)
}
30 changes: 19 additions & 11 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,27 @@ func (hlv *HybridLogicalVector) GetVersion(sourceID string) uint64 {
// AddNewerVersions will take a hlv and add any newer source/version pairs found across CV and PV found in the other HLV taken as parameter
func (hlv *HybridLogicalVector) AddNewerVersions(otherVector HybridLogicalVector) {
if otherVector.PreviousVersions != nil || len(otherVector.PreviousVersions) != 0 {
_, localVersion := hlv.GetCurrentVersion()
// Iterate through incoming vector previous versions, compare to current version cas of local HLV
// Any more recent versions in incoming vectors previous versions need to be added to local HLV previous versions
// Iterate through incoming vector previous versions, update with the version from other vector
// for source if the local version for that source is lower
for i, v := range otherVector.PreviousVersions {
if v > localVersion {
hlv.addToPreviousVersions(i, v)
if hlv.PreviousVersions[i] < v {
hlv.setPreviousVersion(i, v)
}
}
}

// create current version for incoming vector and attempt to add it to the local HLV, AddVersion will handle if attempting to add older
// version than local HLVs CV pair
otherVectorCV := CurrentVersionVector{SourceID: otherVector.SourceID, VersionCAS: otherVector.Version}
_ = hlv.AddVersion(otherVectorCV)
err := hlv.AddVersion(otherVectorCV)
if err != nil {
// we get here if the other HLV has lower current version than local HLV, so we
// need to check if the current source between the two are not equal then the
// incoming CV pair needs to be added to PV on local HLV
if hlv.SourceID != otherVector.SourceID {
hlv.setPreviousVersion(otherVector.SourceID, otherVector.Version)
}
}
}

func (hlv HybridLogicalVector) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -305,10 +313,10 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi
return outputSpec
}

// addToPreviousVersions will take a source/version pair and add it to the HLV previous versions map
func (hlv *HybridLogicalVector) addToPreviousVersions(source string, version uint64) {
if hlv.MergeVersions == nil {
hlv.MergeVersions = make(map[string]uint64)
// setPreviousVersion will take a source/version pair and add it to the HLV previous versions map
func (hlv *HybridLogicalVector) setPreviousVersion(source string, version uint64) {
if hlv.PreviousVersions == nil {
hlv.PreviousVersions = make(map[string]uint64)
}
hlv.MergeVersions[source] = version
hlv.PreviousVersions[source] = version
}

0 comments on commit 5c022b7

Please sign in to comment.