Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Passing state to handle_setup #67

Merged
merged 6 commits into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions lib/rabbit/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Rabbit.Consumer do
`AMQP.Channel` and provide the following benefits:

* Durability during connection and channel failures through use of expotential backoff.
* Easy runtime setup through the `c:init/2` and `c:handle_setup/2` callbacks.
* Easy runtime setup through the `c:init/2` and `c:handle_setup/2` or `c:handle_setup/1` callbacks.
* Automatic acknowledgements based on the return value of the `c:handle_message/1` callback.
* Ability to handle exceptions through the `c:handle_error/1` callback.
* Each message is executed within its own supervised task.
Expand Down Expand Up @@ -99,6 +99,7 @@ defmodule Rabbit.Consumer do
| {:no_wait, boolean()}
| {:arguments, Keyword.t()}
| {:custom_meta, map()}
| {:setup_opts, setup_options()}
@type options :: [option()]
@type delivery_tag :: non_neg_integer()
@type action_options :: [{:multiple, boolean()} | {:requeue, boolean()}]
Expand All @@ -110,6 +111,7 @@ defmodule Rabbit.Consumer do
| {:reject, Rabbit.Message.t()}
| {:reject, Rabbit.Message.t(), action_options()}
| any()
@type setup_options :: keyword()

@doc """
A callback executed when the consumer is started.
Expand Down Expand Up @@ -143,6 +145,20 @@ defmodule Rabbit.Consumer do
"""
@callback handle_setup(channel :: AMQP.Channel.t(), queue :: String.t()) :: :ok | :error

@doc """
The same as `c:handle_setup/1` but called with the current state of the consumer.

Important keys from the state include:

* `:connection` - the `Rabbit.Connection` module in use.
* `:channel` - the `AMQP.Channel` open for this consumer.
* `:queue` - the queue name.
* `:setup_opts` - as provided to `start_link/3`.

Return either `:ok` or `{:ok, new_state}` for success, the latter will update the state.
"""
@callback handle_setup(state :: map) :: :ok | {:ok, new_state :: map()} | :error

@doc """
A callback executed to handle message consumption.

Expand Down Expand Up @@ -176,7 +192,7 @@ defmodule Rabbit.Consumer do
"""
@callback handle_error(message :: Rabbit.Message.t()) :: message_response()

@optional_callbacks handle_setup: 2
@optional_callbacks handle_setup: 1, handle_setup: 2

################################
# Public API
Expand Down Expand Up @@ -205,6 +221,7 @@ defmodule Rabbit.Consumer do
* `:arguments` - A set of arguments for the consumer.
* `:custom_meta` - A map of custom data that will be included in each `Rabbit.Message`
handled by the consumer.
* `:setup_opts` - A keyword list of custom options for use in `c:handle_setup/1`.

## Server Options

Expand Down
47 changes: 33 additions & 14 deletions lib/rabbit/consumer/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Rabbit.Consumer.Server do

@opts_schema %{
connection: [type: [:tuple, :pid, :atom], required: true],
queue: [type: :binary, required: true],
queue: [type: :binary, required: false],
prefetch_count: [type: :integer, default: 1],
prefetch_size: [type: :integer, default: 0],
consumer_tag: [type: :binary, default: ""],
Expand All @@ -19,7 +19,8 @@ defmodule Rabbit.Consumer.Server do
no_wait: [type: :boolean, default: false],
arguments: [type: :list, default: []],
timeout: [type: [:integer, :atom], required: false],
custom_meta: [type: :map, default: %{}]
custom_meta: [type: :map, default: %{}],
setup_opts: [type: :list, default: [], required: false]
}

@qos_opts [
Expand Down Expand Up @@ -90,6 +91,7 @@ defmodule Rabbit.Consumer.Server do
def handle_continue(:consume, state) do
case consume(state) do
{:ok, state} -> {:noreply, state}
{:error, :no_queue_given} -> {:stop, :no_queue_given, state}
{:error, state} -> {:noreply, state, {:continue, :restart_delay}}
end
end
Expand Down Expand Up @@ -168,7 +170,8 @@ defmodule Rabbit.Consumer.Server do
qos_opts: Keyword.take(opts, @qos_opts),
consume_opts: Keyword.take(opts, @consume_opts),
worker_opts: Keyword.take(opts, @worker_opts),
custom_meta: Keyword.get(opts, :custom_meta)
custom_meta: Keyword.get(opts, :custom_meta),
setup_opts: Keyword.get(opts, :setup_opts)
}
end

Expand Down Expand Up @@ -206,22 +209,38 @@ defmodule Rabbit.Consumer.Server do
defp handle_setup(%{setup_run: true} = state), do: {:ok, state}

