-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make gRPC response sender thread-safe #358
Conversation
The GatewayRequestStore is a utility class that will be used by the GrpcToLogStreamGateway and the GrpcResponseWriter to share request between the two of them. The GrpcToLogStreamGateway will store new requests in here. The GrpcResponseWriter will take these requests out and write the correct response to the correct request. This store has to be thread-safe. Because of this we are using a ConcurrentHashMap as well as an AtomicLong
The single threaded executor was causing issues in combination with the GrpcResponseWriter. The response was send in a different thread from where the request was received. Since the writer reuses a DirectBuffer to store the record this could result in the buffer being modified by a thread, whilst the response was still being send in the single executor thread.
Incoming requests need to be stored in the new GatewayRequestStore so the GrpcResponseWriter will be able to access them.
The GrpcResponseWriter used to do all the mapping of the responses. This should not be the responsibility of this class. It should only be responsible for writing the responses to the correct requests. The mapping itself has now been extracted into a separate GrpcResponseMapper class.
We now use the GrpcResponseMapper to map the valueBufferView to a proper rpc response. The sending of the responses has been relocated to the GrpcResponseWriter as it is the responsibility of this class.
Creates the GatewayRequestStore and makes it available in the GrpcResponseWriter and the GrpcToLogStreamGateway.
Every command will now create its own RecordMetadata. This ensures the thread-safety of this class.
dc60102
to
51a917a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work @remcowesterhoud 👍
I have some questions, please take a look
engine/src/main/java/io/camunda/zeebe/process/test/engine/GatewayRequestStore.java
Show resolved
Hide resolved
executor.submit( | ||
() -> { | ||
final ResponseSender responseSender = responseSenderMap.remove(requestId); | ||
responseSender.sendResponse(); | ||
}); | ||
final ResponseSender responseSender = responseSenderMap.remove(requestId); | ||
responseSender.sendResponse(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ Do I understand these lines are the true fix of the problem? When the engine calls this callback method, then the response buffer has been filled, but it used to not be immediately executed. In the meantime the engine might overwrite the buffer and then call this method again, before the gateway's executor reads the buffer on line 520. So, by immediately executing this code the problem is removed.
❓ Is it really necessary to remove the executor usage from all the other parts? Receiving and handling new requests, could still be performed async and single-threaded right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct that is the main fix.
I'm honestly not sure about the second question. I guess it could but I haven't tried it. The executor makes everything more complex in my mind and causes a lot of confusion for me, so I was happy to delete it 😄
I don't see much gain in keeping it though. From what I can see the gateway doesn't have any "heavy" methods that would block the thread for a long time, causing a performance drop. If anything I'd love to not have to worry about any concurrency in the GrpcToLogStreamGateway
at all. As a matter of fact I think it's better to extract the command writing to a separate class to fully get rid of concurrency handling in this class, so I will do a small refactoring for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for explaining sounds good to me 👍
Add javadoc to shortly describe why there is a GatewayRequestStore
The GrpcToLogStreamGateway should not have to worry about any concurrency issues. By extracting the command writing to a separate class we have this logic isolated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
Thanks for the changes 🙇
Successfully created backport PR #361 for |
Description
This PR makes sure the request is received and responded to in the same thread. The classes have been made thread-safe where necessary. Please look at the issue and commit messages for a more detailed description.
The PR also includes some small refactoring to make the code easier to understand.
Related issues
closes #349
Definition of Done
Not all items need to be done depending on the issue and the pull request.
Code changes:
Testing:
Documentation: