From ddd92b68ae501395b5c15ab7db8c92a649cbd7b4 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Mon, 12 Feb 2024 10:00:28 +0000 Subject: [PATCH 1/3] CBG-3775: Fix for memeory issues seen in blipcollection contexts --- db/blip_handler.go | 54 ++++++---------- rest/blip_api_attachment_test.go | 107 +++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 36 deletions(-) diff --git a/db/blip_handler.go b/db/blip_handler.go index 7f116753c6..9db54c7f72 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -58,6 +58,7 @@ type blipHandler struct { *BlipSyncContext db *Database // Handler-specific copy of the BlipSyncContext's blipContextDb collection *DatabaseCollectionWithUser // Handler-specific copy of the BlipSyncContext's collection specific DB + collectionCtx *blipSyncCollectionContext // Sync-specific data for this collection collectionIdx *int // index into BlipSyncContext.collectionMapping for the collection loggingCtx context.Context // inherited from BlipSyncContext.loggingCtx with additional handler-only information (like keyspace) serialNumber uint64 // This blip handler's serial number to differentiate logs w/ other handlers @@ -166,7 +167,12 @@ func collectionBlipHandler(next blipHandlerFunc) blipHandlerFunc { if err != nil { return err } - bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.loggingCtx, bh.collection.DatabaseCollection)) + bh.collectionCtx, err = bh.collections.get(nil) + if err != nil { + bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.loggingCtx, bh.collection.DatabaseCollection)) + bh.collectionCtx, _ = bh.collections.get(nil) + } + //bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.loggingCtx, bh.collection.DatabaseCollection)) return next(bh, bm) } if !bh.collections.hasNamedCollections() { @@ -179,12 +185,12 @@ func collectionBlipHandler(next blipHandlerFunc) blipHandlerFunc { } bh.collectionIdx = &collectionIndex - collectionCtx, err := bh.collections.get(&collectionIndex) + bh.collectionCtx, err = bh.collections.get(&collectionIndex) if err != nil { return base.HTTPErrorf(http.StatusBadRequest, fmt.Sprintf("%s", err)) } bh.collection = &DatabaseCollectionWithUser{ - DatabaseCollection: collectionCtx.dbCollection, + DatabaseCollection: bh.collectionCtx.dbCollection, user: bh.db.user, } bh.loggingCtx = base.CollectionLogCtx(bh.BlipSyncContext.loggingCtx, bh.collection.Name) @@ -260,10 +266,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error { } // Ensure that only _one_ subChanges subscription can be open on this blip connection at any given time. SG #3222. - collectionCtx, err := bh.collections.get(bh.collectionIdx) - if err != nil { - return base.HTTPErrorf(http.StatusBadRequest, fmt.Sprintf("%s", err)) - } + collectionCtx := bh.collectionCtx collectionCtx.changesCtxLock.Lock() defer collectionCtx.changesCtxLock.Unlock() if !collectionCtx.activeSubChanges.CASRetry(false, true) { @@ -359,10 +362,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error { } func (bh *blipHandler) handleUnsubChanges(rq *blip.Message) error { - collectionCtx, err := bh.collections.get(bh.collectionIdx) - if err != nil { - return err - } + collectionCtx := bh.collectionCtx collectionCtx.changesCtxLock.Lock() defer collectionCtx.changesCtxLock.Unlock() collectionCtx.changesCtxCancel() @@ -632,10 +632,7 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error { return err } - collectionCtx, err := bh.collections.get(bh.collectionIdx) - if err != nil { - return err - } + collectionCtx := bh.collectionCtx bh.logEndpointEntry(rq.Profile(), fmt.Sprintf("#Changes:%d", len(changeList))) if len(changeList) == 0 { @@ -889,17 +886,12 @@ func (bh *blipHandler) handleNoRev(rq *blip.Message) error { base.InfofCtx(bh.loggingCtx, base.KeySyncMsg, "%s: norev for doc %q / %q seq:%q - error: %q - reason: %q", rq.String(), base.UD(docID), revID, seqStr, rq.Properties[NorevMessageError], rq.Properties[NorevMessageReason]) - collectionCtx, err := bh.collections.get(bh.collectionIdx) - if err != nil { - return err - } - - if collectionCtx.sgr2PullProcessedSeqCallback != nil { + if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil { seq, err := ParseJSONSequenceID(seqStr) if err != nil { base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from norev message: %v - not tracking for checkpointing", seqStr, err) } else { - collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID}) + bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID}) } } @@ -971,19 +963,14 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err return err } - collectionCtx, err := bh.collections.get(bh.collectionIdx) - if err != nil { - return err - } - stats.docsPurgedCount.Add(1) - if collectionCtx.sgr2PullProcessedSeqCallback != nil { + if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil { seqStr := rq.Properties[RevMessageSequence] seq, err := ParseJSONSequenceID(seqStr) if err != nil { base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqStr, err) } else { - collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID}) + bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID}) } } return nil @@ -1193,18 +1180,13 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err return err } - collectionCtx, err := bh.collections.get(bh.collectionIdx) - if err != nil { - return err - } - - if collectionCtx.sgr2PullProcessedSeqCallback != nil { + if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil { seqProperty := rq.Properties[RevMessageSequence] seq, err := ParseJSONSequenceID(seqProperty) if err != nil { base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqProperty, err) } else { - collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID}) + bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID}) } } diff --git a/rest/blip_api_attachment_test.go b/rest/blip_api_attachment_test.go index 69cd0ca280..9603c24f75 100644 --- a/rest/blip_api_attachment_test.go +++ b/rest/blip_api_attachment_test.go @@ -103,6 +103,113 @@ func TestBlipPushPullV2AttachmentV2Client(t *testing.T) { assert.Equal(t, int64(11), rt.GetDatabase().DbStats.CBLReplicationPush().AttachmentPushBytes.Value()) } +// +//func TestGreg(t *testing.T) { +// base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll) +// rtConfig := RestTesterConfig{ +// DatabaseConfig: &DatabaseConfig{ +// DbConfig: DbConfig{ +// AllowConflicts: base.BoolPtr(false), +// DeltaSync: &DeltaSyncConfig{ +// Enabled: base.BoolPtr(true), +// }, +// }, +// }, +// GuestEnabled: true, +// } +// +// const docID = "doc1" +// rt := NewRestTesterDefaultCollection(t, &rtConfig) +// defer rt.Close() +// +// btc, err := NewBlipTesterClientOptsWithRT(t, rt, nil) +// require.NoError(t, err) +// defer btc.Close() +// +// //btc2 := btcRunner.NewBlipTesterClientOptsWithRT(rt, nil) +// //defer btc2.Close() +// +// err = btc.StartPull() +// assert.NoError(t, err) +// +// //err = btcRunner.StartPull(btc2.id) +// //assert.NoError(t, err) +// +// // Create doc revision with attachment on SG. +// bodyText := `{"greetings":[{"hi": "alice"}],"_attachments":{"hello.txt":{"data":"aGVsbG8gd29ybGQ="}}}` +// version := btc.rt.PutDoc(docID, bodyText) +// +// data, ok := btc.WaitForRev(docID, version.Rev) +// assert.True(t, ok) +// bodyTextExpected := `{"greetings":[{"hi":"alice"}],"_attachments":{"hello.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}` +// require.JSONEq(t, bodyTextExpected, string(data)) +// +// for i := 0; i < 50000; i++ { +// subChangesRequest := blip.NewRequest() +// subChangesRequest.SetProfile(db.MessageProposeChanges) +// subChangesRequest.SetBody([]byte("[]")) +// err := btc.pushReplication.sendMsg(subChangesRequest) +// assert.NoError(t, err) +// proposeChangesResponse := subChangesRequest.Response() +// _, err = proposeChangesResponse.Body() +// assert.NoError(t, err, "Error getting changes response body") +// } +// +// // Update the replicated doc at client along with keeping the same attachment stub. +// bodyText = `{"greetings":[{"hi":"bob"}],"_attachments":{"hello.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}` +// rev, err := btc.PushRev(docID, version.Rev, []byte(bodyText)) +// require.NoError(t, err) +// +// // Wait for the document to be replicated at SG +// _, ok = btc.pushReplication.WaitForMessage(2) +// assert.True(t, ok) +// +// resp := rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/"+docID+"?rev="+rev, "") +// assert.Equal(t, http.StatusOK, resp.Code) +// var respBody db.Body +// assert.NoError(t, base.JSONUnmarshal(resp.Body.Bytes(), &respBody)) +// +// bodyText = `{"greetings":[{"hi":"bob"}],"_attachments":{"hello111.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}` +// rev, err = btc.PushRev(docID, rev, []byte(bodyText)) +// require.NoError(t, err) +// +// assert.Equal(t, docID, respBody[db.BodyId]) +// greetings := respBody["greetings"].([]interface{}) +// assert.Len(t, greetings, 1) +// assert.Equal(t, map[string]interface{}{"hi": "bob"}, greetings[0]) +// +// attachments, ok := respBody[db.BodyAttachments].(map[string]interface{}) +// require.True(t, ok) +// assert.Len(t, attachments, 1) +// hello, ok := attachments["hello.txt"].(map[string]interface{}) +// require.True(t, ok) +// assert.Equal(t, "sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=", hello["digest"]) +// assert.Equal(t, float64(11), hello["length"]) +// assert.Equal(t, float64(1), hello["revpos"]) +// assert.True(t, hello["stub"].(bool)) +// +// bodyText = `{"greetings":[{"hi": "alice"}],"_attachments":{"hello1.txt":{"data":"aGVsbG8gd29ybGQ="}}}` +// version = btc.rt.PutDoc("doc10", bodyText) +// data, ok = btc.WaitForRev("doc10", version.Rev) +// assert.True(t, ok) +// +// bodyText = `{"greetings":[{"hi": "alice"}],"_attachments":{"hello11.txt":{"data":"aGVsbG8gd29ybGQ="}}}` +// version = btc.rt.PutDoc("doc11", bodyText) +// data, ok = btc.WaitForRev("doc11", version.Rev) +// assert.True(t, ok) +// +// time.Sleep(5 * time.Second) +// +// f, err := os.Create("/Users/gregorynewman-smith/Documents/Mobile/3.1-sync_gateway/sync_gateway/db/prof.out") +// require.NoError(t, err) +// err = pprof.WriteHeapProfile(f) +// require.NoError(t, err) +// +// fileCloseError := f.Close() +// require.NoError(t, fileCloseError) +// +//} + // Test pushing and pulling v2 attachments with v3 client // 1. Create test client. // 2. Start continuous push and pull replication in client From 92725a6bf5270f32afb5529c23f9da0600120007 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Mon, 12 Feb 2024 10:03:53 +0000 Subject: [PATCH 2/3] remove repro test --- rest/blip_api_attachment_test.go | 107 ------------------------------- 1 file changed, 107 deletions(-) diff --git a/rest/blip_api_attachment_test.go b/rest/blip_api_attachment_test.go index 9603c24f75..69cd0ca280 100644 --- a/rest/blip_api_attachment_test.go +++ b/rest/blip_api_attachment_test.go @@ -103,113 +103,6 @@ func TestBlipPushPullV2AttachmentV2Client(t *testing.T) { assert.Equal(t, int64(11), rt.GetDatabase().DbStats.CBLReplicationPush().AttachmentPushBytes.Value()) } -// -//func TestGreg(t *testing.T) { -// base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll) -// rtConfig := RestTesterConfig{ -// DatabaseConfig: &DatabaseConfig{ -// DbConfig: DbConfig{ -// AllowConflicts: base.BoolPtr(false), -// DeltaSync: &DeltaSyncConfig{ -// Enabled: base.BoolPtr(true), -// }, -// }, -// }, -// GuestEnabled: true, -// } -// -// const docID = "doc1" -// rt := NewRestTesterDefaultCollection(t, &rtConfig) -// defer rt.Close() -// -// btc, err := NewBlipTesterClientOptsWithRT(t, rt, nil) -// require.NoError(t, err) -// defer btc.Close() -// -// //btc2 := btcRunner.NewBlipTesterClientOptsWithRT(rt, nil) -// //defer btc2.Close() -// -// err = btc.StartPull() -// assert.NoError(t, err) -// -// //err = btcRunner.StartPull(btc2.id) -// //assert.NoError(t, err) -// -// // Create doc revision with attachment on SG. -// bodyText := `{"greetings":[{"hi": "alice"}],"_attachments":{"hello.txt":{"data":"aGVsbG8gd29ybGQ="}}}` -// version := btc.rt.PutDoc(docID, bodyText) -// -// data, ok := btc.WaitForRev(docID, version.Rev) -// assert.True(t, ok) -// bodyTextExpected := `{"greetings":[{"hi":"alice"}],"_attachments":{"hello.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}` -// require.JSONEq(t, bodyTextExpected, string(data)) -// -// for i := 0; i < 50000; i++ { -// subChangesRequest := blip.NewRequest() -// subChangesRequest.SetProfile(db.MessageProposeChanges) -// subChangesRequest.SetBody([]byte("[]")) -// err := btc.pushReplication.sendMsg(subChangesRequest) -// assert.NoError(t, err) -// proposeChangesResponse := subChangesRequest.Response() -// _, err = proposeChangesResponse.Body() -// assert.NoError(t, err, "Error getting changes response body") -// } -// -// // Update the replicated doc at client along with keeping the same attachment stub. -// bodyText = `{"greetings":[{"hi":"bob"}],"_attachments":{"hello.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}` -// rev, err := btc.PushRev(docID, version.Rev, []byte(bodyText)) -// require.NoError(t, err) -// -// // Wait for the document to be replicated at SG -// _, ok = btc.pushReplication.WaitForMessage(2) -// assert.True(t, ok) -// -// resp := rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/"+docID+"?rev="+rev, "") -// assert.Equal(t, http.StatusOK, resp.Code) -// var respBody db.Body -// assert.NoError(t, base.JSONUnmarshal(resp.Body.Bytes(), &respBody)) -// -// bodyText = `{"greetings":[{"hi":"bob"}],"_attachments":{"hello111.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}` -// rev, err = btc.PushRev(docID, rev, []byte(bodyText)) -// require.NoError(t, err) -// -// assert.Equal(t, docID, respBody[db.BodyId]) -// greetings := respBody["greetings"].([]interface{}) -// assert.Len(t, greetings, 1) -// assert.Equal(t, map[string]interface{}{"hi": "bob"}, greetings[0]) -// -// attachments, ok := respBody[db.BodyAttachments].(map[string]interface{}) -// require.True(t, ok) -// assert.Len(t, attachments, 1) -// hello, ok := attachments["hello.txt"].(map[string]interface{}) -// require.True(t, ok) -// assert.Equal(t, "sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=", hello["digest"]) -// assert.Equal(t, float64(11), hello["length"]) -// assert.Equal(t, float64(1), hello["revpos"]) -// assert.True(t, hello["stub"].(bool)) -// -// bodyText = `{"greetings":[{"hi": "alice"}],"_attachments":{"hello1.txt":{"data":"aGVsbG8gd29ybGQ="}}}` -// version = btc.rt.PutDoc("doc10", bodyText) -// data, ok = btc.WaitForRev("doc10", version.Rev) -// assert.True(t, ok) -// -// bodyText = `{"greetings":[{"hi": "alice"}],"_attachments":{"hello11.txt":{"data":"aGVsbG8gd29ybGQ="}}}` -// version = btc.rt.PutDoc("doc11", bodyText) -// data, ok = btc.WaitForRev("doc11", version.Rev) -// assert.True(t, ok) -// -// time.Sleep(5 * time.Second) -// -// f, err := os.Create("/Users/gregorynewman-smith/Documents/Mobile/3.1-sync_gateway/sync_gateway/db/prof.out") -// require.NoError(t, err) -// err = pprof.WriteHeapProfile(f) -// require.NoError(t, err) -// -// fileCloseError := f.Close() -// require.NoError(t, fileCloseError) -// -//} - // Test pushing and pulling v2 attachments with v3 client // 1. Create test client. // 2. Start continuous push and pull replication in client From 1ce13b5e71b05500da09be32c103090d82e949ac Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Mon, 12 Feb 2024 10:18:43 +0000 Subject: [PATCH 3/3] remove comment --- db/blip_handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/db/blip_handler.go b/db/blip_handler.go index 9db54c7f72..f429cf5891 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -172,7 +172,6 @@ func collectionBlipHandler(next blipHandlerFunc) blipHandlerFunc { bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.loggingCtx, bh.collection.DatabaseCollection)) bh.collectionCtx, _ = bh.collections.get(nil) } - //bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.loggingCtx, bh.collection.DatabaseCollection)) return next(bh, bm) } if !bh.collections.hasNamedCollections() {