From 6ef9187c9748629ec3b05eb1e98456a6e713002d Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 26 Sep 2024 11:48:44 +0100 Subject: [PATCH 1/8] worker: rejig workloop and fix capacity backoff bbehaviour --- packages/ws-worker/src/api/claim.ts | 5 +-- packages/ws-worker/src/api/destroy.ts | 2 +- packages/ws-worker/src/api/workloop.ts | 21 ++++++---- packages/ws-worker/src/server.ts | 57 +++++++++++++++----------- 4 files changed, 51 insertions(+), 34 deletions(-) diff --git a/packages/ws-worker/src/api/claim.ts b/packages/ws-worker/src/api/claim.ts index d0520f38c..a9ac0ad1d 100644 --- a/packages/ws-worker/src/api/claim.ts +++ b/packages/ws-worker/src/api/claim.ts @@ -35,9 +35,8 @@ const claim = ( const activeWorkers = Object.keys(app.workflows).length; if (activeWorkers >= maxWorkers) { - logger.debug( - `skipping claim attempt: server at capacity (${activeWorkers}/${maxWorkers})` - ); + // Important: stop the workloop so that we don't try and claim any more + app.workloop?.stop(`server at capacity (${activeWorkers}/${maxWorkers})`); return reject(new Error('Server at capacity')); } diff --git a/packages/ws-worker/src/api/destroy.ts b/packages/ws-worker/src/api/destroy.ts index fa2743c64..130d92a21 100644 --- a/packages/ws-worker/src/api/destroy.ts +++ b/packages/ws-worker/src/api/destroy.ts @@ -14,7 +14,7 @@ const destroy = async (app: ServerApp, logger: Logger) => { app.destroyed = true; // Immediately stop asking for more work - app.killWorkloop?.(); + app.workloop?.stop('server closed'); app.queueChannel?.leave(); // Shut down the HTTP server diff --git a/packages/ws-worker/src/api/workloop.ts b/packages/ws-worker/src/api/workloop.ts index ea7465826..93c46e284 100644 --- a/packages/ws-worker/src/api/workloop.ts +++ b/packages/ws-worker/src/api/workloop.ts @@ -5,13 +5,18 @@ import type { ServerApp } from '../server'; import type { CancelablePromise } from '../types'; import type { Logger } from '@openfn/logger'; +export type Workloop = { + stop: (reason?: string) => void; + isStopped: () => boolean; +} + const startWorkloop = ( app: ServerApp, logger: Logger, minBackoff: number, maxBackoff: number, maxWorkers?: number -) => { +): Workloop => { let promise: CancelablePromise; let cancelled = false; @@ -37,12 +42,14 @@ const startWorkloop = ( }; workLoop(); - return () => { - logger.debug('cancelling workloop'); - cancelled = true; - promise.cancel(); - app.queueChannel?.leave(); - }; + return { + stop: (reason = 'reason unknown') => { + logger.info(`cancelling workloop: ${reason}`); + cancelled = true; + promise.cancel(); + }, + isStopped: () => cancelled + } }; export default startWorkloop; diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index cbc294ae7..44dedd905 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -8,7 +8,7 @@ import { createMockLogger, Logger } from '@openfn/logger'; import { ClaimRun } from '@openfn/lexicon/lightning'; import { INTERNAL_RUN_COMPLETE } from './events'; import destroy from './api/destroy'; -import startWorkloop from './api/workloop'; +import startWorkloop, { Workloop } from './api/workloop'; import claim from './api/claim'; import { Context, execute } from './api/execute'; import healthcheck from './middleware/healthcheck'; @@ -49,10 +49,11 @@ export interface ServerApp extends Koa { server: Server; engine: RuntimeEngine; options: ServerOptions; + workloop?: Workloop; execute: ({ id, token }: ClaimRun) => Promise; destroy: () => void; - killWorkloop?: () => void; + resumeWorkloop: () => void; } type SocketAndChannel = { @@ -83,17 +84,7 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) { app.queueChannel = channel; // trigger the workloop - if (!options.noLoop) { - logger.info('Starting workloop'); - // TODO maybe namespace the workloop logger differently? It's a bit annoying - app.killWorkloop = startWorkloop( - app, - logger, - options.backoff?.min || MIN_BACKOFF, - options.backoff?.max || MAX_BACKOFF, - options.maxWorkflows - ); - } else { + if (options.noLoop) { // @ts-ignore const port = app.server?.address().port; logger.break(); @@ -103,20 +94,19 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) { logger.info(` curl -X POST http://localhost:${port}/claim`); logger.break(); } + + app.resumeWorkloop() }; // We were disconnected from the queue const onDisconnect = () => { - if (app.killWorkloop) { - app.killWorkloop(); - delete app.killWorkloop; - if (!app.destroyed) { - logger.info('Connection to lightning lost'); - logger.info( - 'Worker will automatically reconnect when lightning is back online' - ); - // So far as I know, the socket will try and reconnect in the background forever - } + app.workloop?.stop('Socket disconnected unexpectedly') + if (!app.destroyed) { + logger.info('Connection to lightning lost'); + logger.info( + 'Worker will automatically reconnect when lightning is back online' + ); + // So far as I know, the socket will try and reconnect in the background forever } }; @@ -177,6 +167,25 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { app.options = options; + // Start the workloop (if not already started) + app.resumeWorkloop = () => { + if (options.noLoop) { + return; + } + + if (!app.workloop || app.workloop?.isStopped()) { + logger.info('Starting workloop'); + // TODO maybe namespace the workloop logger differently? It's a bit annoying + app.workloop = startWorkloop( + app, + logger, + options.backoff?.min || MIN_BACKOFF, + options.backoff?.max || MAX_BACKOFF, + options.maxWorkflows + ); + } + } + // TODO this probably needs to move into ./api/ somewhere app.execute = async ({ id, token }: ClaimRun) => { if (app.socket) { @@ -206,6 +215,8 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { runChannel.leave(); app.events.emit(INTERNAL_RUN_COMPLETE); + + app.resumeWorkloop(); }; const context = execute( runChannel, From cadf42ceb50cc003d62a47c568678315876c44f5 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 26 Sep 2024 12:15:28 +0100 Subject: [PATCH 2/8] mock: fix private key handling for legit run token generation --- packages/lightning-mock/src/server.ts | 1 - packages/lightning-mock/src/start.ts | 1 + packages/lightning-mock/src/tokens.ts | 1 - 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/lightning-mock/src/server.ts b/packages/lightning-mock/src/server.ts index f85c34625..d22f408d3 100644 --- a/packages/lightning-mock/src/server.ts +++ b/packages/lightning-mock/src/server.ts @@ -69,7 +69,6 @@ const createLightningServer = (options: LightningOptions = {}) => { const runPrivateKey = options.runPrivateKey ? fromBase64(options.runPrivateKey) : undefined; - const state = { credentials: {}, runs: {}, diff --git a/packages/lightning-mock/src/start.ts b/packages/lightning-mock/src/start.ts index 9f78a12b2..d9f15186e 100644 --- a/packages/lightning-mock/src/start.ts +++ b/packages/lightning-mock/src/start.ts @@ -32,6 +32,7 @@ const server = createLightningServer({ port: args.port, logger, logLevel: args.log, + runPrivateKey: process.env.WORKER_RUNS_PRIVATE_KEY }); // add a default credential diff --git a/packages/lightning-mock/src/tokens.ts b/packages/lightning-mock/src/tokens.ts index cb3aa1608..da8e380bd 100644 --- a/packages/lightning-mock/src/tokens.ts +++ b/packages/lightning-mock/src/tokens.ts @@ -8,7 +8,6 @@ export const generateRunToken = async ( if (privateKey) { try { const alg = 'RS256'; - const key = crypto.createPrivateKey(privateKey); const jwt = await new jose.SignJWT({ id: runId }) From 96e5ef1ad58b881f3bdd5a63fcac941338741a1b Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 26 Sep 2024 12:21:10 +0100 Subject: [PATCH 3/8] worker: log tweaks --- packages/ws-worker/src/api/claim.ts | 1 - packages/ws-worker/src/api/workloop.ts | 8 +++++--- packages/ws-worker/src/server.ts | 4 +++- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/ws-worker/src/api/claim.ts b/packages/ws-worker/src/api/claim.ts index a9ac0ad1d..bb3bdad15 100644 --- a/packages/ws-worker/src/api/claim.ts +++ b/packages/ws-worker/src/api/claim.ts @@ -11,7 +11,6 @@ const mockLogger = createMockLogger(); const verifyToken = async (token: string, publicKey: string) => { const key = crypto.createPublicKey(publicKey); - const { payload } = await jose.jwtVerify(token, key, { issuer: 'Lightning', }); diff --git a/packages/ws-worker/src/api/workloop.ts b/packages/ws-worker/src/api/workloop.ts index 93c46e284..9f5155b9d 100644 --- a/packages/ws-worker/src/api/workloop.ts +++ b/packages/ws-worker/src/api/workloop.ts @@ -44,9 +44,11 @@ const startWorkloop = ( return { stop: (reason = 'reason unknown') => { - logger.info(`cancelling workloop: ${reason}`); - cancelled = true; - promise.cancel(); + if (!cancelled) { + logger.info(`cancelling workloop: ${reason}`); + cancelled = true; + promise.cancel(); + } }, isStopped: () => cancelled } diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index 44dedd905..6e90d68c6 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -100,7 +100,9 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) { // We were disconnected from the queue const onDisconnect = () => { - app.workloop?.stop('Socket disconnected unexpectedly') + if (!app.workloop?.isStopped()) { + app.workloop?.stop('Socket disconnected unexpectedly') + } if (!app.destroyed) { logger.info('Connection to lightning lost'); logger.info( From c0a322df6c412015ae2554cec491881ce6291b7f Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 26 Sep 2024 12:49:53 +0100 Subject: [PATCH 4/8] worker: update tests --- packages/ws-worker/test/api/workloop.test.ts | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/packages/ws-worker/test/api/workloop.test.ts b/packages/ws-worker/test/api/workloop.test.ts index a1ce6df1c..f67e5afb4 100644 --- a/packages/ws-worker/test/api/workloop.test.ts +++ b/packages/ws-worker/test/api/workloop.test.ts @@ -6,32 +6,36 @@ import { mockChannel } from '../../src/mock/sockets'; import startWorkloop from '../../src/api/workloop'; import { CLAIM } from '../../src/events'; -let cancel: any; +let workloop: any; const logger = createMockLogger(); test.afterEach(() => { - cancel?.(); // cancel any workloops + workloop?.stop(); // cancel any workloops }); test('workloop can be cancelled', async (t) => { let count = 0; + const app = { + workflows: {}, queueChannel: mockChannel({ [CLAIM]: () => { count++; - cancel(); + workloop.stop(); return { runs: [] }; }, }), execute: () => {}, }; - cancel = startWorkloop(app as any, logger, 1, 1); + workloop = startWorkloop(app as any, logger, 1, 1); + t.false(workloop.isStopped()) await sleep(100); // A quirk of how cancel works is that the loop will be called a few times - t.assert(count <= 5); + t.true(count <= 5); + t.true(workloop.isStopped()) }); test('workloop sends the runs:claim event', (t) => { @@ -47,7 +51,7 @@ test('workloop sends the runs:claim event', (t) => { }), execute: () => {}, }; - cancel = startWorkloop(app as any, logger, 1, 1); + workloop = startWorkloop(app as any, logger, 1, 1); }); }); @@ -68,7 +72,7 @@ test('workloop sends the runs:claim event several times ', (t) => { }), execute: () => {}, }; - cancel = startWorkloop(app as any, logger, 1, 1); + workloop = startWorkloop(app as any, logger, 1, 1); }); }); @@ -88,6 +92,6 @@ test('workloop calls execute if runs:claim returns runs', (t) => { }, }; - cancel = startWorkloop(app as any, logger, 1, 1); + workloop = startWorkloop(app as any, logger, 1, 1); }); }); From 42883f8f1aa2473a2e185335236711e9746db684 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 26 Sep 2024 12:53:56 +0100 Subject: [PATCH 5/8] changeset --- .changeset/red-flies-mix.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/red-flies-mix.md diff --git a/.changeset/red-flies-mix.md b/.changeset/red-flies-mix.md new file mode 100644 index 000000000..846899480 --- /dev/null +++ b/.changeset/red-flies-mix.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': patch +--- + +Better handliung of claim backoffs when at capacity From a2349e8811956d8c84576a2f4c998abed2934ecf Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 26 Sep 2024 13:14:42 +0100 Subject: [PATCH 6/8] worker: tests on backoff --- packages/ws-worker/test/lightning.test.ts | 108 ++++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index f42edef77..42ed02230 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -46,6 +46,10 @@ test.before(async () => { // Note that if this is not passed, // JWT verification will be skipped runPublicKey: keys.public, + backoff: { + min: 1, + max: 1000, + } }); }); @@ -106,6 +110,110 @@ test.serial( } ); +test.serial( + `should not claim while at capacity, then resume`, + (t) => { + return new Promise((done) => { + + let runIsActive = false; + let runComplete = false; + let didClaimAfterComplete = false; + + const run = { + id: `a${++rollingRunId}`, + jobs: [ + { + id: 'j', + adaptor: '@openfn/language-common@1.0.0', + body: `fn(() => new Promise((resolve) => { + setTimeout(resolve, 500) + }))`, + }, + ], + }; + + + lng.on(e.CLAIM, () => { + if (runIsActive) { + t.fail('Claimed while run is active') + } + if (runComplete) { + didClaimAfterComplete = true; + } + }); + + lng.onSocketEvent(e.RUN_START, run.id, () => { + runIsActive = true; + }) + + lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { + runIsActive = false; + runComplete = true; + + setTimeout(() => { + t.true(didClaimAfterComplete); + done() + }, 10) + }); + + lng.enqueueRun(run); + }); + } +); + +test.serial( + `should reset backoff after claim`, + (t) => { + return new Promise((done) => { + + let lastClaim = Date.now() + let lastClaimDiff = 0; + + const run = { + id: `a${++rollingRunId}`, + jobs: [ + { + id: 'j', + adaptor: '@openfn/language-common@1.0.0', + body: `fn(() => new Promise((resolve) => { + setTimeout(resolve, 500) + }))`, + }, + ], + }; + + + lng.on(e.CLAIM, () => { + lastClaimDiff = Date.now() - lastClaim; + lastClaim = Date.now() + }); + + lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { + // set this articially high - if there are no more claims, the test will fail + lastClaimDiff = 10000; + + // When the run is finished, the claims should resume + // but with a smaller backoff + setTimeout(() => { + t.log('Backoff after run:', lastClaimDiff) + t.true(lastClaimDiff < 5) + done() + }, 10) + }); + + + setTimeout(() => { + t.log('Backoff before run:', lastClaimDiff) + // let the backoff increase a bit + // the last claim diff should be at least 30ms + t.true(lastClaimDiff > 30) + + lng.enqueueRun(run); + }, 600) + }); + } +); + test.todo('worker should log when a run token is verified'); // Perhaps a workflow exception is the most responsible thing right now From dcc38728fddbd67455b20da4e6c7e3b23462f49e Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 26 Sep 2024 13:59:42 +0100 Subject: [PATCH 7/8] format --- packages/lightning-mock/src/start.ts | 2 +- packages/ws-worker/src/api/workloop.ts | 6 +++--- packages/ws-worker/src/server.ts | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/lightning-mock/src/start.ts b/packages/lightning-mock/src/start.ts index d9f15186e..e86ec7c7f 100644 --- a/packages/lightning-mock/src/start.ts +++ b/packages/lightning-mock/src/start.ts @@ -32,7 +32,7 @@ const server = createLightningServer({ port: args.port, logger, logLevel: args.log, - runPrivateKey: process.env.WORKER_RUNS_PRIVATE_KEY + runPrivateKey: process.env.WORKER_RUNS_PRIVATE_KEY, }); // add a default credential diff --git a/packages/ws-worker/src/api/workloop.ts b/packages/ws-worker/src/api/workloop.ts index 9f5155b9d..cbe6b7cc3 100644 --- a/packages/ws-worker/src/api/workloop.ts +++ b/packages/ws-worker/src/api/workloop.ts @@ -8,7 +8,7 @@ import type { Logger } from '@openfn/logger'; export type Workloop = { stop: (reason?: string) => void; isStopped: () => boolean; -} +}; const startWorkloop = ( app: ServerApp, @@ -50,8 +50,8 @@ const startWorkloop = ( promise.cancel(); } }, - isStopped: () => cancelled - } + isStopped: () => cancelled, + }; }; export default startWorkloop; diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index 6e90d68c6..c77fb755c 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -95,13 +95,13 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) { logger.break(); } - app.resumeWorkloop() + app.resumeWorkloop(); }; // We were disconnected from the queue const onDisconnect = () => { if (!app.workloop?.isStopped()) { - app.workloop?.stop('Socket disconnected unexpectedly') + app.workloop?.stop('Socket disconnected unexpectedly'); } if (!app.destroyed) { logger.info('Connection to lightning lost'); @@ -186,7 +186,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { options.maxWorkflows ); } - } + }; // TODO this probably needs to move into ./api/ somewhere app.execute = async ({ id, token }: ClaimRun) => { From 24612d5c46465e7b9602f72d1b55a59447b3ee2f Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 26 Sep 2024 14:36:44 +0100 Subject: [PATCH 8/8] version: worker@1.6.7 --- .changeset/red-flies-mix.md | 5 ----- integration-tests/worker/CHANGELOG.md | 7 +++++++ integration-tests/worker/package.json | 2 +- packages/ws-worker/CHANGELOG.md | 6 ++++++ packages/ws-worker/package.json | 2 +- 5 files changed, 15 insertions(+), 7 deletions(-) delete mode 100644 .changeset/red-flies-mix.md diff --git a/.changeset/red-flies-mix.md b/.changeset/red-flies-mix.md deleted file mode 100644 index 846899480..000000000 --- a/.changeset/red-flies-mix.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@openfn/ws-worker': patch ---- - -Better handliung of claim backoffs when at capacity diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index fe72a8b57..3db80759b 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/integration-tests-worker +## 1.0.61 + +### Patch Changes + +- Updated dependencies [42883f8] + - @openfn/ws-worker@1.6.7 + ## 1.0.60 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 9039a93db..1056d4491 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.60", + "version": "1.0.61", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 60fddaae4..ebc95f2d1 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,11 @@ # ws-worker +## 1.6.7 + +### Patch Changes + +- 42883f8: Better handliung of claim backoffs when at capacity + ## 1.6.6 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 7b09cac12..5fbe9aa29 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.6.6", + "version": "1.6.7", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module",