From 4d7b4401c56970dac3fade0bf9eb664fea422936 Mon Sep 17 00:00:00 2001 From: Remco Westerhoud Date: Tue, 10 May 2022 16:07:20 +0200 Subject: [PATCH] fix: use GrpcResponseMapper and GrpcRequestStore We now use the GrpcResponseMapper to map the valueBufferView to a proper rpc response. The sending of the responses has been relocated to the GrpcResponseWriter as it is the responsibility of this class. --- .../test/engine/GrpcResponseWriter.java | 63 +++++++++---------- .../test/engine/GrpcToLogStreamGateway.java | 10 --- 2 files changed, 28 insertions(+), 45 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseWriter.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseWriter.java index fee2b441..bc0f384c 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseWriter.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseWriter.java @@ -8,45 +8,17 @@ package io.camunda.zeebe.process.test.engine; import com.google.protobuf.GeneratedMessageV3; -import com.google.rpc.Code; import com.google.rpc.Status; import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivatedJob; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CompleteJobResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CreateProcessInstanceResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CreateProcessInstanceWithResultResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DecisionMetadata; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DecisionRequirementsMetadata; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DeployProcessResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DeployResourceResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DeployResourceResponse.Builder; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ProcessMetadata; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.PublishMessageResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ResolveIncidentResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.SetVariablesResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ThrowErrorResponse; -import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesResponse; -import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter; -import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord; -import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord; -import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord; -import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord; -import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord; -import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceResultRecord; -import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord; +import io.camunda.zeebe.process.test.engine.GatewayRequestStore.Request; import io.camunda.zeebe.protocol.record.RecordType; import io.camunda.zeebe.protocol.record.RejectionType; import io.camunda.zeebe.protocol.record.ValueType; import io.camunda.zeebe.protocol.record.intent.Intent; -import io.camunda.zeebe.protocol.record.intent.JobIntent; import io.camunda.zeebe.util.buffer.BufferUtil; import io.camunda.zeebe.util.buffer.BufferWriter; -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; +import io.grpc.protobuf.StatusProto; +import io.grpc.stub.StreamObserver; import org.agrona.DirectBuffer; import org.agrona.ExpandableArrayBuffer; import org.agrona.MutableDirectBuffer; @@ -58,6 +30,7 @@ class GrpcResponseWriter implements CommandResponseWriter { private static final DirectBuffer valueBufferView = new UnsafeBuffer(); private static Intent intent = Intent.UNKNOWN; final GrpcToLogStreamGateway gateway; + private final GatewayRequestStore gatewayRequestStore; private int partitionId = -1; private RecordType recordType = RecordType.NULL_VAL; private ValueType valueType = ValueType.NULL_VAL; @@ -66,8 +39,10 @@ class GrpcResponseWriter implements CommandResponseWriter { private final MutableDirectBuffer valueBuffer = new ExpandableArrayBuffer(); private final GrpcResponseMapper responseMapper = new GrpcResponseMapper(); - public GrpcResponseWriter(final GrpcToLogStreamGateway gateway) { + public GrpcResponseWriter( + final GrpcToLogStreamGateway gateway, final GatewayRequestStore gatewayRequestStore) { this.gateway = gateway; + this.gatewayRequestStore = gatewayRequestStore; } @Override @@ -122,16 +97,34 @@ public CommandResponseWriter valueWriter(final BufferWriter value) { @Override public boolean tryWriteResponse(final int requestStreamId, final long requestId) { if (rejectionType != RejectionType.NULL_VAL) { - final Status rejectionResponse = createRejectionResponse(); - gateway.errorCallback(requestId, rejectionResponse); + final Status rejectionResponse = + responseMapper.createRejectionResponse(rejectionType, intent, rejectionReason); + final Request request = gatewayRequestStore.removeRequest(requestId); + sendError(request, rejectionResponse); return true; } try { - gateway.responseCallback(requestId); + final Request request = gatewayRequestStore.removeRequest(requestId); + final GeneratedMessageV3 response = + responseMapper.map(request.requestType(), valueBufferView, key, intent); + sendResponse(request, response); return true; } catch (final Exception e) { throw new RuntimeException(e); } } + + private void sendResponse(final Request request, final GeneratedMessageV3 response) { + final StreamObserver streamObserver = + (StreamObserver) request.responseObserver(); + streamObserver.onNext(response); + streamObserver.onCompleted(); + } + + private void sendError(final Request request, final Status error) { + final StreamObserver streamObserver = + (StreamObserver) request.responseObserver(); + streamObserver.onError(StatusProto.toStatusException(error)); + } } diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java index 15c1181c..6df349ee 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java @@ -442,16 +442,6 @@ private ProcessInstanceCreationRecord createProcessInstanceCreationRecord( return processInstanceCreationRecord; } - public void responseCallback(final Long requestId) { - final ResponseSender responseSender = responseSenderMap.remove(requestId); - responseSender.sendResponse(); - } - - public void errorCallback(final Long requestId, final Status error) { - final ResponseSender responseSender = responseSenderMap.remove(requestId); - responseSender.sendError(error); - } - public String getAddress() { return "0.0.0.0:" + port; }