diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java index 3d965735..d6b017de 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java @@ -214,7 +214,7 @@ public void completeJob( final String variables = request.getVariables(); if (!variables.isEmpty()) { - jobRecord.setVariables(BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); + jobRecord.setVariables(convertVariablesToMessagePack(variables)); } writer.writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata); @@ -371,6 +371,11 @@ public void throwError( jobRecord.setErrorCode(BufferUtil.wrapString(request.getErrorCode())); jobRecord.setErrorMessage(request.getErrorMessage()); + final String variables = request.getVariables(); + if (!variables.isEmpty()) { + jobRecord.setVariables(convertVariablesToMessagePack(variables)); + } + writer.writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata); } @@ -395,8 +400,7 @@ public void publishMessage( messageRecord.setTimeToLive(request.getTimeToLive()); final String variables = request.getVariables(); if (!variables.isEmpty()) { - messageRecord.setVariables( - BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); + messageRecord.setVariables(convertVariablesToMessagePack(variables)); } writer.writeCommandWithoutKey(messageRecord, recordMetadata); @@ -437,8 +441,7 @@ public void setVariables( final String variables = request.getVariables(); if (!variables.isEmpty()) { - variableDocumentRecord.setVariables( - BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); + variableDocumentRecord.setVariables(convertVariablesToMessagePack(variables)); } variableDocumentRecord.setScopeKey(request.getElementInstanceKey()); @@ -591,8 +594,7 @@ public void broadcastSignal( final SignalRecord command = new SignalRecord().setSignalName(request.getSignalName()); if (!request.getVariables().isEmpty()) { - command.setVariables( - BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(request.getVariables()))); + command.setVariables(convertVariablesToMessagePack(request.getVariables())); } writer.writeCommandWithoutKey( @@ -624,9 +626,7 @@ final var record = new ProcessInstanceModificationRecord(); instruction.addVariableInstruction( new ProcessInstanceModificationVariableInstruction() .setElementId(variable.getScopeId()) - .setVariables( - BufferUtil.wrapArray( - MsgPackConverter.convertToMsgPack(variable.getVariables())))); + .setVariables(convertVariablesToMessagePack(variable.getVariables()))); } record.addActivateInstruction(instruction); @@ -663,8 +663,7 @@ private ProcessInstanceCreationRecord createProcessInstanceCreationRecord( final String variables = request.getVariables(); if (!variables.isEmpty()) { - processInstanceCreationRecord.setVariables( - BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); + processInstanceCreationRecord.setVariables(convertVariablesToMessagePack(variables)); } return processInstanceCreationRecord; } @@ -681,12 +680,16 @@ private DecisionEvaluationRecord createDecisionEvaluationRecord( final String variables = request.getVariables(); if (!variables.isEmpty()) { - record.setVariables(BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); + record.setVariables(convertVariablesToMessagePack(variables)); } return record; } + private static DirectBuffer convertVariablesToMessagePack(final String variables) { + return BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables)); + } + public String getAddress() { return "0.0.0.0:" + port; } diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java index 15c354f4..b8078c5e 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java @@ -772,6 +772,7 @@ void shouldThrowErrorOnJob() { .serviceTask("task", (task) -> task.zeebeJobType("jobType")) .boundaryEvent("error") .error("0xCAFE") + .zeebeOutputExpression("error_var", "error_var") .endEvent() .done(), "simpleProcess.bpmn") @@ -782,10 +783,10 @@ void shouldThrowErrorOnJob() { .newCreateInstanceCommand() .bpmnProcessId("simpleProcess") .latestVersion() - .variables(Map.of("test", 1)) .send() .join(); + // when Awaitility.await() .untilAsserted( () -> { @@ -796,7 +797,6 @@ void shouldThrowErrorOnJob() { .maxJobsToActivate(32) .timeout(Duration.ofMinutes(1)) .workerName("yolo") - .fetchVariables(List.of("test")) .send() .join(); @@ -804,34 +804,50 @@ void shouldThrowErrorOnJob() { assertThat(jobs).isNotEmpty(); final ActivatedJob job = jobs.get(0); - // when - then zeebeClient .newThrowErrorCommand(job.getKey()) .errorCode("0xCAFE") .errorMessage("What just happened.") + .variable("error_var", "Out of coffee") .send() .join(); + }); - Awaitility.await() - .untilAsserted( - () -> { - final Optional> boundaryEvent = - StreamSupport.stream( - RecordStream.of(zeebeEngine.getRecordStreamSource()) - .processInstanceRecords() - .spliterator(), - false) - .filter( - r -> r.getIntent() == ProcessInstanceIntent.ELEMENT_COMPLETED) - .filter( - r -> - r.getValue().getBpmnElementType() - == BpmnElementType.BOUNDARY_EVENT) - .filter(r -> r.getValue().getBpmnEventType() == BpmnEventType.ERROR) - .findFirst(); + // then + Awaitility.await() + .untilAsserted( + () -> { + final Optional> boundaryEvent = + StreamSupport.stream( + RecordStream.of(zeebeEngine.getRecordStreamSource()) + .processInstanceRecords() + .spliterator(), + false) + .filter(r -> r.getIntent() == ProcessInstanceIntent.ELEMENT_COMPLETED) + .filter( + r -> r.getValue().getBpmnElementType() == BpmnElementType.BOUNDARY_EVENT) + .filter(r -> r.getValue().getBpmnEventType() == BpmnEventType.ERROR) + .findFirst(); - assertThat(boundaryEvent).isNotEmpty(); - }); + assertThat(boundaryEvent) + .describedAs("Expect that the error boundary event is completed") + .isNotEmpty(); + + final var errorVariable = + StreamSupport.stream( + RecordStream.of(zeebeEngine.getRecordStreamSource()) + .variableRecords() + .spliterator(), + false) + .filter(r -> r.getValue().getName().equals("error_var")) + .findFirst(); + + assertThat(errorVariable) + .describedAs("Expect that the error variable is set") + .isPresent() + .hasValueSatisfying( + record -> + assertThat(record.getValue().getValue()).isEqualTo("\"Out of coffee\"")); }); }