Skip to content

Commit

Permalink
Worker: limit payload size (#740)
Browse files Browse the repository at this point in the history
* worker: allow a limit on payload size

* changeset

* worker: ensure server default payload limit is set

* tests: worker tests for payload limit

* tests: add test for log limits

* worker: don't log data which exceeds the payload limit

* update changeset

* worker: log the payload limit at the start of a run

* worker: add output_dataclip_error

* fix tests

* worker: rename payload_memory_limit_mb to payload_limit_mb

* versions: worker@1.5.0

* worker: WORKER_MAX_PAYLPAD_MEMORY_MB -> WORKER_MAX_PAYLOAD_MB

* lexicon: fix typing

* tests: update
  • Loading branch information
josephjclark authored Jul 31, 2024
1 parent 5c72cb6 commit 7f14570
Show file tree
Hide file tree
Showing 18 changed files with 408 additions and 40 deletions.
7 changes: 7 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/integration-tests-worker

## 1.0.53

### Patch Changes

- Updated dependencies [f363254]
- @openfn/ws-worker@1.5.0

## 1.0.52

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.52",
"version": "1.0.53",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <admin@openfn.org>",
"license": "ISC",
Expand Down
122 changes: 122 additions & 0 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -787,5 +787,127 @@ test.serial('set a timeout on a run', (t) => {
});
});

test.serial('set a default payload limit on the worker', (t) => {
  return new Promise(async (done) => {
    // Recycle the shared worker so we can boot one with custom options
    if (worker.destroyed === false) {
      await worker.destroy();
    }

    // Start a worker whose server-wide payload limit is zero mb, so any
    // output dataclip at all is too large to send back to Lightning
    const engineOptions = {
      maxWorkers: 1,
      // use the dummy repo to remove autoinstall
      repoDir: path.resolve('./dummy-repo'),
    };
    ({ worker } = await initWorker(lightningPort, engineOptions, {
      payloadLimitMb: 0,
    }));

    const run = {
      id: crypto.randomUUID(),
      jobs: [
        {
          adaptor: '@openfn/test-adaptor@1.0.0',
          body: `fn((s) => ({ data: 'aaaa' }))`,
        },
      ],
    };

    // The step itself should still succeed — only the dataclip is withheld
    lightning.once('step:complete', ({ payload }) => {
      t.is(payload.reason, 'success');
      t.falsy(payload.output_dataclip_id);
      t.falsy(payload.output_dataclip);

      done();
    });

    lightning.enqueueRun(run);
  });
});

test.serial('override the worker payload through run options', (t) => {
  return new Promise(async (done) => {
    // Recycle the shared worker so we can boot one with custom options
    if (worker.destroyed === false) {
      await worker.destroy();
    }

    // The server default forbids all payloads (0mb)...
    const engineOptions = {
      maxWorkers: 1,
      // use the dummy repo to remove autoinstall
      repoDir: path.resolve('./dummy-repo'),
    };
    ({ worker } = await initWorker(lightningPort, engineOptions, {
      payloadLimitMb: 0,
    }));

    // ...but the run raises its own limit to 100mb, which should win
    const run = {
      id: crypto.randomUUID(),
      jobs: [
        {
          adaptor: '@openfn/test-adaptor@1.0.0',
          body: `fn((s) => ({ data: 'aaaa' }))`,
        },
      ],
      options: {
        payload_limit_mb: 100,
      },
    };

    // With the per-run override in effect the dataclip should come through
    lightning.once('step:complete', ({ payload }) => {
      t.is(payload.reason, 'success');
      t.truthy(payload.output_dataclip_id);
      t.deepEqual(payload.output_dataclip, JSON.stringify({ data: 'aaaa' }));

      done();
    });

    lightning.enqueueRun(run);
  });
});

