diff --git a/db/blip_handler.go b/db/blip_handler.go index 379a5926f9..94789d113d 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -822,6 +822,8 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error { defer func() { bh.replicationStats.HandleChangesTime.Add(time.Since(startTime).Nanoseconds()) }() + changesContainLegacyRevs := false // keep track if proposed changes have legacy revs for delta sync purposes + versionVectorProtocol := bh.useHLV() for i, change := range changeList { docID := change[0].(string) @@ -832,9 +834,16 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error { } var status ProposedRevStatus var currentRev string - if bh.useHLV() { + + changeIsVector := false + if versionVectorProtocol { + // only check if rev is vector in VV replication mode + changeIsVector = strings.Contains(rev, "@") + } + if versionVectorProtocol && changeIsVector { status, currentRev = bh.collection.CheckProposedVersion(bh.loggingCtx, docID, rev, parentRevID) } else { + changesContainLegacyRevs = true status, currentRev = bh.collection.CheckProposedRev(bh.loggingCtx, docID, rev, parentRevID) } if status == ProposedRev_OK_IsNew { @@ -866,8 +875,8 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error { } output.Write([]byte("]")) response := rq.Response() - // Disable delta sync for protocol versions < 4, CBG-3748 (backwards compatibility for revID delta sync) - if bh.sgCanUseDeltas && bh.useHLV() { + // Disable delta sync for protocol versions < 4 or changes batches that have legacy revs in them, CBG-3748 (backwards compatibility for revID delta sync) + if bh.sgCanUseDeltas && bh.useHLV() && !changesContainLegacyRevs { base.DebugfCtx(bh.loggingCtx, base.KeyAll, "Setting deltas=true property on proposeChanges response") response.Properties[ChangesResponseDeltas] = trueProperty } @@ -887,13 +896,13 @@ func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sen } else if base.IsFleeceDeltaError(err) { // Something went wrong in the diffing library. We want to know about this! base.WarnfCtx(ctx, "Falling back to full body replication. Error generating delta from %s to %s for key %s - err: %v", deltaSrcRevID, revID, base.UD(docID), err) - return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx) + return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false) } else if err == base.ErrDeltaSourceIsTombstone { base.TracefCtx(ctx, base.KeySync, "Falling back to full body replication. Delta source %s is tombstone. Unable to generate delta to %s for key %s", deltaSrcRevID, revID, base.UD(docID)) - return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx) + return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false) } else if err != nil { base.DebugfCtx(ctx, base.KeySync, "Falling back to full body replication. Couldn't get delta from %s to %s for key %s - err: %v", deltaSrcRevID, revID, base.UD(docID), err) - return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx) + return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false) } if redactedRev != nil { @@ -909,12 +918,12 @@ func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sen if revDelta == nil { base.DebugfCtx(ctx, base.KeySync, "Falling back to full body replication. Couldn't get delta from %s to %s for key %s", deltaSrcRevID, revID, base.UD(docID)) - return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx) + return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false) } resendFullRevisionFunc := func() error { base.InfofCtx(ctx, base.KeySync, "Resending revision as full body. Peer couldn't process delta %s from %s to %s for key %s", base.UD(revDelta.DeltaBytes), deltaSrcRevID, revID, base.UD(docID)) - return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx) + return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false) } base.TracefCtx(ctx, base.KeySync, "docID: %s - delta: %v", base.UD(docID), base.UD(string(revDelta.DeltaBytes))) @@ -1059,7 +1068,8 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err historyStr := rq.Properties[RevMessageHistory] var incomingHLV *HybridLogicalVector // Build history/HLV - if !bh.useHLV() { + changeIsVector := strings.Contains(rev, "@") + if !bh.useHLV() || !changeIsVector { newDoc.RevID = rev history = []string{rev} if historyStr != "" { @@ -1287,7 +1297,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err // If the doc is a tombstone we want to allow conflicts when running SGR2 // bh.conflictResolver != nil represents an active SGR2 and BLIPClientTypeSGR2 represents a passive SGR2 forceAllowConflictingTombstone := newDoc.Deleted && (bh.conflictResolver != nil || bh.clientType == BLIPClientTypeSGR2) - if bh.useHLV() { + if bh.useHLV() && changeIsVector { _, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc) } else if bh.conflictResolver != nil { _, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV) diff --git a/db/blip_sync_context.go b/db/blip_sync_context.go index c16165773c..75a2b5303f 100644 --- a/db/blip_sync_context.go +++ b/db/blip_sync_context.go @@ -340,6 +340,7 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b if err != nil { return err } + versionVectorProtocol := bsc.useHLV() for i, knownRevsArrayInterface := range answer { seq := changeArray[i][0].(SequenceID) @@ -347,6 +348,7 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b rev := changeArray[i][2].(string) if knownRevsArray, ok := knownRevsArrayInterface.([]interface{}); ok { + legacyRev := false deltaSrcRevID := "" knownRevs := knownRevsByDoc[docID] if knownRevs == nil { @@ -358,10 +360,10 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b // revtree clients. For HLV clients, use the cv as deltaSrc if bsc.useDeltas && len(knownRevsArray) > 0 { if revID, ok := knownRevsArray[0].(string); ok { - if bsc.useHLV() { + if versionVectorProtocol { msgHLV, err := extractHLVFromBlipMessage(revID) if err != nil { - base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", docID) + base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", base.UD(docID)) deltaSrcRevID = "" // will force falling back to full body replication below } else { deltaSrcRevID = msgHLV.GetCurrentVersionString() @@ -375,7 +377,12 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b for _, rev := range knownRevsArray { if revID, ok := rev.(string); ok { msgHLV, err := extractHLVFromBlipMessage(revID) - if err == nil { + if err != nil { + // assume we have received legacy rev if we cannot parse hlv from known revs, and we are in vv replication + if versionVectorProtocol { + legacyRev = true + } + } else { // extract cv as string revID = msgHLV.GetCurrentVersionString() } @@ -394,7 +401,7 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b if deltaSrcRevID != "" && bsc.useHLV() { err = bsc.sendRevAsDelta(ctx, sender, docID, rev, deltaSrcRevID, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx) } else { - err = bsc.sendRevision(ctx, sender, docID, rev, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx) + err = bsc.sendRevision(ctx, sender, docID, rev, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx, legacyRev) } if err != nil { return err @@ -652,11 +659,11 @@ func (bsc *BlipSyncContext) sendNoRev(sender *blip.Sender, docID, revID string, } // Pushes a revision body to the client -func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sender, docID, revID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int) error { +func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sender, docID, revID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int, legacyRev bool) error { var originalErr error var docRev DocumentRevision - if bsc.activeCBMobileSubprotocol <= CBMobileReplicationV3 { + if !bsc.useHLV() { docRev, originalErr = handleChangesResponseCollection.GetRev(ctx, docID, revID, true, nil) } else { // extract CV string rev representation @@ -743,13 +750,19 @@ func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sende bsc.replicationStats.SendReplacementRevCount.Add(1) } var history []string - if bsc.activeCBMobileSubprotocol <= CBMobileReplicationV3 { + if !bsc.useHLV() { history = toHistory(docRev.History, knownRevs, maxHistory) } else { if docRev.hlvHistory != "" { history = append(history, docRev.hlvHistory) } } + if legacyRev { + // append current revID and rest of rev tree after hlv history + revTreeHistory := toHistory(docRev.History, knownRevs, maxHistory) + history = append(history, docRev.RevID) + history = append(history, revTreeHistory...) + } properties := blipRevMessageProperties(history, docRev.Deleted, seq, replacedRevID) if base.LogDebugEnabled(ctx, base.KeySync) { diff --git a/rest/blip_legacy_revid_test.go b/rest/blip_legacy_revid_test.go new file mode 100644 index 0000000000..bb23180cf4 --- /dev/null +++ b/rest/blip_legacy_revid_test.go @@ -0,0 +1,434 @@ +/* +Copyright 2024-Present Couchbase, Inc. + +Use of this software is governed by the Business Source License included in +the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that +file, in accordance with the Business Source License, use of this software will +be governed by the Apache License, Version 2.0, included in the file +licenses/APL2.txt. +*/ + +package rest + +import ( + "encoding/json" + "log" + "strings" + "sync" + "testing" + "time" + + "github.com/couchbase/go-blip" + "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestLegacyProposeChanges: +// - Build propose changes request of docs that are all new to SGW in legacy format +// - Assert that the response is as expected (empty response) +func TestLegacyProposeChanges(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + + proposeChangesRequest := bt.newRequest() + proposeChangesRequest.SetProfile("proposeChanges") + proposeChangesRequest.SetCompressed(true) + + changesBody := ` +[["foo", "1-abc"], +["foo2", "1-abc"]] +` + proposeChangesRequest.SetBody([]byte(changesBody)) + sent := bt.sender.Send(proposeChangesRequest) + assert.True(t, sent) + proposeChangesResponse := proposeChangesRequest.Response() + body, err := proposeChangesResponse.Body() + require.NoError(t, err) + + var changeList [][]interface{} + err = base.JSONUnmarshal(body, &changeList) + require.NoError(t, err) + + assert.Len(t, changeList, 0) +} + +// TestProposeChangesHandlingWithExistingRevs: +// - Build up propose changes request for conflicting and non conflicting docs with legacy revs +// - Assert that the response sent from SGW is as expected +func TestProposeChangesHandlingWithExistingRevs(t *testing.T) { + base.SetUpTestLogging(t, base.LevelInfo, base.KeyHTTP, base.KeySync, base.KeySyncMsg) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + + resp := rt.PutDoc("conflictingInsert", `{"version":1}`) + conflictingInsertRev := resp.RevTreeID + + resp = rt.PutDoc("conflictingUpdate", `{"version":1}`) + conflictingUpdateRev1 := resp.RevTreeID + conflictingUpdateRev2 := rt.UpdateDocRev("conflictingUpdate", resp.RevTreeID, `{"version":2}`) + + resp = rt.PutDoc("newUpdate", `{"version":1}`) + newUpdateRev1 := resp.RevTreeID + + resp = rt.PutDoc("existingDoc", `{"version":1}`) + existingDocRev := resp.RevTreeID + + type proposeChangesCase struct { + key string + revID string + parentRevID string + expectedValue interface{} + } + + proposeChangesCases := []proposeChangesCase{ + proposeChangesCase{ + key: "conflictingInsert", + revID: "1-abc", + parentRevID: "", + expectedValue: map[string]interface{}{"status": float64(db.ProposedRev_Conflict), "rev": conflictingInsertRev}, + }, + proposeChangesCase{ + key: "newInsert", + revID: "1-abc", + parentRevID: "", + expectedValue: float64(db.ProposedRev_OK), + }, + proposeChangesCase{ + key: "conflictingUpdate", + revID: "2-abc", + parentRevID: conflictingUpdateRev1, + expectedValue: map[string]interface{}{"status": float64(db.ProposedRev_Conflict), "rev": conflictingUpdateRev2}, + }, + proposeChangesCase{ + key: "newUpdate", + revID: "2-abc", + parentRevID: newUpdateRev1, + expectedValue: float64(db.ProposedRev_OK), + }, + proposeChangesCase{ + key: "existingDoc", + revID: existingDocRev, + parentRevID: "", + expectedValue: float64(db.ProposedRev_Exists), + }, + } + + proposeChangesRequest := bt.newRequest() + proposeChangesRequest.SetProfile("proposeChanges") + proposeChangesRequest.SetCompressed(true) + proposeChangesRequest.Properties[db.ProposeChangesConflictsIncludeRev] = "true" + + proposedChanges := make([][]interface{}, 0) + for _, c := range proposeChangesCases { + changeEntry := []interface{}{ + c.key, + c.revID, + } + if c.parentRevID != "" { + changeEntry = append(changeEntry, c.parentRevID) + } + proposedChanges = append(proposedChanges, changeEntry) + } + proposeChangesBody, marshalErr := json.Marshal(proposedChanges) + require.NoError(t, marshalErr) + + proposeChangesRequest.SetBody(proposeChangesBody) + sent := bt.sender.Send(proposeChangesRequest) + assert.True(t, sent) + proposeChangesResponse := proposeChangesRequest.Response() + bodyReader, err := proposeChangesResponse.BodyReader() + require.NoError(t, err) + + var changeList []interface{} + decoder := base.JSONDecoder(bodyReader) + decodeErr := decoder.Decode(&changeList) + require.NoError(t, decodeErr) + + for i, entry := range changeList { + assert.Equal(t, proposeChangesCases[i].expectedValue, entry) + } +} + +// TestProcessLegacyRev: +// - Create doc on SGW +// - Push new revision of this doc form client in legacy rev mode +// - Assert that the new doc is created and given a new source version pair +// - Send a new rev that SGW hasn;t yet seen unsolicited and assert that the doc is added correctly and given a source version pair +func TestProcessLegacyRev(t *testing.T) { + base.SetUpTestLogging(t, base.LevelInfo, base.KeyHTTP, base.KeySync, base.KeySyncMsg) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + collection, _ := rt.GetSingleTestDatabaseCollection() + + // add doc to SGW + docVersion := rt.PutDocDirectly("doc1", db.Body{"test": "doc"}) + rev1ID := docVersion.RevTreeID + + // Send another rev of same doc + history := []string{rev1ID} + sent, _, _, err := bt.SendRevWithHistory("doc1", "2-bcd", history, []byte(`{"key": "val"}`), blip.Properties{}) + assert.True(t, sent) + assert.NoError(t, err) + require.NoError(t, rt.WaitForVersion("doc1", DocVersion{RevTreeID: "2-bcd"})) + + // assert we can fetch this doc rev + resp := rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1?rev=2-bcd", "") + RequireStatus(t, resp, 200) + + // assert this legacy doc has been given source version pair + docSource, docVrs := collection.GetDocumentCurrentVersion(t, "doc1") + assert.Equal(t, docVersion.CV.SourceID, docSource) + assert.NotEqual(t, docVersion.CV.Value, docVrs) + + // try new rev to process + _, _, _, err = bt.SendRev( + "foo", + "1-abc", + []byte(`{"key": "val"}`), + blip.Properties{}, + ) + assert.NoError(t, err) + + require.NoError(t, rt.WaitForVersion("foo", DocVersion{RevTreeID: "1-abc"})) + // assert we can fetch this doc rev + resp = rt.SendAdminRequest("GET", "/{{.keyspace}}/foo?rev=1-abc", "") + RequireStatus(t, resp, 200) + + // assert this legacy doc has been given source version pair + docSource, docVrs = collection.GetDocumentCurrentVersion(t, "doc1") + assert.NotEqual(t, "", docSource) + assert.NotEqual(t, uint64(0), docVrs) +} + +// TestChangesResponseLegacyRev: +// - Create doc +// - Update doc through SGW, creating a new revision +// - Send subChanges request and have custom changes handler to force a revID change being constructed +// - Have custom rev handler to assert the subsequent rev message is as expected with cv as rev + full rev +// tree in history. No hlv in history is expected here. +func TestChangesResponseLegacyRev(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + + docVersion := rt.PutDocDirectly("doc1", db.Body{"test": "doc"}) + rev1ID := docVersion.RevTreeID + + docVersion2 := rt.UpdateDocDirectly("doc1", docVersion, db.Body{"test": "update"}) + // wait for pending change to avoid flakes where changes feed didn't pick up this change + rt.WaitForPendingChanges() + receivedChangesRequestWg := sync.WaitGroup{} + revsFinishedWg := sync.WaitGroup{} + + bt.blipContext.HandlerForProfile["rev"] = func(request *blip.Message) { + defer revsFinishedWg.Done() + log.Printf("received rev request") + + // assert the rev property contains cv + rev := request.Properties["rev"] + assert.Equal(t, docVersion2.CV.String(), rev) + + // assert that history contain current revID and previous revID + history := request.Properties["history"] + historyList := strings.Split(history, ",") + assert.Len(t, historyList, 2) + assert.Equal(t, docVersion2.RevTreeID, historyList[0]) + assert.Equal(t, docVersion.RevTreeID, historyList[1]) + } + + bt.blipContext.HandlerForProfile["changes"] = func(request *blip.Message) { + + log.Printf("got changes message: %+v", request) + body, err := request.Body() + log.Printf("changes body: %v, err: %v", string(body), err) + + knownRevs := []interface{}{} + + if string(body) != "null" { + var changesReqs [][]interface{} + err = base.JSONUnmarshal(body, &changesReqs) + require.NoError(t, err) + + knownRevs = make([]interface{}, len(changesReqs)) + + for i, changesReq := range changesReqs { + docID := changesReq[1].(string) + revID := changesReq[2].(string) + log.Printf("change: %s %s", docID, revID) + + // fill known rev with revision 1 of doc1, this will replicate a situation where client has legacy rev of + // a document that SGW had a newer version of + knownRevs[i] = []string{rev1ID} + } + } + + if !request.NoReply() { + response := request.Response() + emptyResponseValBytes, err := base.JSONMarshal(knownRevs) + require.NoError(t, err) + response.SetBody(emptyResponseValBytes) + } + receivedChangesRequestWg.Done() + } + + subChangesRequest := bt.newRequest() + subChangesRequest.SetProfile("subChanges") + subChangesRequest.Properties["continuous"] = "false" + sent := bt.sender.Send(subChangesRequest) + assert.True(t, sent) + // changes will be called again with empty changes so hence the wait group of 2 + receivedChangesRequestWg.Add(2) + + // expect 1 rev message + revsFinishedWg.Add(1) + + subChangesResponse := subChangesRequest.Response() + assert.Equal(t, subChangesRequest.SerialNumber(), subChangesResponse.SerialNumber()) + + timeoutErr := WaitWithTimeout(&receivedChangesRequestWg, time.Second*10) + require.NoError(t, timeoutErr, "Timed out waiting") + + timeoutErr = WaitWithTimeout(&revsFinishedWg, time.Second*10) + require.NoError(t, timeoutErr, "Timed out waiting") + +} + +// TestChangesResponseWithHLVInHistory: +// - Create doc +// - Update doc with hlv agent to mock update from a another peer +// - Send subChanges request and have custom changes handler to force a revID change being constructed +// - Have custom rev handler to asser the subsequent rev message is as expected with cv as rev and pv + full rev +// tree in history +func TestChangesResponseWithHLVInHistory(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + collection, ctx := rt.GetSingleTestDatabaseCollection() + + docVersion := rt.PutDocDirectly("doc1", db.Body{"test": "doc"}) + rev1ID := docVersion.RevTreeID + + newDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) + require.NoError(t, err) + + agent := db.NewHLVAgent(t, rt.GetSingleDataStore(), "newSource", base.VvXattrName) + _ = agent.UpdateWithHLV(ctx, "doc1", newDoc.Cas, newDoc.HLV) + + // force import + newDoc, err = collection.GetDocument(ctx, "doc1", db.DocUnmarshalAll) + require.NoError(t, err) + // wait for pending change to avoid flakes where changes feed didn't pick up this change + rt.WaitForPendingChanges() + + receivedChangesRequestWg := sync.WaitGroup{} + revsFinishedWg := sync.WaitGroup{} + + bt.blipContext.HandlerForProfile["rev"] = func(request *blip.Message) { + defer revsFinishedWg.Done() + log.Printf("received rev request") + + // assert the rev property contains cv + rev := request.Properties["rev"] + assert.Equal(t, newDoc.HLV.GetCurrentVersionString(), rev) + + // assert that history contain current revID and previous revID + pv of HLV + history := request.Properties["history"] + historyList := strings.Split(history, ",") + assert.Len(t, historyList, 3) + assert.Equal(t, newDoc.CurrentRev, historyList[1]) + assert.Equal(t, docVersion.RevTreeID, historyList[2]) + assert.Equal(t, docVersion.CV.String(), historyList[0]) + } + + bt.blipContext.HandlerForProfile["changes"] = func(request *blip.Message) { + + log.Printf("got changes message: %+v", request) + body, err := request.Body() + log.Printf("changes body: %v, err: %v", string(body), err) + + knownRevs := []interface{}{} + + if string(body) != "null" { + var changesReqs [][]interface{} + err = base.JSONUnmarshal(body, &changesReqs) + require.NoError(t, err) + + knownRevs = make([]interface{}, len(changesReqs)) + + for i, changesReq := range changesReqs { + docID := changesReq[1].(string) + revID := changesReq[2].(string) + log.Printf("change: %s %s", docID, revID) + + // fill known rev with revision 1 of doc1, this will replicate a situation where client has legacy rev of + // a document that SGW had a newer version of + knownRevs[i] = []string{rev1ID} + } + } + + if !request.NoReply() { + response := request.Response() + emptyResponseValBytes, err := base.JSONMarshal(knownRevs) + require.NoError(t, err) + response.SetBody(emptyResponseValBytes) + } + receivedChangesRequestWg.Done() + } + + subChangesRequest := bt.newRequest() + subChangesRequest.SetProfile("subChanges") + subChangesRequest.Properties["continuous"] = "false" + sent := bt.sender.Send(subChangesRequest) + assert.True(t, sent) + // changes will be called again with empty changes so hence the wait group of 2 + receivedChangesRequestWg.Add(2) + + // expect 1 rev message + revsFinishedWg.Add(1) + + subChangesResponse := subChangesRequest.Response() + assert.Equal(t, subChangesRequest.SerialNumber(), subChangesResponse.SerialNumber()) + + timeoutErr := WaitWithTimeout(&receivedChangesRequestWg, time.Second*10) + require.NoError(t, timeoutErr, "Timed out waiting") + + timeoutErr = WaitWithTimeout(&revsFinishedWg, time.Second*10) + require.NoError(t, timeoutErr, "Timed out waiting") +}