Skip to content

Commit

Permalink
fix: use GrpcResponseMapper and GrpcRequestStore
Browse files Browse the repository at this point in the history
We now use the GrpcResponseMapper to map the valueBufferView to a proper rpc response. The sending of the responses has been relocated to the GrpcResponseWriter as it is the responsibility of this class.
  • Loading branch information
remcowesterhoud committed May 10, 2022
1 parent a6ff9d6 commit 4d7b440
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,17 @@
package io.camunda.zeebe.process.test.engine;

import com.google.protobuf.GeneratedMessageV3;
import com.google.rpc.Code;
import com.google.rpc.Status;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivatedJob;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CompleteJobResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CreateProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CreateProcessInstanceWithResultResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DecisionMetadata;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DecisionRequirementsMetadata;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DeployProcessResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DeployResourceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DeployResourceResponse.Builder;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ProcessMetadata;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.PublishMessageResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ResolveIncidentResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.SetVariablesResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ThrowErrorResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesResponse;
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceResultRecord;
import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord;
import io.camunda.zeebe.process.test.engine.GatewayRequestStore.Request;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.MutableDirectBuffer;
Expand All @@ -58,6 +30,7 @@ class GrpcResponseWriter implements CommandResponseWriter {
private static final DirectBuffer valueBufferView = new UnsafeBuffer();
private static Intent intent = Intent.UNKNOWN;
final GrpcToLogStreamGateway gateway;
private final GatewayRequestStore gatewayRequestStore;
private int partitionId = -1;
private RecordType recordType = RecordType.NULL_VAL;
private ValueType valueType = ValueType.NULL_VAL;
Expand All @@ -66,8 +39,10 @@ class GrpcResponseWriter implements CommandResponseWriter {
private final MutableDirectBuffer valueBuffer = new ExpandableArrayBuffer();
private final GrpcResponseMapper responseMapper = new GrpcResponseMapper();

public GrpcResponseWriter(final GrpcToLogStreamGateway gateway) {
public GrpcResponseWriter(
final GrpcToLogStreamGateway gateway, final GatewayRequestStore gatewayRequestStore) {
this.gateway = gateway;
this.gatewayRequestStore = gatewayRequestStore;
}

@Override
Expand Down Expand Up @@ -122,16 +97,34 @@ public CommandResponseWriter valueWriter(final BufferWriter value) {
@Override
public boolean tryWriteResponse(final int requestStreamId, final long requestId) {
if (rejectionType != RejectionType.NULL_VAL) {
final Status rejectionResponse = createRejectionResponse();
gateway.errorCallback(requestId, rejectionResponse);
final Status rejectionResponse =
responseMapper.createRejectionResponse(rejectionType, intent, rejectionReason);
final Request request = gatewayRequestStore.removeRequest(requestId);
sendError(request, rejectionResponse);
return true;
}

try {
gateway.responseCallback(requestId);
final Request request = gatewayRequestStore.removeRequest(requestId);
final GeneratedMessageV3 response =
responseMapper.map(request.requestType(), valueBufferView, key, intent);
sendResponse(request, response);
return true;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

private void sendResponse(final Request request, final GeneratedMessageV3 response) {
final StreamObserver<GeneratedMessageV3> streamObserver =
(StreamObserver<GeneratedMessageV3>) request.responseObserver();
streamObserver.onNext(response);
streamObserver.onCompleted();
}

private void sendError(final Request request, final Status error) {
final StreamObserver<GeneratedMessageV3> streamObserver =
(StreamObserver<GeneratedMessageV3>) request.responseObserver();
streamObserver.onError(StatusProto.toStatusException(error));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -442,16 +442,6 @@ private ProcessInstanceCreationRecord createProcessInstanceCreationRecord(
return processInstanceCreationRecord;
}

public void responseCallback(final Long requestId) {
final ResponseSender responseSender = responseSenderMap.remove(requestId);
responseSender.sendResponse();
}

public void errorCallback(final Long requestId, final Status error) {
final ResponseSender responseSender = responseSenderMap.remove(requestId);
responseSender.sendError(error);
}

public String getAddress() {
return "0.0.0.0:" + port;
}
Expand Down

0 comments on commit 4d7b440

Please sign in to comment.