test.serial('Redact logs which exceed the payload limit', (t) => {
  return new Promise(async (done) => {
    // Recycle the shared worker so this run uses the default server options
    if (!worker.destroyed) {
      await worker.destroy();
    }

    ({ worker } = await initWorker(lightningPort, {
      maxWorkers: 1,
      // use the dummy repo to remove autoinstall
      repoDir: path.resolve('./dummy-repo'),
    }));

    const run = {
      id: crypto.randomUUID(),
      jobs: [
        {
          adaptor: '@openfn/test-adaptor@1.0.0',
          body: `fn((s) => { console.log('a'); return s;})`,
        },
      ],
      // A zero-mb limit on the run forces every job log over the threshold
      options: {
        payload_limit_mb: 0,
      },
    };

    // Every job-sourced log line should arrive replaced by a redaction notice
    lightning.on('run:log', (evt) => {
      if (evt.payload.source === 'JOB') {
        t.regex(evt.payload.message[0], /redacted/i);
      }
    });

    // Register the completion listener BEFORE enqueueing the run: the
    // original attached it after enqueueRun, so a run that completed
    // quickly could fire run:complete with no listener attached and the
    // test would hang until AVA's timeout
    lightning.once('run:complete', () => {
      done();
    });

    lightning.enqueueRun(run);
  });
});

// REMEMBER the default worker was destroyed at this point!
// If you want to use a worker, you'll have to create your own
5 changes: 3 additions & 2 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ export type LightningPlanOptions = {
start?: StepId;
output_dataclips?: boolean;

// future options
run_memory_limit?: number
run_memory_limit_mb?: number;
payload_limit_mb?: number;
};

