Skip to content

Commit

Permalink
Merge pull request #67 from Odaeus/handle_setup_state
Browse files Browse the repository at this point in the history
Passing state to `handle_setup`
  • Loading branch information
nsweeting authored Mar 11, 2021
2 parents 89bf1ba + 5aadb4f commit a931b87
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 43 deletions.
36 changes: 24 additions & 12 deletions lib/rabbit/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Rabbit.Consumer do
`AMQP.Channel` and provide the following benefits:
* Durability during connection and channel failures through use of expotential backoff.
* Easy runtime setup through the `c:init/2` and `c:handle_setup/2` callbacks.
* Easy runtime setup through the `c:init/2` and `c:handle_setup/1` callbacks.
* Automatic acknowledgements based on the return value of the `c:handle_message/1` callback.
* Ability to handle exceptions through the `c:handle_error/1` callback.
* Each message is executed within its own supervised task.
Expand Down Expand Up @@ -49,9 +49,9 @@ defmodule Rabbit.Consumer do
end
@impl Rabbit.Consumer
def handle_setup(channel, queue) do
def handle_setup(state) do
# Optional callback to perform exchange or queue setup
AMQP.Queue.declare(channel, queue)
AMQP.Queue.declare(state.channel, state.queue)
:ok
end
Expand Down Expand Up @@ -99,6 +99,7 @@ defmodule Rabbit.Consumer do
| {:no_wait, boolean()}
| {:arguments, Keyword.t()}
| {:custom_meta, map()}
| {:setup_opts, setup_options()}
@type options :: [option()]
@type delivery_tag :: non_neg_integer()
@type action_options :: [{:multiple, boolean()} | {:requeue, boolean()}]
Expand All @@ -110,6 +111,7 @@ defmodule Rabbit.Consumer do
| {:reject, Rabbit.Message.t()}
| {:reject, Rabbit.Message.t(), action_options()}
| any()
@type setup_options :: keyword()

