diff --git a/lib/rabbit/consumer.ex b/lib/rabbit/consumer.ex index bfcb1e7..7130904 100644 --- a/lib/rabbit/consumer.ex +++ b/lib/rabbit/consumer.ex @@ -6,7 +6,7 @@ defmodule Rabbit.Consumer do `AMQP.Channel` and provide the following benefits: * Durability during connection and channel failures through use of expotential backoff. - * Easy runtime setup through the `c:init/2` and `c:handle_setup/2` callbacks. + * Easy runtime setup through the `c:init/2` and `c:handle_setup/1` callbacks. * Automatic acknowledgements based on the return value of the `c:handle_message/1` callback. * Ability to handle exceptions through the `c:handle_error/1` callback. * Each message is executed within its own supervised task. @@ -49,9 +49,9 @@ defmodule Rabbit.Consumer do end @impl Rabbit.Consumer - def handle_setup(channel, queue) do + def handle_setup(state) do # Optional callback to perform exchange or queue setup - AMQP.Queue.declare(channel, queue) + AMQP.Queue.declare(state.channel, state.queue) :ok end @@ -99,6 +99,7 @@ defmodule Rabbit.Consumer do | {:no_wait, boolean()} | {:arguments, Keyword.t()} | {:custom_meta, map()} + | {:setup_opts, setup_options()} @type options :: [option()] @type delivery_tag :: non_neg_integer() @type action_options :: [{:multiple, boolean()} | {:requeue, boolean()}] @@ -110,6 +111,7 @@ defmodule Rabbit.Consumer do | {:reject, Rabbit.Message.t()} | {:reject, Rabbit.Message.t(), action_options()} | any() + @type setup_options :: keyword() @doc """ A callback executed when the consumer is started. @@ -125,23 +127,32 @@ defmodule Rabbit.Consumer do @doc """ An optional callback executed after the channel is open, but before consumption. - The callback is called with an `AMQP.Channel`, as well as the queue that will - be consumed. At the most basic, you may want to declare the queue to ensure - its available. This will be entirely application dependent though. + The callback is called with the current state, containing the open channel and queue name if + given. At the most basic, you may want to declare the queue to ensure it's available. This will + be entirely application dependent though. - def handle_setup(channel, queue) do - AMQP.Queue.declare(channel, queue) + def handle_setup(state) do + AMQP.Queue.declare(state.channel, state.queue) :ok end - The callback must return an `:ok` atom - otherise it will be marked as failed, - and the consumer will attempt to go through the connection setup process again. + Important keys from the state include: + + * `:connection` - the `Rabbit.Connection` module in use. + * `:channel` - the `AMQP.Channel` open for this consumer. + * `:queue` - the queue name. + * `:setup_opts` - as provided to `start_link/3`. + + Return either `:ok` or `{:ok, new_state}` for success, the latter will update the state. + + If another value is returned it will be marked as failed, and the consumer will attempt to go + through the connection setup process again. Alternatively, you could use a `Rabbit.Topology` process to perform this setup work. Please see its docs for more information. """ - @callback handle_setup(channel :: AMQP.Channel.t(), queue :: String.t()) :: :ok | :error + @callback handle_setup(state :: map) :: :ok | {:ok, new_state :: map()} | :error @doc """ A callback executed to handle message consumption. @@ -176,7 +187,7 @@ defmodule Rabbit.Consumer do """ @callback handle_error(message :: Rabbit.Message.t()) :: message_response() - @optional_callbacks handle_setup: 2 + @optional_callbacks handle_setup: 1 ################################ # Public API @@ -205,6 +216,7 @@ defmodule Rabbit.Consumer do * `:arguments` - A set of arguments for the consumer. * `:custom_meta` - A map of custom data that will be included in each `Rabbit.Message` handled by the consumer. + * `:setup_opts` - A keyword list of custom options for use in `c:handle_setup/1`. ## Server Options diff --git a/lib/rabbit/consumer/server.ex b/lib/rabbit/consumer/server.ex index ecd919c..4ce405e 100644 --- a/lib/rabbit/consumer/server.ex +++ b/lib/rabbit/consumer/server.ex @@ -9,7 +9,7 @@ defmodule Rabbit.Consumer.Server do @opts_schema %{ connection: [type: [:tuple, :pid, :atom], required: true], - queue: [type: :binary, required: true], + queue: [type: :binary, required: false], prefetch_count: [type: :integer, default: 1], prefetch_size: [type: :integer, default: 0], consumer_tag: [type: :binary, default: ""], @@ -19,7 +19,8 @@ defmodule Rabbit.Consumer.Server do no_wait: [type: :boolean, default: false], arguments: [type: :list, default: []], timeout: [type: [:integer, :atom], required: false], - custom_meta: [type: :map, default: %{}] + custom_meta: [type: :map, default: %{}], + setup_opts: [type: :list, default: [], required: false] } @qos_opts [ @@ -90,6 +91,7 @@ defmodule Rabbit.Consumer.Server do def handle_continue(:consume, state) do case consume(state) do {:ok, state} -> {:noreply, state} + {:error, :no_queue_given} -> {:stop, :no_queue_given, state} {:error, state} -> {:noreply, state, {:continue, :restart_delay}} end end @@ -168,7 +170,8 @@ defmodule Rabbit.Consumer.Server do qos_opts: Keyword.take(opts, @qos_opts), consume_opts: Keyword.take(opts, @consume_opts), worker_opts: Keyword.take(opts, @worker_opts), - custom_meta: Keyword.get(opts, :custom_meta) + custom_meta: Keyword.get(opts, :custom_meta), + setup_opts: Keyword.get(opts, :setup_opts) } end @@ -206,12 +209,16 @@ defmodule Rabbit.Consumer.Server do defp handle_setup(%{setup_run: true} = state), do: {:ok, state} defp handle_setup(state) do - if function_exported?(state.module, :handle_setup, 2) do - case state.module.handle_setup(state.channel, state.queue) do + if function_exported?(state.module, :handle_setup, 1) do + case state.module.handle_setup(state) do :ok -> state = %{state | setup_run: true} {:ok, state} + {:ok, state} -> + state = %{state | setup_run: true} + {:ok, state} + error -> log_error(state, error) {:error, state} @@ -222,6 +229,7 @@ defmodule Rabbit.Consumer.Server do end defp consume(%{consuming: true} = state), do: {:ok, state} + defp consume(%{queue: nil}), do: {:error, :no_queue_given} defp consume(state) do with :ok <- AMQP.Basic.qos(state.channel, state.qos_opts), diff --git a/lib/rabbit/producer.ex b/lib/rabbit/producer.ex index 1ac72c4..acf9b01 100644 --- a/lib/rabbit/producer.ex +++ b/lib/rabbit/producer.ex @@ -82,6 +82,7 @@ defmodule Rabbit.Producer do | {:sync_start_delay, non_neg_integer()} | {:sync_start_max, non_neg_integer()} | {:publish_opts, publish_options()} + | {:setup_opts, setup_options()} @type options :: [option()] @type exchange :: String.t() @type routing_key :: String.t() @@ -103,6 +104,7 @@ defmodule Rabbit.Producer do | {:user_id, String.t()} | {:app_id, String.t()} @type publish_options :: [publish_option()] + @type setup_options :: keyword() @doc """ A callback executed by each component of the producer pool. @@ -131,22 +133,28 @@ defmodule Rabbit.Producer do @doc """ An optional callback executed after the channel is open for each producer. - The callback is called with an `AMQP.Channel`. At the most basic, you may want - to declare queues that you will be publishing to. + The callback is called with the current state. At the most basic, you may want to declare queues + that you will be publishing to. - def handle_setup(channel) do - AMQP.Queue.declare(channel, "some_queue") + def handle_setup(state) do + AMQP.Queue.declare(state.channel, "some_queue") :ok end - The callback must return an `:ok` atom - otherise it will be marked as failed, - and the producer will attempt to go through the channel setup process again. + Important keys in the state include: + + * `:connection` - the `Rabbit.Connection` process. + * `:channel` - the opened `AMQP.Channel` channel. + * `:setup_opts` - options provided undert the key `:setup_opts` to `start_link/3`. + + The callback must return an `:ok` atom or `{:ok, state}` tuple - otherise it will be marked as + failed, and the producer will attempt to go through the channel setup process again. Alternatively, you could use a `Rabbit.Topology` process to perform this setup work. Please see its docs for more information. """ - @callback handle_setup(channel :: AMQP.Channel.t()) :: :ok | :error + @callback handle_setup(state :: map()) :: :ok | {:ok, map()} | :error @optional_callbacks handle_setup: 1 @@ -174,6 +182,7 @@ defmodule Rabbit.Producer do before proceeding with async start - defaults to `100`. * `:publish_opts` - Any `t:publish_option/0` that is automatically set as a default option value when calling `publish/6`. + * `:setup_opts` - a keyword list of values to provide to `c:handle_setup/1`. ## Server Options @@ -192,7 +201,7 @@ defmodule Rabbit.Producer do The function must accept a producer child pid. """ - @spec transaction(Rabbit.Producer.t(), (Rabbit.Prodcuer.t() -> any())) :: any() + @spec transaction(Rabbit.Producer.t(), (Rabbit.Producer.t() -> any())) :: any() def transaction(producer, fun) do :poolboy.transaction(producer, &fun.(&1)) end diff --git a/lib/rabbit/producer/server.ex b/lib/rabbit/producer/server.ex index 1a7d0fe..7abf361 100644 --- a/lib/rabbit/producer/server.ex +++ b/lib/rabbit/producer/server.ex @@ -12,7 +12,8 @@ defmodule Rabbit.Producer.Server do sync_start: [type: :boolean, required: true, default: true], sync_start_delay: [type: :integer, required: true, default: 50], sync_start_max: [type: :integer, required: true, default: 100], - publish_opts: [type: :list, default: []] + publish_opts: [type: :list, default: []], + setup_opts: [type: :list, required: false] } ################################ @@ -131,7 +132,8 @@ defmodule Rabbit.Producer.Server do channel_open: false, setup_run: false, restart_attempts: 0, - publish_opts: Keyword.get(opts, :publish_opts) + publish_opts: Keyword.get(opts, :publish_opts), + setup_opts: Keyword.get(opts, :setup_opts) } end @@ -192,11 +194,15 @@ defmodule Rabbit.Producer.Server do defp handle_setup(state) do if function_exported?(state.module, :handle_setup, 1) do - case state.module.handle_setup(state.channel) do + case state.module.handle_setup(state) do :ok -> state = %{state | setup_run: true} {:ok, state} + {:ok, state} -> + state = %{state | setup_run: true} + {:ok, state} + error -> log_error(state, error) {:error, state} diff --git a/test/consumer_test.exs b/test/consumer_test.exs index c402c86..234a0fa 100644 --- a/test/consumer_test.exs +++ b/test/consumer_test.exs @@ -31,10 +31,10 @@ defmodule Rabbit.ConsumerTest do end @impl Rabbit.Consumer - def handle_setup(chan, queue) do + def handle_setup(state) do if is_pid(Process.whereis(:consumer_test)), do: send(:consumer_test, :handle_setup_callback) - AMQP.Queue.declare(chan, queue, auto_delete: true) - AMQP.Queue.purge(chan, queue) + AMQP.Queue.declare(state.channel, state.queue, auto_delete: true) + AMQP.Queue.purge(state.channel, state.queue) :ok end @@ -52,6 +52,55 @@ defmodule Rabbit.ConsumerTest do end end + defmodule MinimalTestConsumer do + use Rabbit.Consumer + + @impl Rabbit.Consumer + def init(:consumer, opts) do + {:ok, opts} + end + + @impl Rabbit.Consumer + def handle_message(_msg) do + end + + @impl Rabbit.Consumer + def handle_error(_) do + :ok + end + end + + defmodule AdvancedSetupTestConsumer do + use Rabbit.Consumer + + @impl Rabbit.Consumer + def init(:consumer, opts) do + {:ok, opts} + end + + @impl Rabbit.Consumer + def handle_setup(state) do + %{channel: channel, setup_opts: setup_opts} = state + {:ok, %{queue: queue}} = AMQP.Queue.declare(channel) + # Declare an exchange as the default exchange cannot bind queues. + :ok = AMQP.Exchange.declare(channel, "topic_test", :topic) + :ok = AMQP.Queue.bind(channel, queue, "topic_test", routing_key: setup_opts[:routing_key]) + + send(setup_opts[:test_pid], :handle_advanced_setup_callback) + + {:ok, %{state | queue: queue}} + end + + @impl Rabbit.Consumer + def handle_message(_msg) do + end + + @impl Rabbit.Consumer + def handle_error(_) do + :ok + end + end + setup do {:ok, connection} = Connection.start_link(TestConnection) {:ok, producer} = Producer.start_link(TestProducer, connection: connection) @@ -129,13 +178,39 @@ defmodule Rabbit.ConsumerTest do assert_receive :init_callback end - test "consumer modules use handle_setup callback", meta do + test "consumer modules use handle_setup/2 callback", meta do Process.register(self(), :consumer_test) assert {:ok, _, _} = start_consumer(meta) assert_receive :handle_setup_callback end + test "consumer module uses handle_setup/1 callback", meta do + assert {:ok, _, _} = + start_consumer(AdvancedSetupTestConsumer, meta, + setup_opts: [test_pid: self(), routing_key: "routing.route"] + ) + + assert_receive :handle_advanced_setup_callback + end + + test "handle_setup is optional if the queue already exists", meta do + state = connection_state(meta.connection) + {:ok, channel} = AMQP.Channel.open(state.connection) + queue = queue_name() + AMQP.Queue.declare(channel, queue, auto_delete: true) + + assert {:ok, _, _} = start_consumer(MinimalTestConsumer, meta, queue: queue) + end + + test "stops the server if the queue is not specified", meta do + Process.flag(:trap_exit, true) + {:ok, consumer} = Consumer.start_link(MinimalTestConsumer, connection: meta.connection) + Process.monitor(consumer) + + assert_receive {:EXIT, _pid, :no_queue_given} + end + test "will ack messages based on return value", meta do state = connection_state(meta.connection) {:ok, channel} = AMQP.Channel.open(state.connection) @@ -162,10 +237,12 @@ defmodule Rabbit.ConsumerTest do assert msg.custom_meta == %{foo: "bar"} end - defp start_consumer(meta, opts \\ []) do + defp start_consumer(meta, opts \\ []), do: start_consumer(TestConsumer, meta, opts) + + defp start_consumer(module, meta, opts) do queue = Keyword.get(opts, :queue, queue_name()) opts = [connection: meta.connection, queue: queue] ++ opts - {:ok, consumer} = Consumer.start_link(TestConsumer, opts) + {:ok, consumer} = Consumer.start_link(module, opts) await_consuming(consumer) {:ok, consumer, queue} end diff --git a/test/producer_test.exs b/test/producer_test.exs index 013276c..aebbf2b 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -160,7 +160,7 @@ defmodule Rabbit.ProducerTest do assert_receive :init_callback end - test "producer modules use handle_setup callback" do + test "producer modules use handle_setup/1 callback" do Process.register(self(), :producer_test) defmodule TestProducerFour do @@ -172,15 +172,16 @@ defmodule Rabbit.ProducerTest do end @impl Rabbit.Producer - def handle_setup(_channel) do - send(:producer_test, :handle_setup_callback) + def handle_setup(state) do + send(:producer_test, {:handle_setup_callback, state}) :ok end end assert {:ok, connection} = Connection.start_link(TestConnection) assert {:ok, _producer} = Producer.start_link(TestProducerFour, connection: connection) - assert_receive :handle_setup_callback + assert_receive {:handle_setup_callback, %{channel: channel}} + assert channel end def await_publishing(producer) do diff --git a/test/topology_test.exs b/test/topology_test.exs index 6894802..f6400e2 100644 --- a/test/topology_test.exs +++ b/test/topology_test.exs @@ -69,11 +69,6 @@ defmodule Rabbit.TopologyTest do {:ok, opts} end - @impl Rabbit.Consumer - def handle_setup(_chan, _queue) do - :ok - end - @impl Rabbit.Consumer def handle_message(msg) do decoded_payload = Base.decode64!(msg.payload)