From e7826354dc3c6f5f46ea89728cbcdc193adfc2ca Mon Sep 17 00:00:00 2001 From: Remco Westerhoud Date: Tue, 10 May 2022 15:40:06 +0200 Subject: [PATCH] fix: remove single threaded executor The single threaded executor was causing issues in combination with the GrpcResponseWriter. The response was send in a different thread from where the request was received. Since the writer reuses a DirectBuffer to store the record this could result in the buffer being modified by a thread, whilst the response was still being send in the single executor thread. --- .../test/engine/GrpcToLogStreamGateway.java | 455 ++++++++---------- .../process/test/engine/InMemoryEngine.java | 1 - 2 files changed, 193 insertions(+), 263 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java index 650ee6e6..15e49051 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java @@ -73,15 +73,11 @@ import io.grpc.stub.StreamObserver; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -class GrpcToLogStreamGateway extends GatewayGrpc.GatewayImplBase implements AutoCloseable { +class GrpcToLogStreamGateway extends GatewayGrpc.GatewayImplBase { private final LogStreamRecordWriter writer; - private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final Map responseSenderMap = new HashMap<>(); private final RecordMetadata recordMetadata = new RecordMetadata(); private final AtomicLong requestIdGenerator = new AtomicLong(); @@ -118,307 +114,262 @@ private void writeCommandWithoutKey( public void activateJobs( final ActivateJobsRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest(responseObserver, GrpcResponseWriter::createJobBatchResponse); + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createJobBatchResponse); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.JOB_BATCH) - .intent(JobBatchIntent.ACTIVATE); + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.JOB_BATCH) + .intent(JobBatchIntent.ACTIVATE); - final JobBatchRecord jobBatchRecord = new JobBatchRecord(); + final JobBatchRecord jobBatchRecord = new JobBatchRecord(); - jobBatchRecord.setType(request.getType()); - jobBatchRecord.setWorker(request.getWorker()); - jobBatchRecord.setTimeout(request.getTimeout()); - jobBatchRecord.setMaxJobsToActivate(request.getMaxJobsToActivate()); + jobBatchRecord.setType(request.getType()); + jobBatchRecord.setWorker(request.getWorker()); + jobBatchRecord.setTimeout(request.getTimeout()); + jobBatchRecord.setMaxJobsToActivate(request.getMaxJobsToActivate()); - writeCommandWithoutKey(recordMetadata, jobBatchRecord); - }); + writeCommandWithoutKey(recordMetadata, jobBatchRecord); } @Override public void cancelProcessInstance( final CancelProcessInstanceRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest( - responseObserver, GrpcResponseWriter::createCancelInstanceResponse); - - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.PROCESS_INSTANCE) - .intent(ProcessInstanceIntent.CANCEL); - - final ProcessInstanceRecord processInstanceRecord = new ProcessInstanceRecord(); - processInstanceRecord.setProcessInstanceKey(request.getProcessInstanceKey()); - - writeCommandWithKey( - request.getProcessInstanceKey(), recordMetadata, processInstanceRecord); - }); + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createCancelInstanceResponse); + + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.PROCESS_INSTANCE) + .intent(ProcessInstanceIntent.CANCEL); + + final ProcessInstanceRecord processInstanceRecord = new ProcessInstanceRecord(); + processInstanceRecord.setProcessInstanceKey(request.getProcessInstanceKey()); + + writeCommandWithKey(request.getProcessInstanceKey(), recordMetadata, processInstanceRecord); } @Override public void completeJob( final CompleteJobRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest(responseObserver, GrpcResponseWriter::createCompleteJobResponse); - - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.JOB) - .intent(JobIntent.COMPLETE); - - final JobRecord jobRecord = new JobRecord(); - - final String variables = request.getVariables(); - if (!variables.isEmpty()) { - jobRecord.setVariables( - BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); - } - - writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); - }); + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createCompleteJobResponse); + + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.JOB) + .intent(JobIntent.COMPLETE); + + final JobRecord jobRecord = new JobRecord(); + + final String variables = request.getVariables(); + if (!variables.isEmpty()) { + jobRecord.setVariables(BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); + } + + writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); } @Override public void createProcessInstance( final CreateProcessInstanceRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest( - responseObserver, GrpcResponseWriter::createProcessInstanceResponse); - - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.PROCESS_INSTANCE_CREATION) - .intent(ProcessInstanceCreationIntent.CREATE); - - final ProcessInstanceCreationRecord processInstanceCreationRecord = - createProcessInstanceCreationRecord(request); - writeCommandWithoutKey(recordMetadata, processInstanceCreationRecord); - }); + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createProcessInstanceResponse); + + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.PROCESS_INSTANCE_CREATION) + .intent(ProcessInstanceCreationIntent.CREATE); + + final ProcessInstanceCreationRecord processInstanceCreationRecord = + createProcessInstanceCreationRecord(request); + writeCommandWithoutKey(recordMetadata, processInstanceCreationRecord); } @Override public void createProcessInstanceWithResult( final CreateProcessInstanceWithResultRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest( - responseObserver, GrpcResponseWriter::createProcessInstanceWithResultResponse); - - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.PROCESS_INSTANCE_CREATION) - .intent(ProcessInstanceCreationIntent.CREATE_WITH_AWAITING_RESULT); - - final ProcessInstanceCreationRecord processInstanceCreationRecord = - createProcessInstanceCreationRecord(request.getRequest()); - processInstanceCreationRecord.setFetchVariables(request.getFetchVariablesList()); - - writeCommandWithoutKey(recordMetadata, processInstanceCreationRecord); - }); + final Long requestId = + registerNewRequest( + responseObserver, GrpcResponseWriter::createProcessInstanceWithResultResponse); + + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.PROCESS_INSTANCE_CREATION) + .intent(ProcessInstanceCreationIntent.CREATE_WITH_AWAITING_RESULT); + + final ProcessInstanceCreationRecord processInstanceCreationRecord = + createProcessInstanceCreationRecord(request.getRequest()); + processInstanceCreationRecord.setFetchVariables(request.getFetchVariablesList()); + + writeCommandWithoutKey(recordMetadata, processInstanceCreationRecord); } @Override public void deployProcess( final DeployProcessRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest(responseObserver, GrpcResponseWriter::createDeployResponse); - - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.DEPLOYMENT) - .intent(DeploymentIntent.CREATE); - - final DeploymentRecord deploymentRecord = new DeploymentRecord(); - final ValueArray resources = deploymentRecord.resources(); - - request - .getProcessesList() - .forEach( - (processRequestObject -> { - resources - .add() - .setResourceName(processRequestObject.getName()) - .setResource(processRequestObject.getDefinition().toByteArray()); - })); - - writeCommandWithoutKey(recordMetadata, deploymentRecord); - }); + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createDeployResponse); + + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.DEPLOYMENT) + .intent(DeploymentIntent.CREATE); + + final DeploymentRecord deploymentRecord = new DeploymentRecord(); + final ValueArray resources = deploymentRecord.resources(); + + request + .getProcessesList() + .forEach( + (processRequestObject -> { + resources + .add() + .setResourceName(processRequestObject.getName()) + .setResource(processRequestObject.getDefinition().toByteArray()); + })); + + writeCommandWithoutKey(recordMetadata, deploymentRecord); } @Override public void deployResource( final DeployResourceRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest( - responseObserver, GrpcResponseWriter::createDeployResourceResponse); - - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.DEPLOYMENT) - .intent(DeploymentIntent.CREATE); - - final DeploymentRecord deploymentRecord = new DeploymentRecord(); - final ValueArray resources = deploymentRecord.resources(); - - request - .getResourcesList() - .forEach( - (resource -> - resources - .add() - .setResourceName(resource.getName()) - .setResource(resource.getContent().toByteArray()))); - - writeCommandWithoutKey(recordMetadata, deploymentRecord); - }); + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createDeployResourceResponse); + + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.DEPLOYMENT) + .intent(DeploymentIntent.CREATE); + + final DeploymentRecord deploymentRecord = new DeploymentRecord(); + final ValueArray resources = deploymentRecord.resources(); + + request + .getResourcesList() + .forEach( + (resource -> + resources + .add() + .setResourceName(resource.getName()) + .setResource(resource.getContent().toByteArray()))); + + writeCommandWithoutKey(recordMetadata, deploymentRecord); } @Override public void failJob( final FailJobRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest(responseObserver, GrpcResponseWriter::createFailJobResponse); + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createFailJobResponse); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.JOB) - .intent(JobIntent.FAIL); + prepareRecordMetadata().requestId(requestId).valueType(ValueType.JOB).intent(JobIntent.FAIL); - final JobRecord jobRecord = new JobRecord(); + final JobRecord jobRecord = new JobRecord(); - jobRecord.setRetries(request.getRetries()); - jobRecord.setErrorMessage(request.getErrorMessage()); + jobRecord.setRetries(request.getRetries()); + jobRecord.setErrorMessage(request.getErrorMessage()); - writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); - }); + writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); } @Override public void throwError( final ThrowErrorRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest(responseObserver, GrpcResponseWriter::createJobThrowErrorResponse); + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createJobThrowErrorResponse); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.JOB) - .intent(JobIntent.THROW_ERROR); + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.JOB) + .intent(JobIntent.THROW_ERROR); - final JobRecord jobRecord = new JobRecord(); + final JobRecord jobRecord = new JobRecord(); - jobRecord.setErrorCode(BufferUtil.wrapString(request.getErrorCode())); - jobRecord.setErrorMessage(request.getErrorMessage()); + jobRecord.setErrorCode(BufferUtil.wrapString(request.getErrorCode())); + jobRecord.setErrorMessage(request.getErrorMessage()); - writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); - }); + writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); } @Override public void publishMessage( final PublishMessageRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest(responseObserver, GrpcResponseWriter::createMessageResponse); - - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.MESSAGE) - .intent(MessageIntent.PUBLISH); - - final MessageRecord messageRecord = new MessageRecord(); - - messageRecord.setCorrelationKey(request.getCorrelationKey()); - messageRecord.setMessageId(request.getMessageId()); - messageRecord.setName(request.getName()); - messageRecord.setTimeToLive(request.getTimeToLive()); - final String variables = request.getVariables(); - if (!variables.isEmpty()) { - messageRecord.setVariables( - BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); - } - - writeCommandWithoutKey(recordMetadata, messageRecord); - }); + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createMessageResponse); + + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.MESSAGE) + .intent(MessageIntent.PUBLISH); + + final MessageRecord messageRecord = new MessageRecord(); + + messageRecord.setCorrelationKey(request.getCorrelationKey()); + messageRecord.setMessageId(request.getMessageId()); + messageRecord.setName(request.getName()); + messageRecord.setTimeToLive(request.getTimeToLive()); + final String variables = request.getVariables(); + if (!variables.isEmpty()) { + messageRecord.setVariables( + BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); + } + + writeCommandWithoutKey(recordMetadata, messageRecord); } @Override public void resolveIncident( final ResolveIncidentRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest( - responseObserver, GrpcResponseWriter::createResolveIncidentResponse); + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createResolveIncidentResponse); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.INCIDENT) - .intent(IncidentIntent.RESOLVE); + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.INCIDENT) + .intent(IncidentIntent.RESOLVE); - final IncidentRecord incidentRecord = new IncidentRecord(); + final IncidentRecord incidentRecord = new IncidentRecord(); - writeCommandWithKey(request.getIncidentKey(), recordMetadata, incidentRecord); - }); + writeCommandWithKey(request.getIncidentKey(), recordMetadata, incidentRecord); } @Override public void setVariables( final SetVariablesRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest(responseObserver, GrpcResponseWriter::createSetVariablesResponse); - - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.VARIABLE_DOCUMENT) - .intent(VariableDocumentIntent.UPDATE); - - final VariableDocumentRecord variableDocumentRecord = new VariableDocumentRecord(); - - final String variables = request.getVariables(); - if (!variables.isEmpty()) { - variableDocumentRecord.setVariables( - BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); - } - - variableDocumentRecord.setScopeKey(request.getElementInstanceKey()); - variableDocumentRecord.setUpdateSemantics( - request.getLocal() - ? VariableDocumentUpdateSemantic.LOCAL - : VariableDocumentUpdateSemantic.PROPAGATE); - - writeCommandWithoutKey(recordMetadata, variableDocumentRecord); - }); + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createSetVariablesResponse); + + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.VARIABLE_DOCUMENT) + .intent(VariableDocumentIntent.UPDATE); + + final VariableDocumentRecord variableDocumentRecord = new VariableDocumentRecord(); + + final String variables = request.getVariables(); + if (!variables.isEmpty()) { + variableDocumentRecord.setVariables( + BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); + } + + variableDocumentRecord.setScopeKey(request.getElementInstanceKey()); + variableDocumentRecord.setUpdateSemantics( + request.getLocal() + ? VariableDocumentUpdateSemantic.LOCAL + : VariableDocumentUpdateSemantic.PROPAGATE); + + writeCommandWithoutKey(recordMetadata, variableDocumentRecord); } @Override @@ -456,32 +407,18 @@ public void topology( public void updateJobRetries( final UpdateJobRetriesRequest request, final StreamObserver responseObserver) { - executor.submit( - () -> { - final Long requestId = - registerNewRequest( - responseObserver, GrpcResponseWriter::createJobUpdateRetriesResponse); - - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.JOB) - .intent(JobIntent.UPDATE_RETRIES); - - final JobRecord jobRecord = new JobRecord(); - jobRecord.setRetries(request.getRetries()); - - writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); - }); - } + final Long requestId = + registerNewRequest(responseObserver, GrpcResponseWriter::createJobUpdateRetriesResponse); - @Override - public void close() { - try { - executor.shutdownNow(); - executor.awaitTermination(60, TimeUnit.SECONDS); - } catch (final InterruptedException ie) { - // TODO handle - } + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.JOB) + .intent(JobIntent.UPDATE_RETRIES); + + final JobRecord jobRecord = new JobRecord(); + jobRecord.setRetries(request.getRetries()); + + writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); } private RecordMetadata prepareRecordMetadata() { @@ -514,19 +451,13 @@ private ProcessInstanceCreationRecord createProcessInstanceCreationRecord( } public void responseCallback(final Long requestId) { - executor.submit( - () -> { - final ResponseSender responseSender = responseSenderMap.remove(requestId); - responseSender.sendResponse(); - }); + final ResponseSender responseSender = responseSenderMap.remove(requestId); + responseSender.sendResponse(); } public void errorCallback(final Long requestId, final Status error) { - executor.submit( - () -> { - final ResponseSender responseSender = responseSenderMap.remove(requestId); - responseSender.sendError(error); - }); + final ResponseSender responseSender = responseSenderMap.remove(requestId); + responseSender.sendError(error); } public String getAddress() { diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/InMemoryEngine.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/InMemoryEngine.java index 04d0ee36..a4ff3fe7 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/InMemoryEngine.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/InMemoryEngine.java @@ -77,7 +77,6 @@ public void stop() { try { grpcServer.shutdownNow(); grpcServer.awaitTermination(); - gateway.close(); streamProcessor.close(); database.close(); logStream.close();