From 3ae7174ac1d187ea900b5e2f2672cf7fde6649c8 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 10 Nov 2021 13:24:39 +0100 Subject: [PATCH] jobs: hang the job registry off the server context Prior to this patch, log messages related to jobs were disconnected from the server context and were thus missing the node ID and other log tags. This patch fixes it. Release note: None --- pkg/jobs/job_scheduler.go | 8 +++++--- pkg/jobs/job_scheduler_test.go | 6 +++--- pkg/jobs/registry.go | 12 ++++++++---- pkg/server/server_sql.go | 1 + 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go index 918d1a64faf4..84c13395435f 100644 --- a/pkg/jobs/job_scheduler.go +++ b/pkg/jobs/job_scheduler.go @@ -361,7 +361,7 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) { } for timer := time.NewTimer(initialDelay); ; timer.Reset( - getWaitPeriod(&s.Settings.SV, s.TestingKnobs)) { + getWaitPeriod(ctx, &s.Settings.SV, s.TestingKnobs)) { select { case <-stopper.ShouldQuiesce(): return @@ -421,7 +421,9 @@ const recheckEnabledAfterPeriod = 5 * time.Minute var warnIfPaceTooLow = log.Every(time.Minute) // Returns duration to wait before scanning system.scheduled_jobs. -func getWaitPeriod(sv *settings.Values, knobs base.ModuleTestingKnobs) time.Duration { +func getWaitPeriod( + ctx context.Context, sv *settings.Values, knobs base.ModuleTestingKnobs, +) time.Duration { if k, ok := knobs.(*TestingKnobs); ok && k.SchedulerDaemonScanDelay != nil { return k.SchedulerDaemonScanDelay() } @@ -433,7 +435,7 @@ func getWaitPeriod(sv *settings.Values, knobs base.ModuleTestingKnobs) time.Dura pace := schedulerPaceSetting.Get(sv) if pace < minPacePeriod { if warnIfPaceTooLow.ShouldLog() { - log.Warningf(context.Background(), + log.Warningf(ctx, "job.scheduler.pace setting too low (%s < %s)", pace, minPacePeriod) } pace = minPacePeriod diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index 89de07815a85..8936af22a2d6 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -217,17 +217,17 @@ func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) { schedulerEnabledSetting.Override(ctx, sv, false) // When disabled, we wait 5 minutes before rechecking. - require.EqualValues(t, 5*time.Minute, getWaitPeriod(sv, nil)) + require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, nil)) schedulerEnabledSetting.Override(ctx, sv, true) // When pace is too low, we use something more reasonable. schedulerPaceSetting.Override(ctx, sv, time.Nanosecond) - require.EqualValues(t, minPacePeriod, getWaitPeriod(sv, nil)) + require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, nil)) // Otherwise, we use user specified setting. pace := 42 * time.Second schedulerPaceSetting.Override(ctx, sv, pace) - require.EqualValues(t, pace, getWaitPeriod(sv, nil)) + require.EqualValues(t, pace, getWaitPeriod(ctx, sv, nil)) } type recordScheduleExecutor struct { diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 80249d35344c..991a93218d00 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -90,6 +90,8 @@ const ( // node simply behaves as though its leniency period is 0. Epoch-based // nodes will see time-based nodes delay the act of stealing a job. type Registry struct { + serverCtx context.Context + ac log.AmbientContext stopper *stop.Stopper db *kv.DB @@ -166,6 +168,7 @@ const PreventAdoptionFile = "DISABLE_STARTING_BACKGROUND_JOBS" // sql.newInternalPlanner. It returns a sql.JobExecCtx, but must be // coerced into that in the Resumer functions. func MakeRegistry( + ctx context.Context, ac log.AmbientContext, stopper *stop.Stopper, clock *hlc.Clock, @@ -181,6 +184,7 @@ func MakeRegistry( knobs *TestingKnobs, ) *Registry { r := &Registry{ + serverCtx: ctx, ac: ac, stopper: stopper, clock: clock, @@ -245,7 +249,7 @@ func (r *Registry) ID() base.SQLInstanceID { // makeCtx returns a new context from r's ambient context and an associated // cancel func. func (r *Registry) makeCtx() (context.Context, func()) { - return context.WithCancel(r.ac.AnnotateCtx(context.Background())) + return context.WithCancel(r.ac.AnnotateCtx(r.serverCtx)) } // MakeJobID generates a new job ID. @@ -825,7 +829,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { } }) - if err := stopper.RunAsyncTask(context.Background(), "jobs/cancel", func(ctx context.Context) { + if err := stopper.RunAsyncTask(ctx, "jobs/cancel", func(ctx context.Context) { ctx, cancel := stopper.WithCancelOnQuiesce(ctx) defer cancel() @@ -849,7 +853,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { }); err != nil { return err } - if err := stopper.RunAsyncTask(context.Background(), "jobs/gc", func(ctx context.Context) { + if err := stopper.RunAsyncTask(ctx, "jobs/gc", func(ctx context.Context) { ctx, cancel := stopper.WithCancelOnQuiesce(ctx) defer cancel() @@ -882,7 +886,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { }); err != nil { return err } - return stopper.RunAsyncTask(context.Background(), "jobs/adopt", func(ctx context.Context) { + return stopper.RunAsyncTask(ctx, "jobs/adopt", func(ctx context.Context) { ctx, cancel := stopper.WithCancelOnQuiesce(ctx) defer cancel() lc, cleanup := makeLoopController(r.settings, adoptIntervalSetting, r.knobs.IntervalOverrides.Adopt) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index e30402b757e0..052f202f89c4 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -375,6 +375,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { td := tracedumper.NewTraceDumper(ctx, cfg.InflightTraceDirName, cfg.Settings) *jobRegistry = *jobs.MakeRegistry( + ctx, cfg.AmbientCtx, cfg.stopper, cfg.clock,