From fba5443eefb19d653ed83eb5d2fa06a7373bfde9 Mon Sep 17 00:00:00 2001 From: Josh Wulf Date: Fri, 7 Jun 2024 22:03:10 +1200 Subject: [PATCH] feat(zeebe): add multi-tenant support to workers adds tenantIds: string[] to stream and polling worker config fixes #171 --- src/__tests__/config/jest.cleanup.ts | 44 ++++--- .../multi-tenant-stream-worker-test.bpmn | 48 ++++++++ .../testdata/multi-tenant-worker-test.bpmn | 48 ++++++++ .../multitenant-worker-mt.spec.ts | 110 ++++++++++++++++++ src/zeebe/lib/ZBStreamWorker.ts | 7 -- src/zeebe/lib/ZBWorkerBase.ts | 8 +- src/zeebe/lib/interfaces-1.0.ts | 8 ++ src/zeebe/zb/ZeebeGrpcClient.ts | 15 ++- 8 files changed, 260 insertions(+), 28 deletions(-) create mode 100644 src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn create mode 100644 src/__tests__/testdata/multi-tenant-worker-test.bpmn create mode 100644 src/__tests__/zeebe/multitenancy/multitenant-worker-mt.spec.ts diff --git a/src/__tests__/config/jest.cleanup.ts b/src/__tests__/config/jest.cleanup.ts index 780ccf8a..2bdfb119 100644 --- a/src/__tests__/config/jest.cleanup.ts +++ b/src/__tests__/config/jest.cleanup.ts @@ -35,7 +35,6 @@ export const cleanUp = async () => { const processIds = (bpmn as any[]).map( (b) => b?.['bpmn:definitions']?.['bpmn:process']?.['@_id'] ) - const operate = new OperateApiClient() const zeebe = new ZeebeGrpcClient({ config: { zeebeGrpcSettings: { ZEEBE_CLIENT_LOG_LEVEL: 'NONE' }, @@ -43,24 +42,37 @@ export const cleanUp = async () => { }) for (const id of processIds) { if (id) { - const res = await operate.searchProcessInstances({ - filter: { bpmnProcessId: id, state: 'ACTIVE' }, - }) - const instancesKeys = res.items.map((instance) => instance.key) - if (instancesKeys.length > 0) { - console.log(`Cancelling ${instancesKeys.length} instances for ${id}`) - } - for (const key of instancesKeys) { - try { - await zeebe.cancelProcessInstance(key) - console.log(`Cancelled process instance ${key}`) - } catch (e) { - console.log('Failed to cancel process instance', key) - console.log((e as Error).message) + // Are we running in a multi-tenant environment? + const multiTenant = !!process.env.CAMUNDA_TENANT_ID + const tenantIds = multiTenant + ? ['', 'red', 'green'] + : [undefined] + for (const tenantId of tenantIds) { + const operate = new OperateApiClient({ + config: { + CAMUNDA_TENANT_ID: tenantId, + }, + }) + const res = await operate.searchProcessInstances({ + filter: { bpmnProcessId: id, state: 'ACTIVE' }, + }) + const instancesKeys = res.items.map((instance) => instance.key) + if (instancesKeys.length > 0) { console.log( - `Don't worry about it - Operate is eventually consistent.` + `Cancelling ${instancesKeys.length} instances for ${id} in tenant '${tenantId}'...` ) } + for (const key of instancesKeys) { + try { + await zeebe.cancelProcessInstance(key) + console.log(`Cancelled process instance ${key}`) + } catch (e) { + if (!(e as Error).message.startsWith('5 NOT_FOUND')) { + console.log('Failed to cancel process instance', key) + console.log((e as Error).message) + } + } + } } } } diff --git a/src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn b/src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn new file mode 100644 index 00000000..86b6df12 --- /dev/null +++ b/src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn @@ -0,0 +1,48 @@ + + + + + Flow_0r8p543 + + + + Flow_08wm3o9 + + + + + + + Flow_0r8p543 + Flow_08wm3o9 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/__tests__/testdata/multi-tenant-worker-test.bpmn b/src/__tests__/testdata/multi-tenant-worker-test.bpmn new file mode 100644 index 00000000..906b834e --- /dev/null +++ b/src/__tests__/testdata/multi-tenant-worker-test.bpmn @@ -0,0 +1,48 @@ + + + + + Flow_0r8p543 + + + + Flow_08wm3o9 + + + + + + + Flow_0r8p543 + Flow_08wm3o9 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/__tests__/zeebe/multitenancy/multitenant-worker-mt.spec.ts b/src/__tests__/zeebe/multitenancy/multitenant-worker-mt.spec.ts new file mode 100644 index 00000000..ab15b736 --- /dev/null +++ b/src/__tests__/zeebe/multitenancy/multitenant-worker-mt.spec.ts @@ -0,0 +1,110 @@ +import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib' +import { ZeebeGrpcClient } from '../../../zeebe/index' + +jest.setTimeout(10000) + +beforeAll(() => { + suppressZeebeLogging() +}) + +afterAll(() => { + restoreZeebeLogging() +}) + +test('A worker can be multi-tenant', async () => { + const client = new ZeebeGrpcClient() + + await client.deployResource({ + processFilename: './src/__tests__/testdata/multi-tenant-worker-test.bpmn', + tenantId: '', + }) + + await client.deployResource({ + processFilename: './src/__tests__/testdata/multi-tenant-worker-test.bpmn', + tenantId: 'green', + }) + + await client.createProcessInstance({ + bpmnProcessId: 'multi-tenant-worker-test', + variables: { foo: 'bar' }, + tenantId: '', + }) + + await client.createProcessInstance({ + bpmnProcessId: 'multi-tenant-worker-test', + variables: { foo: 'bar' }, + tenantId: 'green', + }) + + let greenTenant = false, + defaultTenant = false + await new Promise((resolve) => + client.createWorker({ + taskHandler: (job) => { + greenTenant = greenTenant || job.tenantId === 'green' + defaultTenant = defaultTenant || job.tenantId === '' + if (greenTenant && defaultTenant) { + resolve(null) + } + return job.complete() + }, + taskType: 'multi-tenant-work', + tenantIds: ['', 'green'], + }) + ) + + await client.close() +}) + +test('A stream worker can be multi-tenant', async () => { + const client = new ZeebeGrpcClient() + + await client.deployResource({ + processFilename: + './src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn', + tenantId: '', + }) + + await client.deployResource({ + processFilename: + './src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn', + tenantId: 'green', + }) + + let greenTenant = false, + defaultTenant = false + // eslint-disable-next-line no-async-promise-executor + await new Promise(async (resolve) => { + client.streamJobs({ + taskHandler: async (job) => { + greenTenant = greenTenant || job.tenantId === 'green' + defaultTenant = defaultTenant || job.tenantId === '' + const res = await job.complete() + if (greenTenant && defaultTenant) { + resolve(null) + } + return res + }, + type: 'multi-tenant-stream-work', + tenantIds: ['', 'green'], + worker: 'stream-worker', + timeout: 2000, + }) + + await new Promise((resolve) => setTimeout(resolve, 2000)) + + await client.createProcessInstance({ + bpmnProcessId: 'multi-tenant-stream-worker-test', + variables: { foo: 'bar' }, + tenantId: '', + }) + + await client.createProcessInstance({ + bpmnProcessId: 'multi-tenant-stream-worker-test', + variables: { foo: 'bar' }, + tenantId: 'green', + }) + }) + + await client.close() +}) diff --git a/src/zeebe/lib/ZBStreamWorker.ts b/src/zeebe/lib/ZBStreamWorker.ts index 9cd18a06..8c1a8fc4 100644 --- a/src/zeebe/lib/ZBStreamWorker.ts +++ b/src/zeebe/lib/ZBStreamWorker.ts @@ -65,13 +65,6 @@ export class ZBStreamWorker implements IZBJobWorker { stream.on('error', (e) => { console.error(e) }) - // stream.on('pause', () => console.log('paused')) - // stream.on('metadata', (m) => console.log(m)) - // stream.on('readable', () => console.log('readable')) - // stream.on('status', () => console.log('status')) - // stream.on('close', () => console.log('close')) - // stream.on('end', () => console.log('end')) - // stream.on('resume', (n) => console.log('resume', n)) stream.on('data', (res: ActivatedJob) => { // Make handlers const job: Job = diff --git a/src/zeebe/lib/ZBWorkerBase.ts b/src/zeebe/lib/ZBWorkerBase.ts index 45f181fe..57b6300d 100644 --- a/src/zeebe/lib/ZBWorkerBase.ts +++ b/src/zeebe/lib/ZBWorkerBase.ts @@ -56,6 +56,7 @@ export interface ZBWorkerConstructorConfig< inputVariableDto?: { new (...args: any[]): Readonly } // eslint-disable-next-line @typescript-eslint/no-explicit-any customHeadersDto?: { new (...args: any[]): Readonly } + tenantIds: string[] | [string] | undefined } export class ZBWorkerBase< @@ -101,7 +102,6 @@ export class ZBWorkerBase< private pollMutex: boolean = false private backPressureRetryCount: number = 0 private fetchVariable: (keyof WorkerInputVariables)[] | undefined - private tenantId?: string private inputVariableDto: { // eslint-disable-next-line @typescript-eslint/no-explicit-any new (obj: any): WorkerInputVariables @@ -110,6 +110,7 @@ export class ZBWorkerBase< // eslint-disable-next-line @typescript-eslint/no-explicit-any new (...args: any[]): CustomHeaderShape } + private tenantIds: string[] | [string] | undefined constructor({ grpcClient, @@ -121,6 +122,7 @@ export class ZBWorkerBase< zbClient, inputVariableDto, customHeadersDto, + tenantIds, }: ZBWorkerConstructorConfig< WorkerInputVariables, CustomHeaderShape, @@ -128,6 +130,7 @@ export class ZBWorkerBase< >) { super() options = options || {} + this.tenantIds = tenantIds if (!taskType) { throw new Error('Missing taskType') } @@ -146,7 +149,6 @@ export class ZBWorkerBase< // eslint-disable-next-line @typescript-eslint/no-explicit-any new (obj: any): CustomHeaderShape }) - this.tenantId = options.tenantId this.taskHandler = taskHandler this.taskType = taskType this.maxJobsToActivate = @@ -560,7 +562,7 @@ You should call only one job action method in the worker handler. This is a bug type: this.taskType, worker: this.id, fetchVariable: this.fetchVariable as string[], - tenantIds: this.tenantId ? [this.tenantId] : undefined, + tenantIds: this.tenantIds, } this.logger.logDebug( diff --git a/src/zeebe/lib/interfaces-1.0.ts b/src/zeebe/lib/interfaces-1.0.ts index b6924fef..31719fac 100644 --- a/src/zeebe/lib/interfaces-1.0.ts +++ b/src/zeebe/lib/interfaces-1.0.ts @@ -269,6 +269,10 @@ export interface Job< * All visible variables in the task scope, computed at activation time. */ readonly variables: Readonly + /** + * TenantId of the job in a multi-tenant cluster + */ + readonly tenantId: string } export interface ZBWorkerOptions { @@ -373,6 +377,10 @@ export interface ZBWorkerConfig< */ // eslint-disable-next-line @typescript-eslint/no-explicit-any customHeadersDto?: { new (...args: any[]): Readonly } + /** + * An optional array of tenantIds if you want this to be a multi-tenant worker. + */ + tenantIds?: string[] } export interface BroadcastSignalReq { diff --git a/src/zeebe/zb/ZeebeGrpcClient.ts b/src/zeebe/zb/ZeebeGrpcClient.ts index 1d3785e0..5ebfbe9c 100644 --- a/src/zeebe/zb/ZeebeGrpcClient.ts +++ b/src/zeebe/zb/ZeebeGrpcClient.ts @@ -444,6 +444,11 @@ export class ZeebeGrpcClient extends TypedEmitter< taskHandler: config.taskHandler, taskType: config.taskType, zbClient: this, + tenantIds: config.tenantIds + ? config.tenantIds + : this.tenantId + ? [this.tenantId] + : undefined, }) this.workers.push(worker) return worker @@ -1118,17 +1123,23 @@ export class ZeebeGrpcClient extends TypedEmitter< /** * * @description Create a worker that uses the StreamActivatedJobs RPC to activate jobs. + * **NOTE**: This will only stream jobs created *after* the worker is started. + * To activate existing jobs, use `activateJobs` or `createWorker`. * @example * ``` * const zbc = new ZB.ZeebeGrpcClient() * - * const zbWorker = zbc.streamJobs({ + * const zbStreamWorker = zbc.streamJobs({ * type: 'demo-service', * worker: 'my-worker-uuid', * taskHandler: myTaskHandler, * timeout: 30000 // 30 seconds * }) * + * .... + * // Close the worker stream when done + * zbStreamWorker.close() + * * // A job handler must return one of job.complete, job.fail, job.error, or job.forward * // Note: unhandled exceptions in the job handler cause the library to call job.fail * async function myTaskHandler(job) { @@ -1169,7 +1180,7 @@ export class ZeebeGrpcClient extends TypedEmitter< >( req: Pick< Grpc.StreamActivatedJobsRequest, - 'type' | 'worker' | 'timeout' + 'type' | 'worker' | 'timeout' | 'tenantIds' > & { inputVariableDto?: { // eslint-disable-next-line @typescript-eslint/no-explicit-any