Skip to content

Commit

Permalink
Use a file with wx flag to control concurrency
Browse files Browse the repository at this point in the history
Issue: ZENKO-4941
  • Loading branch information
williamlardier committed Dec 23, 2024
1 parent 2f2b59c commit 9339a9a
Showing 1 changed file with 18 additions and 20 deletions.
38 changes: 18 additions & 20 deletions tests/ctst/steps/utils/kubernetes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import fs from 'fs';
import * as path from 'path';
import lockFile from 'proper-lockfile';
import { KubernetesHelper, Utils } from 'cli-testing';
import Zenko from 'world/Zenko';
import {
Expand Down Expand Up @@ -83,24 +82,27 @@ export async function createJobAndWaitForCompletion(
const watchClient = createKubeWatchClient(world);

const lockFilePath = path.join('/tmp', `${jobName}.lock`);
let releaseLock: (() => Promise<void>) | false = false;

if (!fs.existsSync(lockFilePath)) {
fs.writeFileSync(lockFilePath, '');
let lockAquired = false;
let tries = 600;
while (!lockAquired && tries > 0) {
try {
fs.writeFileSync(lockFilePath, 'lock', {
flag: 'wx',
});
lockAquired = true;
} catch {
world.logger.debug(`Failed to acquire lock for job: ${jobName}`, {
tries,
});
}
tries--;
if (!lockAquired) {
await Utils.sleep(1000);
}
}

try {
releaseLock = await lockFile.lock(lockFilePath, {
// Expect the jobs in the queue does not take more than 5 minutes to complete
stale: 10 * 60 * 1000,
// use a linear backoff strategy
retries: {
retries: 610,
factor: 1,
minTimeout: 1000,
maxTimeout: 1000,
},
});
world.logger.debug(`Acquired lock for job: ${jobName}`);

// Read the cron job and prepare the job spec
Expand Down Expand Up @@ -159,11 +161,7 @@ export async function createJobAndWaitForCompletion(
});
throw err;
} finally {
// Ensure the lock is released
if (releaseLock) {
await releaseLock();
world.logger.debug(`Released lock for job: ${jobName}`);
}
fs.unlinkSync(lockFilePath);
}
}

Expand Down

0 comments on commit 9339a9a

Please sign in to comment.