@doc """
A callback executed when the consumer is started.
Expand All @@ -125,23 +127,32 @@ defmodule Rabbit.Consumer do
@doc """
An optional callback executed after the channel is open, but before consumption.
The callback is called with an `AMQP.Channel`, as well as the queue that will
be consumed. At the most basic, you may want to declare the queue to ensure
its available. This will be entirely application dependent though.
The callback is called with the current state, containing the open channel and queue name if
given. At the most basic, you may want to declare the queue to ensure it's available. This will
be entirely application dependent though.
def handle_setup(channel, queue) do
AMQP.Queue.declare(channel, queue)
def handle_setup(state) do
AMQP.Queue.declare(state.channel, state.queue)
:ok
end
The callback must return an `:ok` atom - otherise it will be marked as failed,
and the consumer will attempt to go through the connection setup process again.
Important keys from the state include:
* `:connection` - the `Rabbit.Connection` module in use.
* `:channel` - the `AMQP.Channel` open for this consumer.
* `:queue` - the queue name.
* `:setup_opts` - as provided to `start_link/3`.
Return either `:ok` or `{:ok, new_state}` for success, the latter will update the state.
If another value is returned it will be marked as failed, and the consumer will attempt to go
through the connection setup process again.
Alternatively, you could use a `Rabbit.Topology` process to perform this
setup work. Please see its docs for more information.
"""
@callback handle_setup(channel :: AMQP.Channel.t(), queue :: String.t()) :: :ok | :error
@callback handle_setup(state :: map) :: :ok | {:ok, new_state :: map()} | :error

@doc """
A callback executed to handle message consumption.
Expand Down Expand Up @@ -176,7 +187,7 @@ defmodule Rabbit.Consumer do
"""
@callback handle_error(message :: Rabbit.Message.t()) :: message_response()

@optional_callbacks handle_setup: 2
@optional_callbacks handle_setup: 1

################################
# Public API
Expand Down Expand Up @@ -205,6 +216,7 @@ defmodule Rabbit.Consumer do
* `:arguments` - A set of arguments for the consumer.
* `:custom_meta` - A map of custom data that will be included in each `Rabbit.Message`
handled by the consumer.
* `:setup_opts` - A keyword list of custom options for use in `c:handle_setup/1`.
## Server Options
Expand Down
18 changes: 13 additions & 5 deletions lib/rabbit/consumer/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Rabbit.Consumer.Server do

@opts_schema %{
connection: [type: [:tuple, :pid, :atom], required: true],
queue: [type: :binary, required: true],
queue: [type: :binary, required: false],
prefetch_count: [type: :integer, default: 1],
prefetch_size: [type: :integer, default: 0],
consumer_tag: [type: :binary, default: ""],
Expand All @@ -19,7 +19,8 @@ defmodule Rabbit.Consumer.Server do
no_wait: [type: :boolean, default: false],
arguments: [type: :list, default: []],
timeout: [type: [:integer, :atom], required: false],
custom_meta: [type: :map, default: %{}]
custom_meta: [type: :map, default: %{}],
setup_opts: [type: :list, default: [], required: false]
}

@qos_opts [
Expand Down Expand Up @@ -90,6 +91,7 @@ defmodule Rabbit.Consumer.Server do
def handle_continue(:consume, state) do
case consume(state) do
{:ok, state} -> {:noreply, state}
{:error, :no_queue_given} -> {:stop, :no_queue_given, state}
{:error, state} -> {:noreply, state, {:continue, :restart_delay}}
end
end
Expand Down Expand Up @@ -168,7 +170,8 @@ defmodule Rabbit.Consumer.Server do
qos_opts: Keyword.take(opts, @qos_opts),
consume_opts: Keyword.take(opts, @consume_opts),
worker_opts: Keyword.take(opts, @worker_opts),
custom_meta: Keyword.get(opts, :custom_meta)
custom_meta: Keyword.get(opts, :custom_meta),
setup_opts: Keyword.get(opts, :setup_opts)
}
end

Expand Down Expand Up @@ -206,12 +209,16 @@ defmodule Rabbit.Consumer.Server do
defp handle_setup(%{setup_run: true} = state), do: {:ok, state}

defp handle_setup(state) do
if function_exported?(state.module, :handle_setup, 2) do
case state.module.handle_setup(state.channel, state.queue) do
if function_exported?(state.module, :handle_setup, 1) do
case state.module.handle_setup(state) do
:ok ->
state = %{state | setup_run: true}
{:ok, state}

{:ok, state} ->
state = %{state | setup_run: true}
{:ok, state}

error ->
log_error(state, error)
{:error, state}
Expand All @@ -222,6 +229,7 @@ defmodule Rabbit.Consumer.Server do
end

defp consume(%{consuming: true} = state), do: {:ok, state}
defp consume(%{queue: nil}), do: {:error, :no_queue_given}

defp consume(state) do
with :ok <- AMQP.Basic.qos(state.channel, state.qos_opts),
Expand Down
25 changes: 17 additions & 8 deletions lib/rabbit/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ defmodule Rabbit.Producer do
| {:sync_start_delay, non_neg_integer()}
| {:sync_start_max, non_neg_integer()}
| {:publish_opts, publish_options()}
| {:setup_opts, setup_options()}
@type options :: [option()]
@type exchange :: String.t()
@type routing_key :: String.t()
Expand All @@ -103,6 +104,7 @@ defmodule Rabbit.Producer do
| {:user_id, String.t()}
| {:app_id, String.t()}
@type publish_options :: [publish_option()]
@type setup_options :: keyword()

@doc """
A callback executed by each component of the producer pool.
Expand Down Expand Up @@ -131,22 +133,28 @@ defmodule Rabbit.Producer do
@doc """
An optional callback executed after the channel is open for each producer.
The callback is called with an `AMQP.Channel`. At the most basic, you may want
to declare queues that you will be publishing to.
The callback is called with the current state. At the most basic, you may want to declare queues
that you will be publishing to.
def handle_setup(channel) do
AMQP.Queue.declare(channel, "some_queue")
def handle_setup(state) do
AMQP.Queue.declare(state.channel, "some_queue")
:ok
end
The callback must return an `:ok` atom - otherise it will be marked as failed,
and the producer will attempt to go through the channel setup process again.
Important keys in the state include:
* `:connection` - the `Rabbit.Connection` process.
* `:channel` - the opened `AMQP.Channel` channel.
* `:setup_opts` - options provided undert the key `:setup_opts` to `start_link/3`.
The callback must return an `:ok` atom or `{:ok, state}` tuple - otherise it will be marked as
failed, and the producer will attempt to go through the channel setup process again.
Alternatively, you could use a `Rabbit.Topology` process to perform this
setup work. Please see its docs for more information.
"""
@callback handle_setup(channel :: AMQP.Channel.t()) :: :ok | :error
@callback handle_setup(state :: map()) :: :ok | {:ok, map()} | :error

@optional_callbacks handle_setup: 1

