diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go index ccf550fad8..7d5272a924 100644 --- a/base/dcp_sharded.go +++ b/base/dcp_sharded.go @@ -316,11 +316,7 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG // avoids file system usage, in conjunction with managerLoadDataDir=false in options. dataDir := "" - // eventHandlers: SG doesn't currently do any processing on manager events: - // - OnRegisterPIndex - // - OnUnregisterPIndex - // - OnFeedError - var eventHandlers cbgt.ManagerEventHandlers + eventHandlers := &sgMgrEventHandlers{ctx: ctx} // Specify one feed per pindex options := make(map[string]string) @@ -698,3 +694,57 @@ func GetDefaultImportPartitions(serverless bool) uint16 { return DefaultImportPartitions } } + +type sgMgrEventHandlers struct { + ctx context.Context +} + +func (meh *sgMgrEventHandlers) OnRefreshManagerOptions(options map[string]string) { + // No-op for SG +} + +func (meh *sgMgrEventHandlers) OnRegisterPIndex(pindex *cbgt.PIndex) { + // No-op for SG +} + +func (meh *sgMgrEventHandlers) OnUnregisterPIndex(pindex *cbgt.PIndex) { + // No-op for SG +} + +// OnFeedError is required to trigger reconnection to a feed on an closed connection (EOF). +// Handling below based on cbft implementation - checks whether the underlying source (bucket) +// still exists with VerifySourceNotExists, and if it exists, calls NotifyMgrOnClose. +// This will trigger cbgt closing and then attempting to reconnect to the feed. +func (meh *sgMgrEventHandlers) OnFeedError(srcType string, r cbgt.Feed, err error) { + + DebugfCtx(meh.ctx, KeyDCP, "cbgt Mgr OnFeedError, srcType: %s, feed name: %s, err: %v", + srcType, r.Name(), err) + + dcpFeed, ok := r.(cbgt.FeedEx) + if !ok { + return + } + + gone, indexUUID, er := dcpFeed.VerifySourceNotExists() + DebugfCtx(meh.ctx, KeyDCP, "cbgt Mgr OnFeedError, VerifySourceNotExists,"+ + " srcType: %s, gone: %t, indexUUID: %s, err: %v", + srcType, gone, indexUUID, er) + if !gone { + // If we get an EOF error from the feeds and the bucket is still alive, + // then there could at the least two potential error scenarios. + // + // 1. Faulty kv node is failed over. + // 2. Ephemeral network connection issues with the host. + // + // In either case, the current feed instance turns dangling. + // Hence we can close the feeds so that they get refreshed to fix + // the connectivity problems either during the next rebalance + // (new kv node after failover-recovery rebalance) or + // on the next janitor work cycle(ephemeral network issue to the same node). + if strings.Contains(err.Error(), "EOF") { + InfofCtx(meh.ctx, KeyDCP, "Handling EOF on cbgt feed - notifying manager to trigger reconnection to feed. indexUUID: %v, err: %v", indexUUID, err) + dcpFeed.NotifyMgrOnClose() + } + } + +}