Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(zeebe): add multi-tenant support to workers #175

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions src/__tests__/config/jest.cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,44 @@ export const cleanUp = async () => {
const processIds = (bpmn as any[]).map(
(b) => b?.['bpmn:definitions']?.['bpmn:process']?.['@_id']
)
const operate = new OperateApiClient()
const zeebe = new ZeebeGrpcClient({
config: {
zeebeGrpcSettings: { ZEEBE_CLIENT_LOG_LEVEL: 'NONE' },
},
})
for (const id of processIds) {
if (id) {
const res = await operate.searchProcessInstances({
filter: { bpmnProcessId: id, state: 'ACTIVE' },
})
const instancesKeys = res.items.map((instance) => instance.key)
if (instancesKeys.length > 0) {
console.log(`Cancelling ${instancesKeys.length} instances for ${id}`)
}
for (const key of instancesKeys) {
try {
await zeebe.cancelProcessInstance(key)
console.log(`Cancelled process instance ${key}`)
} catch (e) {
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
// Are we running in a multi-tenant environment?
const multiTenant = !!process.env.CAMUNDA_TENANT_ID
const tenantIds = multiTenant
? ['<default>', 'red', 'green']
: [undefined]
for (const tenantId of tenantIds) {
const operate = new OperateApiClient({
config: {
CAMUNDA_TENANT_ID: tenantId,
},
})
const res = await operate.searchProcessInstances({
filter: { bpmnProcessId: id, state: 'ACTIVE' },
})
const instancesKeys = res.items.map((instance) => instance.key)
if (instancesKeys.length > 0) {
console.log(
`Don't worry about it - Operate is eventually consistent.`
`Cancelling ${instancesKeys.length} instances for ${id} in tenant '${tenantId}'...`
)
}
for (const key of instancesKeys) {
try {
await zeebe.cancelProcessInstance(key)
console.log(`Cancelled process instance ${key}`)
} catch (e) {
if (!(e as Error).message.startsWith('5 NOT_FOUND')) {
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
}
}
}
}
}
}
Expand Down
48 changes: 48 additions & 0 deletions src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1o5c8zw" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="multi-tenant-stream-worker-test" name="Multi-tenant Stream Worker Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start multi-tenancy worker test">
<bpmn:outgoing>Flow_0r8p543</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0r8p543" sourceRef="StartEvent_1" targetRef="Activity_1an5aay" />
<bpmn:endEvent id="Event_1hylnf3" name="Multi-tenancy worker test complete">
<bpmn:incoming>Flow_08wm3o9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_08wm3o9" sourceRef="Activity_1an5aay" targetRef="Event_1hylnf3" />
<bpmn:serviceTask id="Activity_1an5aay" name="multi-tenant-stream-work">
<bpmn:extensionElements>
<zeebe:taskDefinition type="multi-tenant-stream-work" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0r8p543</bpmn:incoming>
<bpmn:outgoing>Flow_08wm3o9</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="multi-tenant-stream-worker-test">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="160" y="142" width="75" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hylnf3_di" bpmnElement="Event_1hylnf3">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="417" y="142" width="66" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0cx6d07_di" bpmnElement="Activity_1an5aay">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0r8p543_di" bpmnElement="Flow_0r8p543">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_08wm3o9_di" bpmnElement="Flow_08wm3o9">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
48 changes: 48 additions & 0 deletions src/__tests__/testdata/multi-tenant-worker-test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1o5c8zw" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="multi-tenant-worker-test" name="Multi-tenant Worker Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start multi-tenancy worker test">
<bpmn:outgoing>Flow_0r8p543</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0r8p543" sourceRef="StartEvent_1" targetRef="Activity_1an5aay" />
<bpmn:endEvent id="Event_1hylnf3" name="Multi-tenancy worker test complete">
<bpmn:incoming>Flow_08wm3o9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_08wm3o9" sourceRef="Activity_1an5aay" targetRef="Event_1hylnf3" />
<bpmn:serviceTask id="Activity_1an5aay" name="multi-tenant-work">
<bpmn:extensionElements>
<zeebe:taskDefinition type="multi-tenant-work" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0r8p543</bpmn:incoming>
<bpmn:outgoing>Flow_08wm3o9</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="multi-tenant-worker-test">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="160" y="142" width="75" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hylnf3_di" bpmnElement="Event_1hylnf3">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="417" y="142" width="66" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0cx6d07_di" bpmnElement="Activity_1an5aay">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0r8p543_di" bpmnElement="Flow_0r8p543">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_08wm3o9_di" bpmnElement="Flow_08wm3o9">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
110 changes: 110 additions & 0 deletions src/__tests__/zeebe/multitenancy/multitenant-worker-mt.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe/index'

jest.setTimeout(10000)

beforeAll(() => {
suppressZeebeLogging()
})

afterAll(() => {
restoreZeebeLogging()
})

test('A worker can be multi-tenant', async () => {
const client = new ZeebeGrpcClient()

await client.deployResource({
processFilename: './src/__tests__/testdata/multi-tenant-worker-test.bpmn',
tenantId: '<default>',
})

await client.deployResource({
processFilename: './src/__tests__/testdata/multi-tenant-worker-test.bpmn',
tenantId: 'green',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-worker-test',
variables: { foo: 'bar' },
tenantId: '<default>',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-worker-test',
variables: { foo: 'bar' },
tenantId: 'green',
})

let greenTenant = false,
defaultTenant = false
await new Promise((resolve) =>
client.createWorker({
taskHandler: (job) => {
greenTenant = greenTenant || job.tenantId === 'green'
defaultTenant = defaultTenant || job.tenantId === '<default>'
if (greenTenant && defaultTenant) {
resolve(null)
}
return job.complete()
},
taskType: 'multi-tenant-work',
tenantIds: ['<default>', 'green'],
})
)

await client.close()
})

test('A stream worker can be multi-tenant', async () => {
const client = new ZeebeGrpcClient()

await client.deployResource({
processFilename:
'./src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn',
tenantId: '<default>',
})

await client.deployResource({
processFilename:
'./src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn',
tenantId: 'green',
})

let greenTenant = false,
defaultTenant = false
// eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => {
client.streamJobs({
taskHandler: async (job) => {
greenTenant = greenTenant || job.tenantId === 'green'
defaultTenant = defaultTenant || job.tenantId === '<default>'
const res = await job.complete()
if (greenTenant && defaultTenant) {
resolve(null)
}
return res
},
type: 'multi-tenant-stream-work',
tenantIds: ['<default>', 'green'],
worker: 'stream-worker',
timeout: 2000,
})

await new Promise((resolve) => setTimeout(resolve, 2000))

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-stream-worker-test',
variables: { foo: 'bar' },
tenantId: '<default>',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-stream-worker-test',
variables: { foo: 'bar' },
tenantId: 'green',
})
})

await client.close()
})
7 changes: 0 additions & 7 deletions src/zeebe/lib/ZBStreamWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ export class ZBStreamWorker implements IZBJobWorker {
stream.on('error', (e) => {
console.error(e)
})
// stream.on('pause', () => console.log('paused'))
// stream.on('metadata', (m) => console.log(m))
// stream.on('readable', () => console.log('readable'))
// stream.on('status', () => console.log('status'))
// stream.on('close', () => console.log('close'))
// stream.on('end', () => console.log('end'))
// stream.on('resume', (n) => console.log('resume', n))
stream.on('data', (res: ActivatedJob) => {
// Make handlers
const job: Job<WorkerInputVariables, CustomHeaderShape> =
Expand Down
8 changes: 5 additions & 3 deletions src/zeebe/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface ZBWorkerConstructorConfig<
inputVariableDto?: { new (...args: any[]): Readonly<WorkerInputVariables> }
// eslint-disable-next-line @typescript-eslint/no-explicit-any
customHeadersDto?: { new (...args: any[]): Readonly<CustomHeaderShape> }
tenantIds: string[] | [string] | undefined
}

export class ZBWorkerBase<
Expand Down Expand Up @@ -101,7 +102,6 @@ export class ZBWorkerBase<
private pollMutex: boolean = false
private backPressureRetryCount: number = 0
private fetchVariable: (keyof WorkerInputVariables)[] | undefined
private tenantId?: string
private inputVariableDto: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (obj: any): WorkerInputVariables
Expand All @@ -110,6 +110,7 @@ export class ZBWorkerBase<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (...args: any[]): CustomHeaderShape
}
private tenantIds: string[] | [string] | undefined

constructor({
grpcClient,
Expand All @@ -121,13 +122,15 @@ export class ZBWorkerBase<
zbClient,
inputVariableDto,
customHeadersDto,
tenantIds,
}: ZBWorkerConstructorConfig<
WorkerInputVariables,
CustomHeaderShape,
WorkerOutputVariables
>) {
super()
options = options || {}
this.tenantIds = tenantIds
if (!taskType) {
throw new Error('Missing taskType')
}
Expand All @@ -146,7 +149,6 @@ export class ZBWorkerBase<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (obj: any): CustomHeaderShape
})
this.tenantId = options.tenantId
this.taskHandler = taskHandler
this.taskType = taskType
this.maxJobsToActivate =
Expand Down Expand Up @@ -560,7 +562,7 @@ You should call only one job action method in the worker handler. This is a bug
type: this.taskType,
worker: this.id,
fetchVariable: this.fetchVariable as string[],
tenantIds: this.tenantId ? [this.tenantId] : undefined,
tenantIds: this.tenantIds,
}

this.logger.logDebug(
Expand Down
8 changes: 8 additions & 0 deletions src/zeebe/lib/interfaces-1.0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ export interface Job<
* All visible variables in the task scope, computed at activation time.
*/
readonly variables: Readonly<Variables>
/**
* TenantId of the job in a multi-tenant cluster
*/
readonly tenantId: string
}

export interface ZBWorkerOptions<InputVars = IInputVariables> {
Expand Down Expand Up @@ -373,6 +377,10 @@ export interface ZBWorkerConfig<
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
customHeadersDto?: { new (...args: any[]): Readonly<CustomHeaderShape> }
/**
* An optional array of tenantIds if you want this to be a multi-tenant worker.
*/
tenantIds?: string[]
}

export interface BroadcastSignalReq {
Expand Down
Loading