Skip to content

Commit

Permalink
feat: replace multi by lua scripts in moveToFailed (#2958)
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Dec 9, 2024
1 parent dfa3e8b commit c19c914
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 118 deletions.
135 changes: 53 additions & 82 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -643,83 +643,64 @@ export class Job<
token: string,
fetchNext = false,
): Promise<void | any[]> {
const client = await this.queue.client;
const message = err?.message;

this.failedReason = message;

let command: string;
const multi = client.multi();

this.saveStacktrace(multi, err);

//
// Check if an automatic retry should be performed
//
let finishedOn: number;
const [shouldRetry, retryDelay] = await this.shouldRetryJob(err);
if (shouldRetry) {
if (retryDelay) {
const args = this.scripts.moveToDelayedArgs(
this.id,
Date.now(),
token,
retryDelay,
);
this.scripts.execCommand(multi, 'moveToDelayed', args);
command = 'moveToDelayed';
} else {
// Retry immediately
this.scripts.execCommand(
multi,
'retryJob',
this.scripts.retryJobArgs(this.id, this.opts.lifo, token),
);
command = 'retryJob';
}
} else {
const args = this.scripts.moveToFailedArgs(
this,
message,
this.opts.removeOnFail,
token,
fetchNext,
);

this.scripts.execCommand(multi, 'moveToFinished', args);
finishedOn = args[this.scripts.moveToFinishedKeys.length + 1] as number;
command = 'moveToFinished';
}
this.failedReason = err?.message;

return this.queue.trace<Promise<void | any[]>>(
SpanKind.INTERNAL,
this.getSpanOperation(command),
this.getSpanOperation('moveToFailed'),
this.queue.name,
async (span, dstPropagationMedatadata) => {
if (dstPropagationMedatadata) {
this.scripts.execCommand(multi, 'updateJobOption', [
this.toKey(this.id),
'tm',
dstPropagationMedatadata,
]);
}

const results = await multi.exec();
const anyError = results.find(result => result[0]);
if (anyError) {
throw new Error(
`Error "moveToFailed" with command ${command}: ${anyError}`,
let result;

this.updateStacktrace(err);

const fieldsToUpdate = {
failedReason: this.failedReason,
stacktrace: JSON.stringify(this.stacktrace),
tm: dstPropagationMedatadata,
};

//
// Check if an automatic retry should be performed
//
let finishedOn: number;
const [shouldRetry, retryDelay] = await this.shouldRetryJob(err);

if (shouldRetry) {
if (retryDelay) {
// Retry with delay
result = await this.scripts.moveToDelayed(
this.id,
Date.now(),
retryDelay,
token,
{ fieldsToUpdate },
);
} else {
// Retry immediately
result = await this.scripts.retryJob(
this.id,
this.opts.lifo,
token,
{
fieldsToUpdate,
},
);
}
} else {
const args = this.scripts.moveToFailedArgs(
this,
this.failedReason,
this.opts.removeOnFail,
token,
fetchNext,
fieldsToUpdate,
);
}

const result = results[results.length - 1][1] as number;
if (result < 0) {
throw this.scripts.finishedErrors({
code: result,
jobId: this.id,
command,
state: 'active',
});
result = await this.scripts.moveToFinished(this.id, args);
finishedOn = args[
this.scripts.moveToFinishedKeys.length + 1
] as number;
}

if (finishedOn && typeof finishedOn === 'number') {
Expand All @@ -732,9 +713,7 @@ export class Job<

this.attemptsMade += 1;

if (Array.isArray(result)) {
return raw2NextJobData(result);
}
return result;
},
);
}
Expand Down Expand Up @@ -1249,7 +1228,7 @@ export class Job<
}
}

protected saveStacktrace(multi: ChainableCommander, err: Error): void {
protected updateStacktrace(err: Error) {
this.stacktrace = this.stacktrace || [];

if (err?.stack) {
Expand All @@ -1260,14 +1239,6 @@ export class Job<
this.stacktrace = this.stacktrace.slice(-this.opts.stackTraceLimit);
}
}

const args = this.scripts.saveStacktraceArgs(
this.id,
JSON.stringify(this.stacktrace),
err?.message,
);

this.scripts.execCommand(multi, 'saveStacktrace', args);
}
}

Expand Down
71 changes: 56 additions & 15 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ import {
RedisJobOptions,
} from '../types';
import { ErrorCode } from '../enums';
import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils';
import {
array2obj,
getParentKey,
isRedisVersionLowerThan,
objectToFlatArray,
} from '../utils';
import { ChainableCommander } from 'ioredis';
import { version as packageVersion } from '../version';
export type JobData = [JobJsonRaw | number, string?];
Expand Down Expand Up @@ -457,6 +462,23 @@ export class Scripts {
return this.execCommand(client, 'extendLock', args);
}

