diff --git a/README.md b/README.md index ae9b65c3..4f8277df 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ and provide you with a set of assertions you can use to verify your process beha ### Dependency -Zeepe Process Test provides you with two dependencies. Which one you need to use is dependent on the +Zeebe Process Test provides you with two dependencies. Which one you need to use is dependent on the Java version you are using. #### Embedded (JDK 21+) diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java index f15c758d..3d965735 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java @@ -102,6 +102,7 @@ import io.camunda.zeebe.util.buffer.BufferUtil; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; @@ -151,6 +152,7 @@ public void activateJobs( jobBatchRecord.setWorker(request.getWorker()); jobBatchRecord.setTimeout(request.getTimeout()); jobBatchRecord.setMaxJobsToActivate(request.getMaxJobsToActivate()); + setJobBatchRecordVariables(jobBatchRecord, request.getFetchVariableList()); writer.writeCommandWithoutKey(jobBatchRecord, recordMetadata); } @@ -601,6 +603,14 @@ public void broadcastSignal( .intent(SignalIntent.BROADCAST)); } + private void setJobBatchRecordVariables( + final JobBatchRecord jobBatchRecord, final List fetchVariables) { + final ValueArray variables = jobBatchRecord.variables(); + fetchVariables.stream() + .map(BufferUtil::wrapString) + .forEach(buffer -> variables.add().wrap(buffer)); + } + private ProcessInstanceModificationRecord createProcessInstanceModificationRecord( final ModifyProcessInstanceRequest request) { final var record = new ProcessInstanceModificationRecord(); diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java index fcf962e8..15c354f4 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java @@ -896,6 +896,58 @@ void shouldUpdateRetiresOnJob() { }); } + @Test + void shouldReturnOnlySpecifiedVariablesOnJobActivation() { + // given + zeebeClient + .newDeployResourceCommand() + .addProcessModel( + Bpmn.createExecutableProcess("simpleProcess") + .startEvent() + .serviceTask("task", (task) -> task.zeebeJobType("jobType")) + .endEvent() + .done(), + "simpleProcess.bpmn") + .send() + .join(); + + zeebeClient + .newCreateInstanceCommand() + .bpmnProcessId("simpleProcess") + .latestVersion() + .variables( + Map.ofEntries( + Map.entry("var_a", "val_a"), + Map.entry("var_b", "val_b"), + Map.entry("var_c", "val_c"), + Map.entry("var_d", "val_d"))) + .send() + .join(); + + Awaitility.await() + .untilAsserted( + () -> { + // when + final ActivateJobsResponse activateJobsResponse = + zeebeClient + .newActivateJobsCommand() + .jobType("jobType") + .maxJobsToActivate(32) + .timeout(Duration.ofMinutes(1)) + .workerName("yolo") + .fetchVariables(List.of("var_b", "var_d")) + .send() + .join(); + + // then + final List jobs = activateJobsResponse.getJobs(); + assertThat(jobs) + .first() + .extracting(ActivatedJob::getVariablesAsMap) + .isEqualTo(Map.of("var_b", "val_b", "var_d", "val_d")); + }); + } + @Test void shouldUpdateDeadlineOnJob() { // given