Skip to content

Commit

Permalink
merge: #967
Browse files Browse the repository at this point in the history
967: [Backport stable/8.3] Properly support Signal broadcasting r=github-actions[bot] a=backport-action

# Description
Backport of #966 to `stable/8.3`.

relates to #964

Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud authored Nov 9, 2023
2 parents 978246c + 9529fae commit f7e3ee8
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivatedJob;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CompleteJobRequest;
Expand Down Expand Up @@ -59,6 +61,7 @@
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceResultRecord;
import io.camunda.zeebe.protocol.impl.record.value.signal.SignalRecord;
import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;
Expand Down Expand Up @@ -93,7 +96,8 @@ class GrpcResponseMapper {
Map.entry(ResolveIncidentRequest.class, this::createResolveIncidentResponse),
Map.entry(SetVariablesRequest.class, this::createSetVariablesResponse),
Map.entry(UpdateJobRetriesRequest.class, this::createJobUpdateRetriesResponse),
Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse));
Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse),
Map.entry(BroadcastSignalRequest.class, this::createBroadcastSignalResponse));

GeneratedMessageV3 map(
final Class<? extends GeneratedMessageV3> requestType,
Expand Down Expand Up @@ -287,6 +291,13 @@ private GeneratedMessageV3 createModifyProcessInstanceResponse() {
return ModifyProcessInstanceResponse.newBuilder().build();
}

private GeneratedMessageV3 createBroadcastSignalResponse() {
final SignalRecord signal = new SignalRecord();
signal.wrap(valueBufferView);

return BroadcastSignalResponse.newBuilder().setKey(key).build();
}

private GeneratedMessageV3 createResolveIncidentResponse() {
final IncidentRecord incident = new IncidentRecord();
incident.wrap(valueBufferView);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,15 @@ public void broadcastSignal(
final var requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

final SignalRecord command = new SignalRecord().setSignalName(request.getSignalName());

if (!request.getVariables().isEmpty()) {
command.setVariables(
BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(request.getVariables())));
}

writer.writeCommandWithoutKey(
new SignalRecord()
.setSignalName(request.getSignalName())
.setVariables(
BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(request.getVariables()))),
command,
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.SIGNAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.client.api.command.ClientException;
import io.camunda.zeebe.client.api.response.ActivateJobsResponse;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.response.BroadcastSignalResponse;
import io.camunda.zeebe.client.api.response.BrokerInfo;
import io.camunda.zeebe.client.api.response.DeploymentEvent;
import io.camunda.zeebe.client.api.response.EvaluateDecisionResponse;
Expand Down Expand Up @@ -1013,4 +1014,55 @@ void shouldDeployForm() {
assertThat(form.getFormKey()).isPositive();
assertThat(form.getFormId()).isEqualTo("Form_0w7r08e");
}

@Test
void shouldBroadcastSignal() {
// given
zeebeClient
.newDeployResourceCommand()
.addProcessModel(
Bpmn.createExecutableProcess("simpleProcess")
.startEvent()
.signal("signal")
.endEvent()
.done(),
"simpleProcess.bpmn")
.send()
.join();

// when
final BroadcastSignalResponse response =
zeebeClient.newBroadcastSignalCommand().signalName("signal").send().join();

// then
assertThat(response.getKey()).isPositive();
}

@Test
void shouldBroadcastSignalWithVariables() {
// given
zeebeClient
.newDeployResourceCommand()
.addProcessModel(
Bpmn.createExecutableProcess("simpleProcess")
.startEvent()
.signal("signal")
.endEvent()
.done(),
"simpleProcess.bpmn")
.send()
.join();

// when
final BroadcastSignalResponse response =
zeebeClient
.newBroadcastSignalCommand()
.signalName("signal")
.variable("foo", "bar")
.send()
.join();

// then
assertThat(response.getKey()).isPositive();
}
}

0 comments on commit f7e3ee8

Please sign in to comment.