Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3775: [3.1.4 Backport] Fix for memeory issues seen in blipcollection contexts #6679

Merged
merged 3 commits into from
Feb 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 17 additions & 36 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type blipHandler struct {
*BlipSyncContext
db *Database // Handler-specific copy of the BlipSyncContext's blipContextDb
collection *DatabaseCollectionWithUser // Handler-specific copy of the BlipSyncContext's collection specific DB
collectionCtx *blipSyncCollectionContext // Sync-specific data for this collection
collectionIdx *int // index into BlipSyncContext.collectionMapping for the collection
loggingCtx context.Context // inherited from BlipSyncContext.loggingCtx with additional handler-only information (like keyspace)
serialNumber uint64 // This blip handler's serial number to differentiate logs w/ other handlers
Expand Down Expand Up @@ -166,7 +167,11 @@ func collectionBlipHandler(next blipHandlerFunc) blipHandlerFunc {
if err != nil {
return err
}
bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.loggingCtx, bh.collection.DatabaseCollection))
bh.collectionCtx, err = bh.collections.get(nil)
if err != nil {
bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.loggingCtx, bh.collection.DatabaseCollection))
bh.collectionCtx, _ = bh.collections.get(nil)
}
return next(bh, bm)
}
if !bh.collections.hasNamedCollections() {
Expand All @@ -179,12 +184,12 @@ func collectionBlipHandler(next blipHandlerFunc) blipHandlerFunc {
}

bh.collectionIdx = &collectionIndex
collectionCtx, err := bh.collections.get(&collectionIndex)
bh.collectionCtx, err = bh.collections.get(&collectionIndex)
if err != nil {
return base.HTTPErrorf(http.StatusBadRequest, fmt.Sprintf("%s", err))
}
bh.collection = &DatabaseCollectionWithUser{
DatabaseCollection: collectionCtx.dbCollection,
DatabaseCollection: bh.collectionCtx.dbCollection,
user: bh.db.user,
}
bh.loggingCtx = base.CollectionLogCtx(bh.BlipSyncContext.loggingCtx, bh.collection.Name)
Expand Down Expand Up @@ -260,10 +265,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
}

// Ensure that only _one_ subChanges subscription can be open on this blip connection at any given time. SG #3222.
collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return base.HTTPErrorf(http.StatusBadRequest, fmt.Sprintf("%s", err))
}
collectionCtx := bh.collectionCtx
collectionCtx.changesCtxLock.Lock()
defer collectionCtx.changesCtxLock.Unlock()
if !collectionCtx.activeSubChanges.CASRetry(false, true) {
Expand Down Expand Up @@ -359,10 +361,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
}

func (bh *blipHandler) handleUnsubChanges(rq *blip.Message) error {
collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}
collectionCtx := bh.collectionCtx
collectionCtx.changesCtxLock.Lock()
defer collectionCtx.changesCtxLock.Unlock()
collectionCtx.changesCtxCancel()
Expand Down Expand Up @@ -632,10 +631,7 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {
return err
}

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}
collectionCtx := bh.collectionCtx

bh.logEndpointEntry(rq.Profile(), fmt.Sprintf("#Changes:%d", len(changeList)))
if len(changeList) == 0 {
Expand Down Expand Up @@ -889,17 +885,12 @@ func (bh *blipHandler) handleNoRev(rq *blip.Message) error {
base.InfofCtx(bh.loggingCtx, base.KeySyncMsg, "%s: norev for doc %q / %q seq:%q - error: %q - reason: %q",
rq.String(), base.UD(docID), revID, seqStr, rq.Properties[NorevMessageError], rq.Properties[NorevMessageReason])

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}

if collectionCtx.sgr2PullProcessedSeqCallback != nil {
if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
seq, err := ParseJSONSequenceID(seqStr)
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from norev message: %v - not tracking for checkpointing", seqStr, err)
} else {
collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
}
}

Expand Down Expand Up @@ -971,19 +962,14 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
return err
}

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}

stats.docsPurgedCount.Add(1)
if collectionCtx.sgr2PullProcessedSeqCallback != nil {
if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
seqStr := rq.Properties[RevMessageSequence]
seq, err := ParseJSONSequenceID(seqStr)
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqStr, err)
} else {
collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
}
}
return nil
Expand Down Expand Up @@ -1193,18 +1179,13 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
return err
}

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}

if collectionCtx.sgr2PullProcessedSeqCallback != nil {
if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
seqProperty := rq.Properties[RevMessageSequence]
seq, err := ParseJSONSequenceID(seqProperty)
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqProperty, err)
} else {
collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
}
}

Expand Down
Loading