/**
Expand Down Expand Up @@ -178,6 +178,7 @@ export type StepCompletePayload = ExitReason & {
step_id: string;
output_dataclip?: string;
output_dataclip_id?: string;
output_dataclip_error?: 'DATACLIP_TOO_LARGE';
thread_id?: string;
mem: {
job: number;
Expand Down
6 changes: 6 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# ws-worker

## 1.5.0

### Minor Changes

- f363254: Allow a payload limit to be set for large dataclips and logs (payload_limit_mb)

## 1.4.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.4.1",
"version": "1.5.0",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
32 changes: 23 additions & 9 deletions packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
getWithReply,
createRunState,
throttle as createThrottle,
stringify,
} from '../util';
import {
RUN_COMPLETE,
Expand All @@ -25,6 +26,7 @@ import handleRunError from '../events/run-error';

import type { Channel, RunState, JSONLog } from '../types';
import { WorkerRunOptions } from '../util/convert-lightning-plan';
import ensurePayloadSize from '../util/ensure-payload-size';

const enc = new TextDecoder('utf-8');

Expand Down Expand Up @@ -210,20 +212,32 @@ export function onJobError(context: Context, event: any) {
}
}

export function onJobLog({ channel, state }: Context, event: JSONLog) {
export function onJobLog({ channel, state, options }: Context, event: JSONLog) {
const timeInMicroseconds = BigInt(event.time) / BigInt(1e3);

// lightning-friendly log object
const log: RunLogPayload = {
run_id: state.plan.id!,
let message = event.message;
try {
// The message body, the actual thing that is logged,
// may be always encoded into a string
// may be encoded into a string
// Parse it here before sending on to lightning
// TODO this needs optimising!
message:
typeof event.message === 'string'
? JSON.parse(event.message)
: event.message,
if (typeof event.message === 'string') {
ensurePayloadSize(event.message, options?.payloadLimitMb);
message = JSON.parse(message);
} else if (event.message) {
const payload = stringify(event.message);
ensurePayloadSize(payload, options?.payloadLimitMb);
}
} catch (e) {
message = [
`(Log message redacted: exceeds ${options.payloadLimitMb}mb memory limit)`,
];
}

// lightning-friendly log object
const log: RunLogPayload = {
run_id: state.plan.id!,
message: message,
source: event.name,
level: event.level,
timestamp: timeInMicroseconds.toString(),
Expand Down
11 changes: 10 additions & 1 deletion packages/ws-worker/src/events/run-start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export default async function onRunStart(
context: Context,
event: WorkflowStartPayload
) {
const { channel, state } = context;
const { channel, state, options = {} } = context;
// Cheat on the timestamp time to make sure this is the first thing in the log
const time = (timestamp() - BigInt(10e6)).toString();

Expand All @@ -36,6 +36,15 @@ export default async function onRunStart(

await sendEvent<RunStartPayload>(channel, RUN_START, { versions });

if ('payloadLimitMb' in options) {
await onJobLog(versionLogContext, {
time,
message: [`Payload limit: ${options.payloadLimitMb}mb`],
level: 'info',
name: 'RTE',
});
}

const versionMessage = calculateVersionString(versions);

await onJobLog(versionLogContext, {
Expand Down
51 changes: 34 additions & 17 deletions packages/ws-worker/src/events/step-complete.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import crypto from 'node:crypto';
import type { StepCompletePayload } from '@openfn/lexicon/lightning';
import type { JobCompletePayload } from '@openfn/engine-multi';
import { timestamp } from '@openfn/logger';

import { STEP_COMPLETE } from '../events';
import { stringify } from '../util';
import { calculateJobExitReason } from '../api/reasons';
import { sendEvent, Context } from '../api/execute';
import { sendEvent, onJobLog, Context } from '../api/execute';
import ensurePayloadSize from '../util/ensure-payload-size';

export default function onStepComplete(
{ channel, state, options }: Context,
export default async function onStepComplete(
context: Context,
event: JobCompletePayload,
// TODO this isn't terribly graceful, but accept an error for crashes
error?: any
) {
const { channel, state, options } = context;
const dataclipId = crypto.randomUUID();

const step_id = state.activeStep as string;
Expand Down Expand Up @@ -41,30 +44,44 @@ export default function onStepComplete(
state.inputDataclips[nextJobId] = dataclipId;
});

const { reason, error_message, error_type } = calculateJobExitReason(
job_id,
event.state,
error
);
state.reasons[job_id] = { reason, error_message, error_type };

const evt = {
step_id,
job_id,
output_dataclip_id: dataclipId,

reason,
error_message,
error_type,

mem: event.mem,
duration: event.duration,
thread_id: event.threadId,
} as StepCompletePayload;

if (!options || options.outputDataclips !== false) {
evt.output_dataclip = stringify(outputState);
try {
if (!options || options.outputDataclips !== false) {
const payload = stringify(outputState);
ensurePayloadSize(payload, options?.payloadLimitMb);

// Write the dataclip if it's not too big
evt.output_dataclip = payload;
}
evt.output_dataclip_id = dataclipId;
} catch (e) {
evt.output_dataclip_error = 'DATACLIP_TOO_LARGE';

const time = (timestamp() - BigInt(10e6)).toString();
// If the dataclip is too big, return the step without it
// (the workflow will carry on internally)
await onJobLog(context, {
time,
message: [
'Dataclip too large. This dataclip will not be sent back to lighting.',
],
level: 'info',
name: 'R/T',
});
}

const reason = calculateJobExitReason(job_id, event.state, error);
state.reasons[job_id] = reason;

Object.assign(evt, reason);

return sendEvent<StepCompletePayload>(channel, STEP_COMPLETE, evt);
}
11 changes: 9 additions & 2 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ export type ServerOptions = {
min?: number;
max?: number;
};

payloadLimitMb?: number; // max memory limit for socket payload (ie, step:complete, log)
};

// this is the server/koa API
Expand Down Expand Up @@ -164,7 +166,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {

router.get('/', healthcheck);

app.options = options || {};
app.options = options;

// TODO this probably needs to move into ./api/ somewhere
app.execute = async ({ id, token }: ClaimRun) => {
Expand All @@ -174,10 +176,15 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
const {
channel: runChannel,
plan,
options,
options = {},
input,
} = await joinRunChannel(app.socket, token, id, logger);

// Default the payload limit if it's not otherwise set on the run options
if (!('payloadLimitMb' in options)) {
options.payloadLimitMb = app.options.payloadLimitMb;
}

// Callback to be triggered when the work is done (including errors)
const onFinish = () => {
logger.debug(`workflow ${id} complete: releasing worker`);
Expand Down
1 change: 1 addition & 0 deletions packages/ws-worker/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ function engineReady(engine: any) {
max: maxBackoff,
},
maxWorkflows: args.capacity,
payloadLimitMb: args.payloadMemory,
};

if (args.lightningPublicKey) {
Expand Down
Loading

0 comments on commit 7f14570

Please sign in to comment.