async extendLocks(
jobIds: string[],
tokens: string[],
duration: number,
): Promise<string[]> {
const client = await this.queue.client;

const args = [
this.queue.keys.stalled,
this.queue.toKey(''),
pack(tokens),
pack(jobIds),
duration,
];
return this.execCommand(client, 'extendLocks', args);
}

async updateData<T = any, R = any, N extends string = string>(
job: MinimalJob<T, R, N>,
data: T,
Expand Down Expand Up @@ -547,6 +569,7 @@ export class Scripts {
token: string,
timestamp: number,
fetchNext = true,
fieldsToUpdate?: Record<string, any>,
): (string | number | boolean | Buffer)[] {
const queueKeys = this.queue.keys;
const opts: WorkerOptions = <WorkerOptions>this.queue.opts;
Expand Down Expand Up @@ -584,6 +607,7 @@ export class Scripts {
idof: !!job.opts?.ignoreDependencyOnFailure,
rdof: !!job.opts?.removeDependencyOnFailure,
}),
fieldsToUpdate ? pack(objectToFlatArray(fieldsToUpdate)) : void 0,
];

return keys.concat(args);
Expand Down Expand Up @@ -787,6 +811,7 @@ export class Scripts {
removeOnFailed: boolean | number | KeepJobs,
token: string,
fetchNext = false,
fieldsToUpdate?: Record<string, any>,
): (string | number | boolean | Buffer)[] {
const timestamp = Date.now();
return this.moveToFinishedArgs(
Expand All @@ -798,6 +823,7 @@ export class Scripts {
token,
timestamp,
fetchNext,
fieldsToUpdate,
);
}

Expand Down Expand Up @@ -916,9 +942,9 @@ export class Scripts {
token: string,
delay: number,
opts: MoveToDelayedOpts = {},
): (string | number)[] {
): (string | number | Buffer)[] {
const queueKeys = this.queue.keys;
const keys: (string | number)[] = [
const keys: (string | number | Buffer)[] = [
queueKeys.marker,
queueKeys.active,
queueKeys.prioritized,
Expand All @@ -936,19 +962,12 @@ export class Scripts {
token,
delay,
opts.skipAttempt ? '1' : '0',
opts.fieldsToUpdate
? pack(objectToFlatArray(opts.fieldsToUpdate))
: void 0,
]);
}

saveStacktraceArgs(
jobId: string,
stacktrace: string,
failedReason: string,
): string[] {
const keys: string[] = [this.queue.toKey(jobId)];

return keys.concat([stacktrace, failedReason]);
}

moveToWaitingChildrenArgs(
jobId: string,
token: string,
Expand Down Expand Up @@ -1106,8 +1125,9 @@ export class Scripts {
jobId: string,
lifo: boolean,
token: string,
): (string | number)[] {
const keys: (string | number)[] = [
fieldsToUpdate?: Record<string, any>,
): (string | number | Buffer)[] {
const keys: (string | number | Buffer)[] = [
this.queue.keys.active,
this.queue.keys.wait,
this.queue.keys.paused,
Expand All @@ -1129,9 +1149,30 @@ export class Scripts {
pushCmd,
jobId,
token,
fieldsToUpdate ? pack(objectToFlatArray(fieldsToUpdate)) : void 0,
]);
}

async retryJob(
jobId: string,
lifo: boolean,
token: string,
fieldsToUpdate?: Record<string, any>,
): Promise<void> {
const client = await this.queue.client;

const args = this.retryJobArgs(jobId, lifo, token, fieldsToUpdate);
const result = await this.execCommand(client, 'retryJob', args);
if (result < 0) {
throw this.finishedErrors({
code: result,
jobId,
command: 'retryJob',
state: 'active',
});
}
}

protected moveJobsToWaitArgs(
state: FinishedStatus | 'delayed',
count: number,
Expand Down
30 changes: 12 additions & 18 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1198,26 +1198,20 @@ will never work with more accuracy than 1ms. */
});

try {
const pipeline = (await this.client).pipeline();
for (const job of jobs) {
await this.scripts.extendLock(
job.id,
job.token,
this.opts.lockDuration,
pipeline,
const erroredJobIds = await this.scripts.extendLocks(
jobs.map(job => job.id),
jobs.map(job => job.token),
this.opts.lockDuration,
);

for (const jobId of erroredJobIds) {
// TODO: Send signal to process function that the job has been lost.

this.emit(
'error',
new Error(`could not renew lock for job ${jobId}`),
);
}
const result = (await pipeline.exec()) as [Error, string][];

for (const [err, jobId] of result) {
if (err) {
// TODO: signal process function that the job has been lost.
this.emit(
'error',
new Error(`could not renew lock for job ${jobId}`),
);
}
}
} catch (err) {
this.emit('error', <Error>err);
}
Expand Down
Loading

0 comments on commit c19c914

Please sign in to comment.