@akoutmos looking good - left some questions.
…act on running tasks
I added some preliminary tests for this functionality, updated the telemetry documentation (as there is now a telemetry event for failed tasks), and also refactored the consumer a bit so it can act upon Tasks that it has spawned under the supervisor. There are still a few more edge cases that I want to cover in the tests, so I will keep this in draft until I feel I have covered those code paths.
I think this one is at a good point to review now. I added the ability for consumer processes to wait on in-flight tasks under the Task.Supervisor before terminating and also added tests to validate the flow. I also added an additional telemetry event to report when Tasks fail. One question I have is whether we should expose an optional configuration for how long to wait on running Tasks. Currently I am just using the default value that is set for
@akoutmos I think yes, we have to let users choose the task timeout. I suspect users will have very different expectations on their message processor, depending on their business logic.
I'll add that tonight and leave the default as the yield_many value.
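To make the "wait on in-flight tasks before terminating" idea concrete, here is a minimal sketch built on `Task.yield_many/2`, whose default timeout is 5000 ms. The module and function names (`DrainSketch`, `await_running_tasks/2`) are hypothetical and are not taken from this PR; the real consumer code may be structured differently.

```elixir
# Hypothetical helper, not gen_rmq's actual implementation.
defmodule DrainSketch do
  # Waits up to `timeout` ms for the given tasks and kills any that are still
  # running afterwards. Must be called from the process that started the tasks.
  def await_running_tasks(tasks, timeout \\ 5_000) do
    tasks
    |> Task.yield_many(timeout)
    |> Enum.map(fn {task, result} ->
      # `result` is {:ok, value}, {:exit, reason}, or nil if the task is still running.
      result || Task.shutdown(task, :brutal_kill)
    end)
  end
end

{:ok, sup} = Task.Supervisor.start_link()

fast = Task.Supervisor.async_nolink(sup, fn -> :done end)
slow = Task.Supervisor.async_nolink(sup, fn -> Process.sleep(:infinity) end)

# The fast task yields its reply; the slow one is shut down after 100 ms.
results = DrainSketch.await_running_tasks([fast, slow], 100)
```

A user-configurable timeout would simply replace the `100` (or the `5_000` default) passed here.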
@akoutmos can I just say: You are awesome :) Great contributions in this PR (and elsewhere). Thanks so much!
Thanks for the kind words @spier! I appreciate it :)
Went ahead and added the
@akoutmos amazing job 🥇- I will reserve some time during the weekend to review it.
@@ -42,4 +42,9 @@ GenRMQ emits [Telemetry][telemetry] events for consumers. It currently exposes t
- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, reason: atom}`

- `[:gen_rmq, :consumer, :task, :error]` - Dispatched by a GenRMQ consumer when a supervised Task fails to process a message
👍
{:ok, state}
@doc false
@impl GenServer
def handle_continue(:init, state) do
Lovely!
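For readers unfamiliar with `handle_continue`, the pattern under review can be sketched as follows. This is an illustration only: `ContinueSketch`, `process/2`, and the state shape are hypothetical names, and the PR's consumer does considerably more work in its continuation.

```elixir
# Hypothetical module illustrating the init/handle_continue pattern (OTP 21+).
defmodule ContinueSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  # Hypothetical API: run `fun` in a task under the server's Task.Supervisor.
  def process(server, fun), do: GenServer.call(server, {:process, fun})

  @impl GenServer
  def init(_opts) do
    # Return immediately; the :init continuation runs before any mailbox message.
    {:ok, %{task_supervisor: nil}, {:continue, :init}}
  end

  @impl GenServer
  def handle_continue(:init, state) do
    {:ok, pid} = Task.Supervisor.start_link()
    {:noreply, %{state | task_supervisor: pid}}
  end

  @impl GenServer
  def handle_call({:process, fun}, _from, %{task_supervisor: sup} = state) do
    # The supervisor is guaranteed to exist here because the continuation ran first.
    {:ok, _pid} = Task.Supervisor.start_child(sup, fun)
    {:reply, :ok, state}
  end
end

{:ok, server} = ContinueSketch.start_link()
parent = self()
:ok = ContinueSketch.process(server, fn -> send(parent, :handled) end)
```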
lib/consumer.ex
Outdated
{:DOWN, ref, :process, _pid, reason},
%{module: module, config: config, running_tasks: running_tasks} = state
) do
if Map.has_key?(running_tasks, ref) do
What happens in this case to the message? Will it be rejected/nacked?
Good call... we would probably want some sort of configuration so the user can decide what happens to the message if the task fails. Perhaps we also provide the option to send the message to the dead-letter exchange. On that same note, we may also want some configuration around how long the Tasks take to complete. Currently it is set to the `async_nolink` value of 5 seconds. Thoughts?
Maybe you should be able to set a callback to deal with the message? But I think the sensible default here would be to nack the message.
I like the callback, something like `handle_error(reason, state)`. Then in this callback the user has all the power to ack, reject or requeue.
btw, are we capturing here also task timeouts?
My latest commit adds the ability to configure a timeout for tasks. It was a little tricky to add given that Task.Supervisor does not provide that functionality for `async_nolink`, but I think I have a good implementation in place to handle it. I misspoke earlier regarding the 5 second timeout... that is for `async_stream` and `async_stream_nolink`.
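One way to bolt a timeout onto `async_nolink` tasks is to pair each task with a timer started via `Process.send_after/3` and to terminate the task if the timer fires first. The sketch below shows that general technique only; it is not the implementation from this PR, and every name in it (`TimeoutSketch`, `run/2`, the `:task_timeout` message, the state keys) is hypothetical.

```elixir
# Hypothetical sketch: per-task timeouts on top of Task.Supervisor.async_nolink/2.
defmodule TimeoutSketch do
  use GenServer

  def start_link(timeout_ms), do: GenServer.start_link(__MODULE__, timeout_ms)

  # Runs `fun` in a supervised task; the caller later receives
  # {:ok, result}, {:error, :timeout}, or {:error, exit_reason}.
  def run(server, fun), do: GenServer.cast(server, {:run, fun, self()})

  @impl GenServer
  def init(timeout_ms) do
    {:ok, sup} = Task.Supervisor.start_link()
    {:ok, %{sup: sup, timeout_ms: timeout_ms, running: %{}}}
  end

  @impl GenServer
  def handle_cast({:run, fun, caller}, state) do
    task = Task.Supervisor.async_nolink(state.sup, fun)
    timer = Process.send_after(self(), {:task_timeout, task.ref}, state.timeout_ms)
    entry = %{task: task, timer: timer, caller: caller}
    {:noreply, put_in(state.running[task.ref], entry)}
  end

  @impl GenServer
  # The task replied before the timer fired.
  def handle_info({ref, result}, state) when is_reference(ref),
    do: finish(state, ref, {:ok, result})

  # The timer fired first.
  def handle_info({:task_timeout, ref}, state),
    do: finish(state, ref, {:error, :timeout})

  # The task crashed before replying.
  def handle_info({:DOWN, ref, :process, _pid, reason}, state),
    do: finish(state, ref, {:error, reason})

  def handle_info(_other, state), do: {:noreply, state}

  defp finish(state, ref, outcome) do
    case Map.pop(state.running, ref) do
      # Late or duplicate message for a task that was already handled.
      {nil, _running} ->
        {:noreply, state}

      {entry, running} ->
        Process.cancel_timer(entry.timer)
        Process.demonitor(ref, [:flush])

        if outcome == {:error, :timeout} do
          Task.Supervisor.terminate_child(state.sup, entry.task.pid)
        end

        send(entry.caller, outcome)
        {:noreply, %{state | running: running}}
    end
  end
end

{:ok, server} = TimeoutSketch.start_link(50)
TimeoutSketch.run(server, fn -> :fast end)
TimeoutSketch.run(server, fn -> Process.sleep(:infinity) end)
```

Keying the bookkeeping map by the task's monitor reference means the reply, the `:DOWN` message, and the timer message can all be resolved with the same lookup, and stale messages are ignored.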
As for a custom callback to handle the failure, are you thinking about rewriting the consumer module to a macro so that we can leverage `defoverridable` if the user does not want to use the default error handler? Or would `handle_error` be a required behaviour callback that needs to be implemented by the user?
@akoutmos I would prefer to avoid rewriting consumer module to a macro at this stage (just to limit the scope of changes).
Since these changes will be released as a major version, requiring error callback to be implemented should be fine, right? @vorce what do you think?
We could also consider skipping error callback and just reject the messages. The problem here is that users might skip dead-letter configuration for their consumers.
Sounds good. I'll add an additional callback `handle_error` that will be triggered on task exception or task timeout, along with an example or two. I agree that this is probably the way to go so that gen_rmq does not impose any assumptions upon the user. They can deal with the error in the way that best fits their needs.
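As a rough illustration of the required-callback approach discussed here, the sketch below defines a behaviour with a mandatory `handle_error` callback and invokes it on task crash or timeout. All names and the `handle_error(message, reason)` signature are assumptions made for the example; the callback that ships in gen_rmq may take different arguments, and this sketch dispatches synchronously for brevity whereas the consumer handles tasks asynchronously.

```elixir
# Hypothetical behaviour; not gen_rmq's actual callback definitions.
defmodule ErrorCallbackSketch do
  @callback handle_message(message :: term()) :: :ok
  @callback handle_error(message :: term(), reason :: atom()) :: :ok

  # Runs the handler in a supervised task and calls handle_error/2
  # if the task crashes (:error) or exceeds `timeout` ms (:timeout).
  def dispatch(module, sup, message, timeout \\ 5_000) do
    task = Task.Supervisor.async_nolink(sup, fn -> module.handle_message(message) end)

    case Task.yield(task, timeout) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> result
      {:exit, _reason} -> module.handle_error(message, :error)
      nil -> module.handle_error(message, :timeout)
    end
  end
end

defmodule RejectingConsumer do
  @behaviour ErrorCallbackSketch

  @impl true
  def handle_message(:bad), do: raise("boom")
  def handle_message(:slow), do: Process.sleep(:infinity)
  def handle_message(_message), do: :ok

  @impl true
  def handle_error(message, reason) do
    # A real consumer would ack, reject, or requeue the message here.
    send(self(), {:rejected, message, reason})
    :ok
  end
end

{:ok, sup} = Task.Supervisor.start_link()

good = ErrorCallbackSketch.dispatch(RejectingConsumer, sup, :good)
bad = ErrorCallbackSketch.dispatch(RejectingConsumer, sup, :bad)
slow = ErrorCallbackSketch.dispatch(RejectingConsumer, sup, :slow, 50)
```

Making the callback required, rather than supplying a default through `defoverridable`, forces each consumer to decide explicitly what happens to a failed message.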
Awesome, and thanks again for all the work @akoutmos
Another thing that I thought about today was that there is no way to throttle the consumer. With how it currently is, it can keep on consuming messages and spawning new tasks under the supervisor. Thoughts on adding an additional configuration to have the user set the number of concurrent tasks running?
@akoutmos good point! I think we can introduce that later if it turns out to be a problem for some users. The current/master implementation has the same problem.
I added the
lib/consumer.ex
Outdated
`reason` - atom denoting the type of error

## Examples:
To reject the message message that caused the Task to fail you can do something like so:
double message
@@ -0,0 +1,25 @@
defmodule GenRMQ.MessageTask do
Good idea to wrap it like this 👍
@akoutmos error handling looks good 👍
I wrapped up the last of the tests and added
@akoutmos great, thank you 🙏 I plan to merge it today/tomorrow.
Updated some docs and made some minor error handling tweaks. Nothing else pops out at me for clean up. All good on this side
🎉
@mkorszun just curious, what is the release process for the library? I remember you mentioning that we would have to release a new major(?) version for this?
@spier we need a major release because we have dropped support for some older versions of the OTP. Release process will involve:
|
@mkorszun Thanks for the writeup. I am going through the steps, to understand them better. I might write a new issue to document the process for ourselves, and additional admins in the future. I will try to review what you do for this 3.0.0 release and then try myself on writing this up if that is ok? Notes
(unrelated) 30k partyBtw I realized that we are at |
@spier sounds good. Your notes are already quite precise :) regarding |
This PR is an initial attempt at addressing issue #46. Still in the process of adding tests and solidifying the implementation, so leaving the PR in draft until complete (also looking for implementation feedback).
Description
Originally, the `GenRMQ.Consumer` module would call `spawn/1` to asynchronously execute a message handler. This change instead leverages `Task.Supervisor` to execute message processing handlers. This should be a transparent change to the end user as this does not impact how `GenRMQ.Consumer` concurrently executes message handlers.

This introduces the usage of OTP 21's `handle_continue` to ensure that the `Task.Supervisor` is started before any mailbox messages are processed. As a result, gen_rmq would no longer be compatible with OTP releases prior to 21.

Checklist