changefeedccl: Improve shutdown checkpoint logic
Rework shutdown checkpoint logic.  This is mostly
a code-move PR that transfers the responsibility
for emitting the shutdown checkpoint to the trailing
metadata processor callback.  This is done so that
there is a single place where the decision to compute
the shutdown checkpoint is made, and so that this
computation happens before any of the contexts are
canceled.

Introduce a private setting `changefeed.shutdown_checkpoint.enabled`
so that the shutdown checkpoint logic can be disabled
if issues are discovered.

Informs #116388

Release note: None
Yevgeniy Miretskiy committed Dec 18, 2023
1 parent fec5213 commit c6d20be
Showing 1 changed file with 85 additions and 68 deletions.
153 changes: 85 additions & 68 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -52,6 +52,10 @@ type changeAggregator struct {
spec execinfrapb.ChangeAggregatorSpec
memAcc mon.BoundAccount

// checkForNodeDrain is a callback that returns an error
// if this node is being drained.
checkForNodeDrain func() error

// cancel cancels the context passed to all resources created while starting
// this aggregator.
cancel func()
@@ -60,13 +64,6 @@ type changeAggregator struct {
// kvFeedDoneCh is closed when the kvfeed exits.
kvFeedDoneCh chan struct{}

// drainWatchCh is signaled if the job registry on this node is being
// drained, which is a proxy for the node being drained. If a drain occurs,
// it will be blocked until we allow it to proceed by calling drainDone().
// This gives the aggregator time to checkpoint before shutting down.
drainWatchCh <-chan struct{}
drainDone func()

// sink is the Sink to write rows to. Resolved timestamps are never written
// by changeAggregator.
sink EventSink
@@ -79,9 +76,6 @@ type changeAggregator struct {
resolvedSpanBuf encDatumRowBuffer
// lastPush records the time when we last pushed data to the coordinator.
lastPush time.Time
// shutdownCheckpointEmitted indicates if aggregator emitted checkpoint
// information during aggregator shutdown.
shutdownCheckpointEmitted bool

// recentKVCount contains the number of emits since the last time a resolved
// span was forwarded to the frontier
@@ -139,19 +133,63 @@ func (o *changeAggregatorLowerBoundOracle) inclusiveLowerBoundTS() hlc.Timestamp
var _ execinfra.Processor = &changeAggregator{}
var _ execinfra.RowSource = &changeAggregator{}

var aggregatorEmitsShutdownCheckpoint = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"changefeed.shutdown_checkpoint.enabled",
"upon shutdown aggregator attempts to emit an up-to-date checkpoint",
true,
)

type drainWatcher <-chan struct{}

func (w drainWatcher) checkForNodeDrain() error {
select {
case <-w:
return changefeedbase.ErrNodeDraining
default:
return nil
}
}

func (w drainWatcher) enabled() bool {
return w != nil
}

func makeDrainWatcher(flowCtx *execinfra.FlowCtx) (w drainWatcher, _ func()) {
if !aggregatorEmitsShutdownCheckpoint.Get(&flowCtx.Cfg.Settings.SV) {
// Drain watcher disabled
return nil, func() {}
}

if cfKnobs, ok := flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok && cfKnobs != nil && cfKnobs.OnDrain != nil {
return cfKnobs.OnDrain(), func() {}
}
return flowCtx.Cfg.JobRegistry.OnDrain()
}

func newChangeAggregatorProcessor(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
processorID int32,
spec execinfrapb.ChangeAggregatorSpec,
post *execinfrapb.PostProcessSpec,
) (execinfra.Processor, error) {
) (_ execinfra.Processor, retErr error) {
// Set up monitoring for node drain.
drainWatcher, drainDone := makeDrainWatcher(flowCtx)
defer func() {
if retErr != nil {
drainDone()
}
}()

memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changeagg-mem")
ca := &changeAggregator{
flowCtx: flowCtx,
spec: spec,
memAcc: memMonitor.MakeBoundAccount(),
flowCtx: flowCtx,
spec: spec,
memAcc: memMonitor.MakeBoundAccount(),
checkForNodeDrain: drainWatcher.checkForNodeDrain,
}

if err := ca.Init(
ctx,
ca,
@@ -161,9 +199,22 @@ func newChangeAggregatorProcessor(
processorID,
memMonitor,
execinfra.ProcStateOpts{
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
TrailingMetaCallback: func() (producerMeta []execinfrapb.ProducerMetadata) {
defer drainDone()

if drainWatcher.enabled() {
var meta execinfrapb.ChangefeedMeta
if err := drainWatcher.checkForNodeDrain(); err != nil {
// This node is draining. Indicate so in the trailing metadata.
nodeID, _ := flowCtx.Cfg.NodeID.OptionalNodeID()
meta.DrainInfo = &execinfrapb.ChangefeedMeta_DrainInfo{NodeID: nodeID}
}
ca.computeTrailingMetadata(&meta)
producerMeta = []execinfrapb.ProducerMetadata{{Changefeed: &meta}}
}

ca.close()
return nil
return producerMeta
},
},
); err != nil {
@@ -332,26 +383,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {

// Generate expensive checkpoint only after we ran for a while.
ca.lastSpanFlush = timeutil.Now()

if ca.knobs.OnDrain != nil {
ca.drainWatchCh = ca.knobs.OnDrain()
} else {
ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain()
}
}

// checkForNodeDrain returns an error if the node is draining.
func (ca *changeAggregator) checkForNodeDrain() error {
if ca.drainWatchCh == nil {
return errors.AssertionFailedf("cannot check for node drain if" +
" watch channel is nil")
}
select {
case <-ca.drainWatchCh:
return changefeedbase.ErrNodeDraining
default:
return nil
}
}

func (ca *changeAggregator) startKVFeed(
@@ -555,9 +586,6 @@ func (ca *changeAggregator) close() {
// Wait for the poller to finish shutting down.
ca.waitForKVFeedDone()

if ca.drainDone != nil {
ca.drainDone()
}
if ca.eventConsumer != nil {
_ = ca.eventConsumer.Close() // context cancellation expected here.
}
@@ -634,10 +662,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
// polling below before the drain actually occurs and starts tearing
// things down.
if err := ca.checkForNodeDrain(); err != nil {
if shutdownErr := ca.emitShutdownCheckpoint(true /* drain */); shutdownErr != nil {
err = errors.CombineErrors(err, shutdownErr)
}
ca.cancel()
// NB: we do not invoke ca.cancel here -- we merely move
// to the draining state so that the trailing metadata callback
// has a chance to produce the shutdown checkpoint.
ca.MoveToDraining(err)
break
}
@@ -664,32 +691,26 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
}
}

if !ca.shutdownCheckpointEmitted {
// Best effort attempt to emit shutdown checkpoint.
// Even if this call returns an error, we cannot do anything
// with this error (such as ca.MoveToDraining(err)) since the processor
// is already in the "StateTrailingMetadata" (and thus panics if you attempt
// to MoveToDraining again).
_ = ca.emitShutdownCheckpoint(false /* not a node drain */)
}

return nil, ca.DrainHelper()
}

func (ca *changeAggregator) emitShutdownCheckpoint(isDrain bool) error {
if !ca.Closed {
// Before emitting shutdown checkpoint, we must flush any buffered events.
// Only do that if the aggregator has not been closed yet since some sink
// implementations may not like requests to flush once they've been closed.
if err := ca.flushBufferedEvents(); err != nil {
return err
}
func (ca *changeAggregator) computeTrailingMetadata(meta *execinfrapb.ChangefeedMeta) {
if ca.eventConsumer == nil || ca.sink == nil {
// Shutdown may be called even if Start didn't succeed.
return
}

var meta execinfrapb.ChangefeedMeta
if isDrain {
nodeID, _ := ca.FlowCtx.Cfg.NodeID.OptionalNodeID()
meta.DrainInfo = &execinfrapb.ChangefeedMeta_DrainInfo{NodeID: nodeID}
// Before emitting trailing metadata, we must flush any buffered events.
// Note: we are not flushing the KV feed -- the blocking buffer may still
// contain buffered elements, but we are not interested in flushing a
// potentially large number of events; all we want to ensure is that any
// previously observed event (such as a resolved timestamp) has been fully
// processed.
if err := ca.flushBufferedEvents(); err != nil {
// This method may be invoked during shutdown, when the context is already
// canceled. Regardless of the cause of this error, there is nothing we can
// do with it anyway. All we want to ensure is that if any error occurs we
// still return a correct checkpoint, which in this case is nothing.
return
}

// Build out the list of frontier spans.
@@ -701,10 +722,6 @@ func (ca *changeAggregator) emitShutdownCheckpoint(isDrain bool) error {
})
return span.ContinueMatch
})

ca.AppendTrailingMeta(execinfrapb.ProducerMetadata{Changefeed: &meta})
ca.shutdownCheckpointEmitted = true
return nil
}

// tick is the workhorse behind Next(). It retrieves the next event from

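For readers skimming the diff, the heart of the new drainWatcher type is a
non-blocking receive on a possibly-nil channel: a nil watcher (the disabled
case) never becomes ready, so the check is a no-op. A minimal, runnable
sketch of the pattern with simplified names -- not the actual changefeed
code -- follows:

	package main

	import (
		"errors"
		"fmt"
	)

	var errNodeDraining = errors.New("node is draining")

	// drainWatcher wraps the channel that a drain registry closes when
	// the node begins draining. A nil watcher means the feature is
	// disabled: receiving from a nil channel blocks forever, so the
	// select below always falls through to the default arm.
	type drainWatcher <-chan struct{}

	// checkForNodeDrain performs a non-blocking receive; it returns an
	// error once the channel has been closed, and nil otherwise.
	func (w drainWatcher) checkForNodeDrain() error {
		select {
		case <-w:
			return errNodeDraining
		default:
			return nil
		}
	}

	func main() {
		drainCh := make(chan struct{})
		w := drainWatcher(drainCh)

		fmt.Println(w.checkForNodeDrain()) // <nil>: no drain yet
		close(drainCh)                     // the registry signals a drain
		fmt.Println(w.checkForNodeDrain()) // node is draining
	}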