diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 3298f9ff6135..8e06445cb82c 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -52,6 +52,10 @@ type changeAggregator struct {
 	spec    execinfrapb.ChangeAggregatorSpec
 	memAcc  mon.BoundAccount
 
+	// checkForNodeDrain is a callback that returns an error
+	// if this node is being drained.
+	checkForNodeDrain func() error
+
 	// cancel cancels the context passed to all resources created while starting
 	// this aggregator.
 	cancel func()
@@ -60,13 +64,6 @@ type changeAggregator struct {
 	// kvFeedDoneCh is closed when the kvfeed exits.
 	kvFeedDoneCh chan struct{}
 
-	// drainWatchCh is signaled if the job registry on this node is being
-	// drained, which is a proxy for the node being drained. If a drain occurs,
-	// it will be blocked until we allow it to proceed by calling drainDone().
-	// This gives the aggregator time to checkpoint before shutting down.
-	drainWatchCh <-chan struct{}
-	drainDone    func()
-
 	// sink is the Sink to write rows to. Resolved timestamps are never written
 	// by changeAggregator.
 	sink EventSink
@@ -79,9 +76,6 @@ type changeAggregator struct {
 	resolvedSpanBuf encDatumRowBuffer
 	// lastPush records the time when we last pushed data to the coordinator.
 	lastPush time.Time
-	// shutdownCheckpointEmitted indicates if aggregator emitted checkpoint
-	// information during aggregator shutdown.
-	shutdownCheckpointEmitted bool
 
 	// recentKVCount contains the number of emits since the last time a resolved
 	// span was forwarded to the frontier
@@ -139,19 +133,63 @@ func (o *changeAggregatorLowerBoundOracle) inclusiveLowerBoundTS() hlc.Timestamp
 var _ execinfra.Processor = &changeAggregator{}
 var _ execinfra.RowSource = &changeAggregator{}
 
+var aggregatorEmitsShutdownCheckpoint = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"changefeed.shutdown_checkpoint.enabled",
+	"upon shutdown, the aggregator attempts to emit an up-to-date checkpoint",
+	true,
+)
+
+type drainWatcher <-chan struct{}
+
+func (w drainWatcher) checkForNodeDrain() error {
+	select {
+	case <-w:
+		return changefeedbase.ErrNodeDraining
+	default:
+		return nil
+	}
+}
+
+func (w drainWatcher) enabled() bool {
+	return w != nil
+}
+
+func makeDrainWatcher(flowCtx *execinfra.FlowCtx) (w drainWatcher, _ func()) {
+	if !aggregatorEmitsShutdownCheckpoint.Get(&flowCtx.Cfg.Settings.SV) {
+		// Drain watcher is disabled.
+		return nil, func() {}
+	}
+
+	if cfKnobs, ok := flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok && cfKnobs != nil && cfKnobs.OnDrain != nil {
+		return cfKnobs.OnDrain(), func() {}
+	}
+	return flowCtx.Cfg.JobRegistry.OnDrain()
+}
+
 func newChangeAggregatorProcessor(
 	ctx context.Context,
 	flowCtx *execinfra.FlowCtx,
 	processorID int32,
 	spec execinfrapb.ChangeAggregatorSpec,
 	post *execinfrapb.PostProcessSpec,
-) (execinfra.Processor, error) {
+) (_ execinfra.Processor, retErr error) {
+	// Set up monitoring for node drain.
+	drainWatcher, drainDone := makeDrainWatcher(flowCtx)
+	defer func() {
+		if retErr != nil {
+			drainDone()
+		}
+	}()
+
 	memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changeagg-mem")
 	ca := &changeAggregator{
-		flowCtx: flowCtx,
-		spec:    spec,
-		memAcc:  memMonitor.MakeBoundAccount(),
+		flowCtx:           flowCtx,
+		spec:              spec,
+		memAcc:            memMonitor.MakeBoundAccount(),
+		checkForNodeDrain: drainWatcher.checkForNodeDrain,
 	}
+
 	if err := ca.Init(
 		ctx,
 		ca,
@@ -161,9 +199,22 @@ func newChangeAggregatorProcessor(
 		processorID,
 		memMonitor,
 		execinfra.ProcStateOpts{
-			TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
+			TrailingMetaCallback: func() (producerMeta []execinfrapb.ProducerMetadata) {
+				defer drainDone()
+
+				if drainWatcher.enabled() {
+					var meta execinfrapb.ChangefeedMeta
+					if err := drainWatcher.checkForNodeDrain(); err != nil {
+						// This node is draining. Indicate so in the trailing metadata.
+						nodeID, _ := flowCtx.Cfg.NodeID.OptionalNodeID()
+						meta.DrainInfo = &execinfrapb.ChangefeedMeta_DrainInfo{NodeID: nodeID}
+					}
+					ca.computeTrailingMetadata(&meta)
+					producerMeta = []execinfrapb.ProducerMetadata{{Changefeed: &meta}}
+				}
+
 				ca.close()
-				return nil
+				return producerMeta
 			},
 		},
 	); err != nil {
@@ -332,26 +383,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 
 	// Generate expensive checkpoint only after we ran for a while.
 	ca.lastSpanFlush = timeutil.Now()
-
-	if ca.knobs.OnDrain != nil {
-		ca.drainWatchCh = ca.knobs.OnDrain()
-	} else {
-		ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain()
-	}
-}
-
-// checkForNodeDrain returns an error if the node is draining.
-func (ca *changeAggregator) checkForNodeDrain() error {
-	if ca.drainWatchCh == nil {
-		return errors.AssertionFailedf("cannot check for node drain if" +
-			" watch channel is nil")
-	}
-	select {
-	case <-ca.drainWatchCh:
-		return changefeedbase.ErrNodeDraining
-	default:
-		return nil
-	}
 }
 
 func (ca *changeAggregator) startKVFeed(
@@ -555,9 +586,6 @@ func (ca *changeAggregator) close() {
 	// Wait for the poller to finish shutting down.
 	ca.waitForKVFeedDone()
 
-	if ca.drainDone != nil {
-		ca.drainDone()
-	}
 	if ca.eventConsumer != nil {
 		_ = ca.eventConsumer.Close() // context cancellation expected here.
 	}
@@ -634,10 +662,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
 			// polling below before the drain actually occurs and starts tearing
 			// things down.
 			if err := ca.checkForNodeDrain(); err != nil {
-				if shutdownErr := ca.emitShutdownCheckpoint(true /* drain */); shutdownErr != nil {
-					err = errors.CombineErrors(err, shutdownErr)
-				}
-				ca.cancel()
+				// NB: we do not invoke ca.cancel here; we merely move to the
+				// draining state so that the trailing metadata callback has a
+				// chance to produce a shutdown checkpoint.
 				ca.MoveToDraining(err)
 				break
 			}
@@ -664,32 +691,26 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
 		}
 	}
 
-	if !ca.shutdownCheckpointEmitted {
-		// Best effort attempt to emit shutdown checkpoint.
-		// Even if this call returns an error, we cannot do anything
-		// with this error (such as ca.MoveToDraining(err)) since the processor
-		// is already in the "StateTrailingMetadata" (and thus panics if you attempt
-		// to MoveToDraining again).
-		_ = ca.emitShutdownCheckpoint(false /* not a node drain */)
-	}
-
 	return nil, ca.DrainHelper()
 }
 
-func (ca *changeAggregator) emitShutdownCheckpoint(isDrain bool) error {
-	if !ca.Closed {
-		// Before emitting shutdown checkpoint, we must flush any buffered events.
-		// Only do that if the aggregator has not been closed yet since some sink
-		// implementations may not like requests to flush once they've been closed.
-		if err := ca.flushBufferedEvents(); err != nil {
-			return err
-		}
+func (ca *changeAggregator) computeTrailingMetadata(meta *execinfrapb.ChangefeedMeta) {
+	if ca.eventConsumer == nil || ca.sink == nil {
+		// Shutdown may be called even if Start didn't succeed.
+		return
 	}
 
-	var meta execinfrapb.ChangefeedMeta
-	if isDrain {
-		nodeID, _ := ca.FlowCtx.Cfg.NodeID.OptionalNodeID()
-		meta.DrainInfo = &execinfrapb.ChangefeedMeta_DrainInfo{NodeID: nodeID}
+	// Before emitting trailing metadata, we must flush any buffered events.
+	// Note: we do not flush the KV feed -- its blocking buffer may still
+	// contain elements -- since we are not interested in flushing a
+	// potentially large number of events; all we want to ensure is that any
+	// previously observed event (such as a resolved timestamp) is processed.
+	if err := ca.flushBufferedEvents(); err != nil {
+		// This method may be invoked during shutdown, when the context has
+		// already been canceled. Regardless of the cause, there is nothing we
+		// can do with this error anyway. All we want to ensure is that if any
+		// error occurs we still return a correct checkpoint: here, none.
+		return
 	}
 
 	// Build out the list of frontier spans.
@@ -701,10 +722,6 @@ func (ca *changeAggregator) emitShutdownCheckpoint(isDrain bool) error {
 			})
 		return span.ContinueMatch
 	})
-
-	ca.AppendTrailingMeta(execinfrapb.ProducerMetadata{Changefeed: &meta})
-	ca.shutdownCheckpointEmitted = true
-	return nil
 }
 
 // tick is the workhorse behind Next(). It retrieves the next event from
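A note on the `drainWatcher` type introduced above: receiving from a nil channel blocks forever, so when the cluster setting is off and `makeDrainWatcher` returns a nil watcher, the non-blocking `select` in `checkForNodeDrain` always takes the `default` branch. The check stays a cheap no-op with no extra branching at call sites. Below is a minimal, self-contained sketch of that pattern; `errNodeDraining` is a stand-in for `changefeedbase.ErrNodeDraining`, and the rest is illustrative rather than CockroachDB code.

```go
package main

import (
	"errors"
	"fmt"
)

// errNodeDraining stands in for changefeedbase.ErrNodeDraining.
var errNodeDraining = errors.New("node draining")

// drainWatcher mirrors the type added by the patch: a receive-only channel
// that is signaled when the node begins draining. A nil watcher means the
// shutdown-checkpoint feature is disabled.
type drainWatcher <-chan struct{}

// checkForNodeDrain performs a cheap, non-blocking poll of the watcher.
func (w drainWatcher) checkForNodeDrain() error {
	select {
	case <-w:
		return errNodeDraining
	default:
		// Receiving from a nil channel blocks forever, so a disabled (nil)
		// watcher always falls through to this branch and reports no drain.
		return nil
	}
}

func main() {
	ch := make(chan struct{})
	w := drainWatcher(ch)

	fmt.Println(w.checkForNodeDrain()) // <nil>: no drain signaled yet
	close(ch)                          // the registry signals a drain
	fmt.Println(w.checkForNodeDrain()) // node draining

	var disabled drainWatcher                 // nil: feature turned off
	fmt.Println(disabled.checkForNodeDrain()) // <nil>, forever
}
```

This is why `enabled()` can simply test `w != nil`: the same nil value both disables the trailing-metadata work and renders the drain check inert.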
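The constructor change, `(_ execinfra.Processor, retErr error)` paired with `defer func() { if retErr != nil { drainDone() } }()`, is the standard Go idiom for releasing a resource only on the failure path; on success, ownership passes to the processor and `drainDone` is instead invoked from the trailing-metadata callback. A small sketch of the idiom under assumed names (`acquire`, `build`, and `processor` are hypothetical, not from the patch):

```go
package main

import (
	"errors"
	"fmt"
)

// acquire obtains a resource and returns its release callback, playing the
// role that JobRegistry.OnDrain plays in the patch.
func acquire() (release func()) {
	fmt.Println("acquired")
	return func() { fmt.Println("released") }
}

type processor struct {
	done func() // released later, e.g. from a trailing-metadata callback
}

// build mirrors the constructor pattern in the patch: the named return
// retErr lets a deferred closure release the resource only if construction
// fails; on success, ownership moves to the returned processor.
func build(fail bool) (_ *processor, retErr error) {
	release := acquire()
	defer func() {
		if retErr != nil {
			release()
		}
	}()

	if fail {
		return nil, errors.New("init failed")
	}
	return &processor{done: release}, nil
}

func main() {
	if _, err := build(true); err != nil {
		fmt.Println("error:", err) // the deferred closure already released
	}
	p, _ := build(false)
	p.done() // success path: the processor releases on its own schedule
}
```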
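Finally, the invariant `computeTrailingMetadata` encodes: a shutdown checkpoint is only trustworthy if buffered events were flushed first, so a failed flush must yield an empty checkpoint rather than an optimistic one. A toy model of that decision, with a simplified frontier standing in for the aggregator's span frontier (`frontierSpan` and `checkpoint` here are illustrative, not the `execinfrapb` types):

```go
package main

import (
	"errors"
	"fmt"
)

// frontierSpan is a simplified stand-in for
// execinfrapb.ChangefeedMeta_FrontierSpan: a key span plus the timestamp
// up to which it has been resolved.
type frontierSpan struct {
	start, end string
	ts         int64
}

// checkpoint mimics computeTrailingMetadata: flush buffered events first,
// and if the flush fails (e.g. the context was canceled mid-shutdown),
// report no checkpoint at all -- an empty checkpoint is safe, while a
// checkpoint claiming unflushed progress is not.
func checkpoint(frontier []frontierSpan, flush func() error) []frontierSpan {
	if err := flush(); err != nil {
		return nil
	}
	out := make([]frontierSpan, len(frontier))
	copy(out, frontier)
	return out
}

func main() {
	frontier := []frontierSpan{
		{start: "a", end: "m", ts: 100},
		{start: "m", end: "z", ts: 95}, // this span lags behind
	}

	ok := func() error { return nil }
	failed := func() error { return errors.New("context canceled") }

	fmt.Println(checkpoint(frontier, ok))     // both spans recorded
	fmt.Println(checkpoint(frontier, failed)) // [] -- nothing claimed
}
```

The same caution explains the early return at the top of `computeTrailingMetadata`: the trailing-metadata callback can run even when `Start` never succeeded, in which case there is no consumer or sink and therefore nothing meaningful to checkpoint.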