defp handle_setup(state) do
if function_exported?(state.module, :handle_setup, 2) do
case state.module.handle_setup(state.channel, state.queue) do
:ok ->
state = %{state | setup_run: true}
{:ok, state}

error ->
log_error(state, error)
{:error, state}
result =
cond do
function_exported?(state.module, :handle_setup, 2) ->
state.module.handle_setup(state.channel, state.queue)

function_exported?(state.module, :handle_setup, 1) ->
state.module.handle_setup(state)

true ->
:skip
end
else
{:ok, state}

case result do
:ok ->
state = %{state | setup_run: true}
{:ok, state}

:skip ->
{:ok, state}

{:ok, state} ->
state = %{state | setup_run: true}
{:ok, state}

error ->
log_error(state, error)
{:error, state}
end
end

defp consume(%{consuming: true} = state), do: {:ok, state}
defp consume(%{queue: nil}), do: {:error, :no_queue_given}

defp consume(state) do
with :ok <- AMQP.Basic.qos(state.channel, state.qos_opts),
Expand Down
20 changes: 17 additions & 3 deletions lib/rabbit/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Rabbit.Producer do

* Durability during connection and channel failures through use of expotential backoff.
* Channel pooling for increased publishing performance.
* Easy runtime setup through an `c:init/2` and `c:handle_setup/1` callbacks.
* Easy runtime setup through an `c:init/2` and `c:handle_setup/1` or `c:handle_setup/2` callbacks.
* Simplification of standard publishing options.
* Automatic payload encoding based on available serializers and message
content type.
Expand Down Expand Up @@ -82,6 +82,7 @@ defmodule Rabbit.Producer do
| {:sync_start_delay, non_neg_integer()}
| {:sync_start_max, non_neg_integer()}
| {:publish_opts, publish_options()}
| {:setup_opts, setup_options()}
@type options :: [option()]
@type exchange :: String.t()
@type routing_key :: String.t()
Expand All @@ -103,6 +104,7 @@ defmodule Rabbit.Producer do
| {:user_id, String.t()}
| {:app_id, String.t()}
@type publish_options :: [publish_option()]
@type setup_options :: keyword()

@doc """
A callback executed by each component of the producer pool.
Expand Down Expand Up @@ -148,7 +150,18 @@ defmodule Rabbit.Producer do
"""
@callback handle_setup(channel :: AMQP.Channel.t()) :: :ok | :error

@optional_callbacks handle_setup: 1
@doc """
The same as `handle_setup/1` but includes the current state as a second argument.

Important keys in the state include:

* `:connection` - the `Rabbit.Connection` process.
* `:channel` - the opened `AMQP.Channel` channel.
* `:setup_opts` - options provided undert the key `:setup_opts` to `start_link/3`.
"""
@callback handle_setup(channel :: AMQP.Channel.t(), state :: map()) :: :ok | :error

@optional_callbacks handle_setup: 1, handle_setup: 2

################################
# Public API
Expand All @@ -174,6 +187,7 @@ defmodule Rabbit.Producer do
before proceeding with async start - defaults to `100`.
* `:publish_opts` - Any `t:publish_option/0` that is automatically set as a
default option value when calling `publish/6`.
* `:setup_opts` - a keyword list of values to provide to `c:handle_setup/2`.

## Server Options

Expand All @@ -192,7 +206,7 @@ defmodule Rabbit.Producer do

