diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index fe72a8b5..3db80759 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/integration-tests-worker +## 1.0.61 + +### Patch Changes + +- Updated dependencies [42883f8] + - @openfn/ws-worker@1.6.7 + ## 1.0.60 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 9039a93d..1056d449 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.60", + "version": "1.0.61", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/lightning-mock/src/server.ts b/packages/lightning-mock/src/server.ts index f85c3462..d22f408d 100644 --- a/packages/lightning-mock/src/server.ts +++ b/packages/lightning-mock/src/server.ts @@ -69,7 +69,6 @@ const createLightningServer = (options: LightningOptions = {}) => { const runPrivateKey = options.runPrivateKey ? fromBase64(options.runPrivateKey) : undefined; - const state = { credentials: {}, runs: {}, diff --git a/packages/lightning-mock/src/start.ts b/packages/lightning-mock/src/start.ts index 9f78a12b..e86ec7c7 100644 --- a/packages/lightning-mock/src/start.ts +++ b/packages/lightning-mock/src/start.ts @@ -32,6 +32,7 @@ const server = createLightningServer({ port: args.port, logger, logLevel: args.log, + runPrivateKey: process.env.WORKER_RUNS_PRIVATE_KEY, }); // add a default credential diff --git a/packages/lightning-mock/src/tokens.ts b/packages/lightning-mock/src/tokens.ts index cb3aa160..da8e380b 100644 --- a/packages/lightning-mock/src/tokens.ts +++ b/packages/lightning-mock/src/tokens.ts @@ -8,7 +8,6 @@ export const generateRunToken = async ( if (privateKey) { try { const alg = 'RS256'; - const key = crypto.createPrivateKey(privateKey); const jwt = await new jose.SignJWT({ id: runId }) diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 60fddaae..ebc95f2d 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,11 @@ # ws-worker +## 1.6.7 + +### Patch Changes + +- 42883f8: Better handliung of claim backoffs when at capacity + ## 1.6.6 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 7b09cac1..5fbe9aa2 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.6.6", + "version": "1.6.7", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/api/claim.ts b/packages/ws-worker/src/api/claim.ts index d0520f38..bb3bdad1 100644 --- a/packages/ws-worker/src/api/claim.ts +++ b/packages/ws-worker/src/api/claim.ts @@ -11,7 +11,6 @@ const mockLogger = createMockLogger(); const verifyToken = async (token: string, publicKey: string) => { const key = crypto.createPublicKey(publicKey); - const { payload } = await jose.jwtVerify(token, key, { issuer: 'Lightning', }); @@ -35,9 +34,8 @@ const claim = ( const activeWorkers = Object.keys(app.workflows).length; if (activeWorkers >= maxWorkers) { - logger.debug( - `skipping claim attempt: server at capacity (${activeWorkers}/${maxWorkers})` - ); + // Important: stop the workloop so that we don't try and claim any more + app.workloop?.stop(`server at capacity (${activeWorkers}/${maxWorkers})`); return reject(new Error('Server at capacity')); } diff --git a/packages/ws-worker/src/api/destroy.ts b/packages/ws-worker/src/api/destroy.ts index fa2743c6..130d92a2 100644 --- a/packages/ws-worker/src/api/destroy.ts +++ b/packages/ws-worker/src/api/destroy.ts @@ -14,7 +14,7 @@ const destroy = async (app: ServerApp, logger: Logger) => { app.destroyed = true; // Immediately stop asking for more work - app.killWorkloop?.(); + app.workloop?.stop('server closed'); app.queueChannel?.leave(); // Shut down the HTTP server diff --git a/packages/ws-worker/src/api/workloop.ts b/packages/ws-worker/src/api/workloop.ts index ea746582..cbe6b7cc 100644 --- a/packages/ws-worker/src/api/workloop.ts +++ b/packages/ws-worker/src/api/workloop.ts @@ -5,13 +5,18 @@ import type { ServerApp } from '../server'; import type { CancelablePromise } from '../types'; import type { Logger } from '@openfn/logger'; +export type Workloop = { + stop: (reason?: string) => void; + isStopped: () => boolean; +}; + const startWorkloop = ( app: ServerApp, logger: Logger, minBackoff: number, maxBackoff: number, maxWorkers?: number -) => { +): Workloop => { let promise: CancelablePromise; let cancelled = false; @@ -37,11 +42,15 @@ const startWorkloop = ( }; workLoop(); - return () => { - logger.debug('cancelling workloop'); - cancelled = true; - promise.cancel(); - app.queueChannel?.leave(); + return { + stop: (reason = 'reason unknown') => { + if (!cancelled) { + logger.info(`cancelling workloop: ${reason}`); + cancelled = true; + promise.cancel(); + } + }, + isStopped: () => cancelled, }; }; diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index cbc294ae..c77fb755 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -8,7 +8,7 @@ import { createMockLogger, Logger } from '@openfn/logger'; import { ClaimRun } from '@openfn/lexicon/lightning'; import { INTERNAL_RUN_COMPLETE } from './events'; import destroy from './api/destroy'; -import startWorkloop from './api/workloop'; +import startWorkloop, { Workloop } from './api/workloop'; import claim from './api/claim'; import { Context, execute } from './api/execute'; import healthcheck from './middleware/healthcheck'; @@ -49,10 +49,11 @@ export interface ServerApp extends Koa { server: Server; engine: RuntimeEngine; options: ServerOptions; + workloop?: Workloop; execute: ({ id, token }: ClaimRun) => Promise; destroy: () => void; - killWorkloop?: () => void; + resumeWorkloop: () => void; } type SocketAndChannel = { @@ -83,17 +84,7 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) { app.queueChannel = channel; // trigger the workloop - if (!options.noLoop) { - logger.info('Starting workloop'); - // TODO maybe namespace the workloop logger differently? It's a bit annoying - app.killWorkloop = startWorkloop( - app, - logger, - options.backoff?.min || MIN_BACKOFF, - options.backoff?.max || MAX_BACKOFF, - options.maxWorkflows - ); - } else { + if (options.noLoop) { // @ts-ignore const port = app.server?.address().port; logger.break(); @@ -103,20 +94,21 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) { logger.info(` curl -X POST http://localhost:${port}/claim`); logger.break(); } + + app.resumeWorkloop(); }; // We were disconnected from the queue const onDisconnect = () => { - if (app.killWorkloop) { - app.killWorkloop(); - delete app.killWorkloop; - if (!app.destroyed) { - logger.info('Connection to lightning lost'); - logger.info( - 'Worker will automatically reconnect when lightning is back online' - ); - // So far as I know, the socket will try and reconnect in the background forever - } + if (!app.workloop?.isStopped()) { + app.workloop?.stop('Socket disconnected unexpectedly'); + } + if (!app.destroyed) { + logger.info('Connection to lightning lost'); + logger.info( + 'Worker will automatically reconnect when lightning is back online' + ); + // So far as I know, the socket will try and reconnect in the background forever } }; @@ -177,6 +169,25 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { app.options = options; + // Start the workloop (if not already started) + app.resumeWorkloop = () => { + if (options.noLoop) { + return; + } + + if (!app.workloop || app.workloop?.isStopped()) { + logger.info('Starting workloop'); + // TODO maybe namespace the workloop logger differently? It's a bit annoying + app.workloop = startWorkloop( + app, + logger, + options.backoff?.min || MIN_BACKOFF, + options.backoff?.max || MAX_BACKOFF, + options.maxWorkflows + ); + } + }; + // TODO this probably needs to move into ./api/ somewhere app.execute = async ({ id, token }: ClaimRun) => { if (app.socket) { @@ -206,6 +217,8 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { runChannel.leave(); app.events.emit(INTERNAL_RUN_COMPLETE); + + app.resumeWorkloop(); }; const context = execute( runChannel, diff --git a/packages/ws-worker/test/api/workloop.test.ts b/packages/ws-worker/test/api/workloop.test.ts index a1ce6df1..f67e5afb 100644 --- a/packages/ws-worker/test/api/workloop.test.ts +++ b/packages/ws-worker/test/api/workloop.test.ts @@ -6,32 +6,36 @@ import { mockChannel } from '../../src/mock/sockets'; import startWorkloop from '../../src/api/workloop'; import { CLAIM } from '../../src/events'; -let cancel: any; +let workloop: any; const logger = createMockLogger(); test.afterEach(() => { - cancel?.(); // cancel any workloops + workloop?.stop(); // cancel any workloops }); test('workloop can be cancelled', async (t) => { let count = 0; + const app = { + workflows: {}, queueChannel: mockChannel({ [CLAIM]: () => { count++; - cancel(); + workloop.stop(); return { runs: [] }; }, }), execute: () => {}, }; - cancel = startWorkloop(app as any, logger, 1, 1); + workloop = startWorkloop(app as any, logger, 1, 1); + t.false(workloop.isStopped()) await sleep(100); // A quirk of how cancel works is that the loop will be called a few times - t.assert(count <= 5); + t.true(count <= 5); + t.true(workloop.isStopped()) }); test('workloop sends the runs:claim event', (t) => { @@ -47,7 +51,7 @@ test('workloop sends the runs:claim event', (t) => { }), execute: () => {}, }; - cancel = startWorkloop(app as any, logger, 1, 1); + workloop = startWorkloop(app as any, logger, 1, 1); }); }); @@ -68,7 +72,7 @@ test('workloop sends the runs:claim event several times ', (t) => { }), execute: () => {}, }; - cancel = startWorkloop(app as any, logger, 1, 1); + workloop = startWorkloop(app as any, logger, 1, 1); }); }); @@ -88,6 +92,6 @@ test('workloop calls execute if runs:claim returns runs', (t) => { }, }; - cancel = startWorkloop(app as any, logger, 1, 1); + workloop = startWorkloop(app as any, logger, 1, 1); }); }); diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index f42edef7..42ed0223 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -46,6 +46,10 @@ test.before(async () => { // Note that if this is not passed, // JWT verification will be skipped runPublicKey: keys.public, + backoff: { + min: 1, + max: 1000, + } }); }); @@ -106,6 +110,110 @@ test.serial( } ); +test.serial( + `should not claim while at capacity, then resume`, + (t) => { + return new Promise((done) => { + + let runIsActive = false; + let runComplete = false; + let didClaimAfterComplete = false; + + const run = { + id: `a${++rollingRunId}`, + jobs: [ + { + id: 'j', + adaptor: '@openfn/language-common@1.0.0', + body: `fn(() => new Promise((resolve) => { + setTimeout(resolve, 500) + }))`, + }, + ], + }; + + + lng.on(e.CLAIM, () => { + if (runIsActive) { + t.fail('Claimed while run is active') + } + if (runComplete) { + didClaimAfterComplete = true; + } + }); + + lng.onSocketEvent(e.RUN_START, run.id, () => { + runIsActive = true; + }) + + lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { + runIsActive = false; + runComplete = true; + + setTimeout(() => { + t.true(didClaimAfterComplete); + done() + }, 10) + }); + + lng.enqueueRun(run); + }); + } +); + +test.serial( + `should reset backoff after claim`, + (t) => { + return new Promise((done) => { + + let lastClaim = Date.now() + let lastClaimDiff = 0; + + const run = { + id: `a${++rollingRunId}`, + jobs: [ + { + id: 'j', + adaptor: '@openfn/language-common@1.0.0', + body: `fn(() => new Promise((resolve) => { + setTimeout(resolve, 500) + }))`, + }, + ], + }; + + + lng.on(e.CLAIM, () => { + lastClaimDiff = Date.now() - lastClaim; + lastClaim = Date.now() + }); + + lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { + // set this articially high - if there are no more claims, the test will fail + lastClaimDiff = 10000; + + // When the run is finished, the claims should resume + // but with a smaller backoff + setTimeout(() => { + t.log('Backoff after run:', lastClaimDiff) + t.true(lastClaimDiff < 5) + done() + }, 10) + }); + + + setTimeout(() => { + t.log('Backoff before run:', lastClaimDiff) + // let the backoff increase a bit + // the last claim diff should be at least 30ms + t.true(lastClaimDiff > 30) + + lng.enqueueRun(run); + }, 600) + }); + } +); + test.todo('worker should log when a run token is verified'); // Perhaps a workflow exception is the most responsible thing right now