Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(jobs): Type-safe arguments passing to jobs from scheduler #11371

Merged
merged 1 commit into from
Aug 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions packages/jobs/src/core/JobManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ export class JobManager<
logger: this.logger,
})

return <T extends Job<TQueues, any[]>>(
job: T,
jobArgs?: Parameters<T['perform']>,
jobOptions?: ScheduleJobOptions,
return <TJob extends Job<TQueues, any[]>>(
job: TJob,
...argsAndOptions: Parameters<TJob['perform']> extends []
? [undefined?, ScheduleJobOptions?]
: [Parameters<TJob['perform']>, ScheduleJobOptions?]
) => {
return scheduler.schedule({ job, jobArgs, jobOptions })
return scheduler.schedule(job, ...argsAndOptions)
}
}

Expand Down
23 changes: 11 additions & 12 deletions packages/jobs/src/core/Scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ export class Scheduler<TAdapter extends BaseAdapter> {

buildPayload<TJob extends Job<QueueNames, unknown[]>>(
job: TJob,
args?: Parameters<TJob['perform']>,
options?: ScheduleJobOptions,
...argsAndOptions: Parameters<TJob['perform']> extends []
? [undefined?, ScheduleJobOptions?]
: [Parameters<TJob['perform']>, ScheduleJobOptions?]
): SchedulePayload {
const [args, options] = argsAndOptions
const queue = job.queue
const priority = job.priority ?? DEFAULT_PRIORITY
const wait = options?.wait ?? DEFAULT_WAIT
Expand All @@ -72,16 +74,13 @@ export class Scheduler<TAdapter extends BaseAdapter> {
}
}

async schedule<TJob extends Job<QueueNames, unknown[]>>({
job,
jobArgs,
jobOptions,
}: {
job: TJob
jobArgs?: Parameters<TJob['perform']>
jobOptions?: ScheduleJobOptions
}) {
const payload = this.buildPayload(job, jobArgs, jobOptions)
async schedule<TJob extends Job<QueueNames, unknown[]>>(
job: TJob,
...argsAndOptions: Parameters<TJob['perform']> extends []
? [undefined?, ScheduleJobOptions?]
: [Parameters<TJob['perform']>, ScheduleJobOptions?]
) {
const payload = this.buildPayload(job, ...argsAndOptions)

this.logger.info(payload, `[RedwoodJob] Scheduling ${job.name}`)

Expand Down
108 changes: 104 additions & 4 deletions packages/jobs/src/core/__tests__/JobManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,111 @@ describe('createScheduler()', () => {

scheduler(mockJob, mockArgs, mockOptions)

expect(Scheduler.prototype.schedule).toHaveBeenCalledWith({
job: mockJob,
jobArgs: mockArgs,
jobOptions: mockOptions,
expect(Scheduler.prototype.schedule).toHaveBeenCalledWith(
mockJob,
mockArgs,
mockOptions,
)
})

it('returns a function that takes an array of arguments to pass to perform()', () => {
const manager = new JobManager({
adapters: {
mock: mockAdapter,
},
queues: ['default'] as const,
logger: mockLogger,
workers: [],
})

interface MockJobArgs {
foo: string
bar: number
}

const mockJob = manager.createJob({
queue: 'default',
perform: ({ foo, bar }: MockJobArgs) => {
return void (foo + bar)
},
})

const scheduler = manager.createScheduler({ adapter: 'mock' })

// Should be correct. No red squiggly lines
scheduler(mockJob, [{ foo: 'foo', bar: 645 }], { wait: 30 })

// Uncomment the line below and you should see an error because passing
// `undefined` should not be allowed when arguments are required
// scheduler(mockJob, undefined, { wait: 30 })

// Uncomment the line below and you should see an error because not passing
// anything should not be allowed when arguments are required
// scheduler(mockJob)
})

it("returns a function that doesn't need arguments to pass to perform()", () => {
const manager = new JobManager({
adapters: {
mock: mockAdapter,
},
queues: ['default'] as const,
logger: mockLogger,
workers: [],
})

const mockJob = manager.createJob({
queue: 'default',
perform: () => {
return void 'no args'
},
})

const scheduler = manager.createScheduler({ adapter: 'mock' })

// Should be correct
scheduler(mockJob, undefined, { wait: 30 })
// Should be correct
scheduler(mockJob, undefined)
// Should be correct
scheduler(mockJob)

// Uncomment the line below and you'll see an error right now. When the
// job's perform() method doesn't take any arguments you're not allowed to
// pass it an empty array. We could possibly allow this in the future if
// we want to, but for now it's an error.
// scheduler(mockJob, [], { wait: 30 })
})

it('returns a function with only optional arguments to pass to perform()', () => {
const manager = new JobManager({
adapters: {
mock: mockAdapter,
},
queues: ['default'] as const,
logger: mockLogger,
workers: [],
})

const mockJob = manager.createJob({
queue: 'default',
perform: (first?: string, second?: string) => {
return void (first || '' + second)
},
})

const scheduler = manager.createScheduler({ adapter: 'mock' })

// Should be correct
scheduler(mockJob, ['1st', '2nd'])
// Should be correct
scheduler(mockJob, [])

// Uncomment any of the lines below and you'll see an error. Ideally I think
// this should be allowed, because all arguments are optional. But on this
// first iteration I couldn't figure out how to make that work.
// scheduler(mockJob, undefined)
// scheduler(mockJob)
})
})

Expand Down
8 changes: 4 additions & 4 deletions packages/jobs/src/core/__tests__/Scheduler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ describe('schedule()', () => {
wait: 10,
}

await scheduler.schedule({ job, jobArgs: args, jobOptions: options })
await scheduler.schedule(job, args, options)

expect(mockAdapter.schedule).toHaveBeenCalledWith(
expect.objectContaining({
Expand Down Expand Up @@ -240,8 +240,8 @@ describe('schedule()', () => {
wait: 10,
}

await expect(
scheduler.schedule({ job, jobArgs: args, jobOptions: options }),
).rejects.toThrow(errors.SchedulingError)
await expect(scheduler.schedule(job, args, options)).rejects.toThrow(
errors.SchedulingError,
)
})
})
Loading