Skip to content

Commit

Permalink
CBG-2899 CBG-Implement cbgt mgrEventHandlers
Browse files Browse the repository at this point in the history
OnFeedError is required when using a gocbcore feed to trigger reconnection to a feed after receiving an EOF.
  • Loading branch information
adamcfraser committed Apr 26, 2023
1 parent a8924b4 commit dbedd9a
Showing 1 changed file with 55 additions and 5 deletions.
60 changes: 55 additions & 5 deletions base/dcp_sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,7 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG
// avoids file system usage, in conjunction with managerLoadDataDir=false in options.
dataDir := ""

// eventHandlers: SG doesn't currently do any processing on manager events:
// - OnRegisterPIndex
// - OnUnregisterPIndex
// - OnFeedError
var eventHandlers cbgt.ManagerEventHandlers
eventHandlers := &sgMgrEventHandlers{ctx: ctx}

// Specify one feed per pindex
options := make(map[string]string)
Expand Down Expand Up @@ -698,3 +694,57 @@ func GetDefaultImportPartitions(serverless bool) uint16 {
return DefaultImportPartitions
}
}

type sgMgrEventHandlers struct {
ctx context.Context
}

func (meh *sgMgrEventHandlers) OnRefreshManagerOptions(options map[string]string) {
// No-op for SG
}

func (meh *sgMgrEventHandlers) OnRegisterPIndex(pindex *cbgt.PIndex) {
// No-op for SG
}

func (meh *sgMgrEventHandlers) OnUnregisterPIndex(pindex *cbgt.PIndex) {
// No-op for SG
}

// OnFeedError is required to trigger reconnection to a feed on an closed connection (EOF).
// Handling below based on cbft implementation - checks whether the underlying source (bucket)
// still exists with VerifySourceNotExists, and if it exists, calls NotifyMgrOnClose.
// This will trigger cbgt closing and then attempting to reconnect to the feed.
func (meh *sgMgrEventHandlers) OnFeedError(srcType string, r cbgt.Feed, err error) {

DebugfCtx(meh.ctx, KeyDCP, "cbgt Mgr OnFeedError, srcType: %s, feed name: %s, err: %v",
srcType, r.Name(), err)

dcpFeed, ok := r.(cbgt.FeedEx)
if !ok {
return
}

gone, indexUUID, er := dcpFeed.VerifySourceNotExists()
DebugfCtx(meh.ctx, KeyDCP, "cbgt Mgr OnFeedError, VerifySourceNotExists,"+
" srcType: %s, gone: %t, indexUUID: %s, err: %v",
srcType, gone, indexUUID, er)
if !gone {
// If we get an EOF error from the feeds and the bucket is still alive,
// then there could at the least two potential error scenarios.
//
// 1. Faulty kv node is failed over.
// 2. Ephemeral network connection issues with the host.
//
// In either case, the current feed instance turns dangling.
// Hence we can close the feeds so that they get refreshed to fix
// the connectivity problems either during the next rebalance
// (new kv node after failover-recovery rebalance) or
// on the next janitor work cycle(ephemeral network issue to the same node).
if strings.Contains(err.Error(), "EOF") {
InfofCtx(meh.ctx, KeyDCP, "Handling EOF on cbgt feed - notifying manager to trigger reconnection to feed. indexUUID: %v, err: %v", indexUUID, err)
dcpFeed.NotifyMgrOnClose()
}
}

}

0 comments on commit dbedd9a

Please sign in to comment.