Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs: hang the job registry off the server context #72605

Merged
1 commit merged on Nov 10, 2021
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
}

for timer := time.NewTimer(initialDelay); ; timer.Reset(
getWaitPeriod(&s.Settings.SV, s.TestingKnobs)) {
getWaitPeriod(ctx, &s.Settings.SV, s.TestingKnobs)) {
select {
case <-stopper.ShouldQuiesce():
return
Expand Down Expand Up @@ -421,7 +421,9 @@ const recheckEnabledAfterPeriod = 5 * time.Minute
var warnIfPaceTooLow = log.Every(time.Minute)

// Returns duration to wait before scanning system.scheduled_jobs.
func getWaitPeriod(sv *settings.Values, knobs base.ModuleTestingKnobs) time.Duration {
func getWaitPeriod(
ctx context.Context, sv *settings.Values, knobs base.ModuleTestingKnobs,
) time.Duration {
if k, ok := knobs.(*TestingKnobs); ok && k.SchedulerDaemonScanDelay != nil {
return k.SchedulerDaemonScanDelay()
}
Expand All @@ -433,7 +435,7 @@ func getWaitPeriod(sv *settings.Values, knobs base.ModuleTestingKnobs) time.Dura
pace := schedulerPaceSetting.Get(sv)
if pace < minPacePeriod {
if warnIfPaceTooLow.ShouldLog() {
log.Warningf(context.Background(),
log.Warningf(ctx,
"job.scheduler.pace setting too low (%s < %s)", pace, minPacePeriod)
}
pace = minPacePeriod
Expand Down
6 changes: 3 additions & 3 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,17 @@ func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) {
schedulerEnabledSetting.Override(ctx, sv, false)

// When disabled, we wait 5 minutes before rechecking.
require.EqualValues(t, 5*time.Minute, getWaitPeriod(sv, nil))
require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, nil))
schedulerEnabledSetting.Override(ctx, sv, true)

// When pace is too low, we use something more reasonable.
schedulerPaceSetting.Override(ctx, sv, time.Nanosecond)
require.EqualValues(t, minPacePeriod, getWaitPeriod(sv, nil))
require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, nil))

// Otherwise, we use user specified setting.
pace := 42 * time.Second
schedulerPaceSetting.Override(ctx, sv, pace)
require.EqualValues(t, pace, getWaitPeriod(sv, nil))
require.EqualValues(t, pace, getWaitPeriod(ctx, sv, nil))
}

type recordScheduleExecutor struct {
Expand Down
12 changes: 8 additions & 4 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ const (
// node simply behaves as though its leniency period is 0. Epoch-based
// nodes will see time-based nodes delay the act of stealing a job.
type Registry struct {
serverCtx context.Context

ac log.AmbientContext
stopper *stop.Stopper
db *kv.DB
Expand Down Expand Up @@ -166,6 +168,7 @@ const PreventAdoptionFile = "DISABLE_STARTING_BACKGROUND_JOBS"
// sql.newInternalPlanner. It returns a sql.JobExecCtx, but must be
// coerced into that in the Resumer functions.
func MakeRegistry(
ctx context.Context,
ac log.AmbientContext,
stopper *stop.Stopper,
clock *hlc.Clock,
Expand All @@ -181,6 +184,7 @@ func MakeRegistry(
knobs *TestingKnobs,
) *Registry {
r := &Registry{
serverCtx: ctx,
ac: ac,
stopper: stopper,
clock: clock,
Expand Down Expand Up @@ -245,7 +249,7 @@ func (r *Registry) ID() base.SQLInstanceID {
// makeCtx returns a new context from r's ambient context and an associated
// cancel func.
func (r *Registry) makeCtx() (context.Context, func()) {
return context.WithCancel(r.ac.AnnotateCtx(context.Background()))
return context.WithCancel(r.ac.AnnotateCtx(r.serverCtx))
}

// MakeJobID generates a new job ID.
Expand Down Expand Up @@ -825,7 +829,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
}
})

if err := stopper.RunAsyncTask(context.Background(), "jobs/cancel", func(ctx context.Context) {
if err := stopper.RunAsyncTask(ctx, "jobs/cancel", func(ctx context.Context) {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()

Expand All @@ -849,7 +853,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
}); err != nil {
return err
}
if err := stopper.RunAsyncTask(context.Background(), "jobs/gc", func(ctx context.Context) {
if err := stopper.RunAsyncTask(ctx, "jobs/gc", func(ctx context.Context) {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()

Expand Down Expand Up @@ -882,7 +886,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
}); err != nil {
return err
}
return stopper.RunAsyncTask(context.Background(), "jobs/adopt", func(ctx context.Context) {
return stopper.RunAsyncTask(ctx, "jobs/adopt", func(ctx context.Context) {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()
lc, cleanup := makeLoopController(r.settings, adoptIntervalSetting, r.knobs.IntervalOverrides.Adopt)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {

td := tracedumper.NewTraceDumper(ctx, cfg.InflightTraceDirName, cfg.Settings)
*jobRegistry = *jobs.MakeRegistry(
ctx,
cfg.AmbientCtx,
cfg.stopper,
cfg.clock,
Expand Down