Expand Down Expand Up @@ -174,6 +182,7 @@ defmodule Rabbit.Producer do
before proceeding with async start - defaults to `100`.
* `:publish_opts` - Any `t:publish_option/0` that is automatically set as a
default option value when calling `publish/6`.
* `:setup_opts` - a keyword list of values to provide to `c:handle_setup/1`.
## Server Options
Expand All @@ -192,7 +201,7 @@ defmodule Rabbit.Producer do
The function must accept a producer child pid.
"""
@spec transaction(Rabbit.Producer.t(), (Rabbit.Prodcuer.t() -> any())) :: any()
@spec transaction(Rabbit.Producer.t(), (Rabbit.Producer.t() -> any())) :: any()
def transaction(producer, fun) do
:poolboy.transaction(producer, &fun.(&1))
end
Expand Down
12 changes: 9 additions & 3 deletions lib/rabbit/producer/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ defmodule Rabbit.Producer.Server do
sync_start: [type: :boolean, required: true, default: true],
sync_start_delay: [type: :integer, required: true, default: 50],
sync_start_max: [type: :integer, required: true, default: 100],
publish_opts: [type: :list, default: []]
publish_opts: [type: :list, default: []],
setup_opts: [type: :list, required: false]
}

################################
Expand Down Expand Up @@ -131,7 +132,8 @@ defmodule Rabbit.Producer.Server do
channel_open: false,
setup_run: false,
restart_attempts: 0,
publish_opts: Keyword.get(opts, :publish_opts)
publish_opts: Keyword.get(opts, :publish_opts),
setup_opts: Keyword.get(opts, :setup_opts)
}
end

Expand Down Expand Up @@ -192,11 +194,15 @@ defmodule Rabbit.Producer.Server do

defp handle_setup(state) do
if function_exported?(state.module, :handle_setup, 1) do
case state.module.handle_setup(state.channel) do
case state.module.handle_setup(state) do
:ok ->
state = %{state | setup_run: true}
{:ok, state}

{:ok, state} ->
state = %{state | setup_run: true}
{:ok, state}

error ->
log_error(state, error)
{:error, state}
Expand Down
89 changes: 83 additions & 6 deletions test/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ defmodule Rabbit.ConsumerTest do
end

@impl Rabbit.Consumer
def handle_setup(chan, queue) do
def handle_setup(state) do
if is_pid(Process.whereis(:consumer_test)), do: send(:consumer_test, :handle_setup_callback)
AMQP.Queue.declare(chan, queue, auto_delete: true)
AMQP.Queue.purge(chan, queue)
AMQP.Queue.declare(state.channel, state.queue, auto_delete: true)
AMQP.Queue.purge(state.channel, state.queue)
:ok
end

Expand All @@ -52,6 +52,55 @@ defmodule Rabbit.ConsumerTest do
end
end

defmodule MinimalTestConsumer do
use Rabbit.Consumer

@impl Rabbit.Consumer
def init(:consumer, opts) do
{:ok, opts}
end

@impl Rabbit.Consumer
def handle_message(_msg) do
end

@impl Rabbit.Consumer
def handle_error(_) do
:ok
end
end

defmodule AdvancedSetupTestConsumer do
use Rabbit.Consumer

@impl Rabbit.Consumer
def init(:consumer, opts) do
{:ok, opts}
end

@impl Rabbit.Consumer
def handle_setup(state) do
%{channel: channel, setup_opts: setup_opts} = state
{:ok, %{queue: queue}} = AMQP.Queue.declare(channel)
# Declare an exchange as the default exchange cannot bind queues.
:ok = AMQP.Exchange.declare(channel, "topic_test", :topic)
:ok = AMQP.Queue.bind(channel, queue, "topic_test", routing_key: setup_opts[:routing_key])

send(setup_opts[:test_pid], :handle_advanced_setup_callback)

{:ok, %{state | queue: queue}}
end

@impl Rabbit.Consumer
def handle_message(_msg) do
end

@impl Rabbit.Consumer
def handle_error(_) do
:ok
end
end

setup do
{:ok, connection} = Connection.start_link(TestConnection)
{:ok, producer} = Producer.start_link(TestProducer, connection: connection)
Expand Down Expand Up @@ -129,13 +178,39 @@ defmodule Rabbit.ConsumerTest do
assert_receive :init_callback
end

test "consumer modules use handle_setup callback", meta do
test "consumer modules use handle_setup/2 callback", meta do
Process.register(self(), :consumer_test)

assert {:ok, _, _} = start_consumer(meta)
assert_receive :handle_setup_callback
end

test "consumer module uses handle_setup/1 callback", meta do
assert {:ok, _, _} =
start_consumer(AdvancedSetupTestConsumer, meta,
setup_opts: [test_pid: self(), routing_key: "routing.route"]
)

assert_receive :handle_advanced_setup_callback
end

test "handle_setup is optional if the queue already exists", meta do
state = connection_state(meta.connection)
{:ok, channel} = AMQP.Channel.open(state.connection)
queue = queue_name()
AMQP.Queue.declare(channel, queue, auto_delete: true)

assert {:ok, _, _} = start_consumer(MinimalTestConsumer, meta, queue: queue)
end

test "stops the server if the queue is not specified", meta do
Process.flag(:trap_exit, true)
{:ok, consumer} = Consumer.start_link(MinimalTestConsumer, connection: meta.connection)
Process.monitor(consumer)

assert_receive {:EXIT, _pid, :no_queue_given}
end

test "will ack messages based on return value", meta do
state = connection_state(meta.connection)
{:ok, channel} = AMQP.Channel.open(state.connection)
Expand All @@ -162,10 +237,12 @@ defmodule Rabbit.ConsumerTest do
assert msg.custom_meta == %{foo: "bar"}
end

defp start_consumer(meta, opts \\ []) do
defp start_consumer(meta, opts \\ []), do: start_consumer(TestConsumer, meta, opts)

defp start_consumer(module, meta, opts) do
queue = Keyword.get(opts, :queue, queue_name())
opts = [connection: meta.connection, queue: queue] ++ opts
{:ok, consumer} = Consumer.start_link(TestConsumer, opts)
{:ok, consumer} = Consumer.start_link(module, opts)
await_consuming(consumer)
{:ok, consumer, queue}
end
Expand Down
Loading

0 comments on commit a931b87

Please sign in to comment.