The function must accept a producer child pid.
"""
@spec transaction(Rabbit.Producer.t(), (Rabbit.Prodcuer.t() -> any())) :: any()
@spec transaction(Rabbit.Producer.t(), (Rabbit.Producer.t() -> any())) :: any()
def transaction(producer, fun) do
:poolboy.transaction(producer, &fun.(&1))
end
Expand Down
43 changes: 30 additions & 13 deletions lib/rabbit/producer/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ defmodule Rabbit.Producer.Server do
sync_start: [type: :boolean, required: true, default: true],
sync_start_delay: [type: :integer, required: true, default: 50],
sync_start_max: [type: :integer, required: true, default: 100],
publish_opts: [type: :list, default: []]
publish_opts: [type: :list, default: []],
setup_opts: [type: :list, required: false]
}

################################
Expand Down Expand Up @@ -131,7 +132,8 @@ defmodule Rabbit.Producer.Server do
channel_open: false,
setup_run: false,
restart_attempts: 0,
publish_opts: Keyword.get(opts, :publish_opts)
publish_opts: Keyword.get(opts, :publish_opts),
setup_opts: Keyword.get(opts, :setup_opts)
}
end

Expand Down Expand Up @@ -191,18 +193,33 @@ defmodule Rabbit.Producer.Server do
defp handle_setup(%{setup_run: true} = state), do: {:ok, state}

defp handle_setup(state) do
if function_exported?(state.module, :handle_setup, 1) do
case state.module.handle_setup(state.channel) do
:ok ->
state = %{state | setup_run: true}
{:ok, state}

error ->
log_error(state, error)
{:error, state}
result =
cond do
function_exported?(state.module, :handle_setup, 2) ->
state.module.handle_setup(state.channel, state)

function_exported?(state.module, :handle_setup, 1) ->
state.module.handle_setup(state.channel)

true ->
:skip
end
else
{:ok, state}

case result do
:ok ->
state = %{state | setup_run: true}
{:ok, state}

:skip ->
{:ok, state}

{:ok, state} ->
state = %{state | setup_run: true}
{:ok, state}

error ->
log_error(state, error)
{:error, state}
end
end

Expand Down
83 changes: 80 additions & 3 deletions test/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,55 @@ defmodule Rabbit.ConsumerTest do
end
end

defmodule MinimalTestConsumer do
use Rabbit.Consumer

@impl Rabbit.Consumer
def init(:consumer, opts) do
{:ok, opts}
end

@impl Rabbit.Consumer
def handle_message(_msg) do
end

@impl Rabbit.Consumer
def handle_error(_) do
:ok
end
end

defmodule AdvancedSetupTestConsumer do
use Rabbit.Consumer

@impl Rabbit.Consumer
def init(:consumer, opts) do
{:ok, opts}
end

@impl Rabbit.Consumer
def handle_setup(state) do
%{channel: channel, setup_opts: setup_opts} = state
{:ok, %{queue: queue}} = AMQP.Queue.declare(channel)
# Declare an exchange as the default exchange cannot bind queues.
:ok = AMQP.Exchange.declare(channel, "topic_test", :topic)
:ok = AMQP.Queue.bind(channel, queue, "topic_test", routing_key: setup_opts[:routing_key])

send(setup_opts[:test_pid], :handle_advanced_setup_callback)

{:ok, %{state | queue: queue}}
end

@impl Rabbit.Consumer
def handle_message(_msg) do
end

@impl Rabbit.Consumer
def handle_error(_) do
:ok
end
end

setup do
{:ok, connection} = Connection.start_link(TestConnection)
{:ok, producer} = Producer.start_link(TestProducer, connection: connection)
Expand Down Expand Up @@ -129,13 +178,39 @@ defmodule Rabbit.ConsumerTest do
assert_receive :init_callback
end

test "consumer modules use handle_setup callback", meta do
test "consumer modules use handle_setup/2 callback", meta do
Process.register(self(), :consumer_test)

assert {:ok, _, _} = start_consumer(meta)
assert_receive :handle_setup_callback
end

test "consumer module uses handle_setup/1 callback", meta do
assert {:ok, _, _} =
start_consumer(AdvancedSetupTestConsumer, meta,
setup_opts: [test_pid: self(), routing_key: "routing.route"]
)

assert_receive :handle_advanced_setup_callback
end

test "handle_setup is optional if the queue already exists", meta do
state = connection_state(meta.connection)
{:ok, channel} = AMQP.Channel.open(state.connection)
queue = queue_name()
AMQP.Queue.declare(channel, queue, auto_delete: true)

assert {:ok, _, _} = start_consumer(MinimalTestConsumer, meta, queue: queue)
end

test "stops the server if the queue is not specified", meta do
Process.flag(:trap_exit, true)
{:ok, consumer} = Consumer.start_link(MinimalTestConsumer, connection: meta.connection)
Process.monitor(consumer)

assert_receive {:EXIT, _pid, :no_queue_given}
end

test "will ack messages based on return value", meta do
state = connection_state(meta.connection)
{:ok, channel} = AMQP.Channel.open(state.connection)
Expand All @@ -162,10 +237,12 @@ defmodule Rabbit.ConsumerTest do
assert msg.custom_meta == %{foo: "bar"}
end

defp start_consumer(meta, opts \\ []) do
defp start_consumer(meta, opts \\ []), do: start_consumer(TestConsumer, meta, opts)

defp start_consumer(module, meta, opts) do
queue = Keyword.get(opts, :queue, queue_name())
opts = [connection: meta.connection, queue: queue] ++ opts
{:ok, consumer} = Consumer.start_link(TestConsumer, opts)
{:ok, consumer} = Consumer.start_link(module, opts)
await_consuming(consumer)
{:ok, consumer, queue}
end
Expand Down
Loading