diff --git a/README.md b/README.md index 27e8b43..d5853c0 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ The package can be installed by adding `rabbit` to your list of dependencies in ```elixir def deps do [ - {:rabbit, "~> 0.4"} + {:rabbit, "~> 0.5"} ] end ``` @@ -36,7 +36,7 @@ defmodule MyConnection do # Callbacks @impl Rabbit.Connection - def init(_type, opts) do + def init(:connection, opts) do # Perform runtime config uri = System.get_env("RABBITMQ_URI") || "amqp://guest:guest@127.0.0.1:5672" opts = Keyword.put(opts, :uri, uri) @@ -63,7 +63,7 @@ defmodule MyConsumer do # Callbacks @impl Rabbit.Consumer - def init(_type, opts) do + def init(:consumer, opts) do # Perform runtime config {:ok, opts} end @@ -161,8 +161,13 @@ defmodule MyProducer do # Callbacks @impl Rabbit.Producer + def init(:producer_pool, opts) do + # Perform runtime config for the producer pool + {:ok, opts} + end + def init(:producer, opts) do - # Perform runtime config + # Perform runtime config per producer {:ok, opts} end end diff --git a/lib/rabbit/connection/server.ex b/lib/rabbit/connection/server.ex index d0e288e..af42662 100644 --- a/lib/rabbit/connection/server.ex +++ b/lib/rabbit/connection/server.ex @@ -56,8 +56,8 @@ defmodule Rabbit.Connection.Server do @doc false @impl GenServer def init({module, opts}) do - with {:ok, opts} <- module.init(:connection, opts) do - opts = KeywordValidator.validate!(opts, @opts_schema) + with {:ok, opts} <- module.init(:connection, opts), + {:ok, opts} <- validate_opts(opts, @opts_schema) do state = init_state(opts) {:ok, state, {:continue, :connect}} end diff --git a/lib/rabbit/consumer/server.ex b/lib/rabbit/consumer/server.ex index af0ca2b..660967f 100644 --- a/lib/rabbit/consumer/server.ex +++ b/lib/rabbit/consumer/server.ex @@ -54,8 +54,8 @@ defmodule Rabbit.Consumer.Server do @doc false @impl GenServer def init({module, opts}) do - with {:ok, opts} <- module.init(:consumer, opts) do - opts = KeywordValidator.validate!(opts, @opts_schema) + with {:ok, opts} <- module.init(:consumer, opts), + {:ok, opts} <- validate_opts(opts, @opts_schema) do state = init_state(module, opts) {:ok, state, {:continue, :connection}} end diff --git a/lib/rabbit/initializer/server.ex b/lib/rabbit/initializer/server.ex index 1550ef2..519f3f4 100644 --- a/lib/rabbit/initializer/server.ex +++ b/lib/rabbit/initializer/server.ex @@ -66,8 +66,8 @@ defmodule Rabbit.Initializer.Server do @doc false @impl GenServer def init({module, opts}) do - with {:ok, opts} <- module.init(:initializer, opts) do - opts = KeywordValidator.validate!(opts, @opts_schema) + with {:ok, opts} <- module.init(:initializer, opts), + {:ok, opts} <- validate_opts(opts, @opts_schema) do state = init_state(opts) initialize(state) end diff --git a/lib/rabbit/producer.ex b/lib/rabbit/producer.ex index 8e7b5db..931ede0 100644 --- a/lib/rabbit/producer.ex +++ b/lib/rabbit/producer.ex @@ -40,8 +40,13 @@ defmodule Rabbit.Producer do # Callbacks @impl Rabbit.Producer - def init(_type, opts) do - # Perform any runtime configuration... + def init(:producer_pool, opts) do + # Perform any runtime configuration for the pool + {:ok, opts} + end + + def init(:producer, opts) do + # Perform any runtime configuration per producer {:ok, opts} end end @@ -76,6 +81,14 @@ defmodule Rabbit.Producer do @type exchange :: String.t() @type routing_key :: String.t() @type message :: term() + @type producer_option :: + {:connection, Rabbit.Connection.t()} + | {:publish_opts, publish_options()} + @type producer_options :: [producer_option()] + @type pool_option :: + {:pool_size, non_neg_integer()} + | {:max_overflow, non_neg_integer()} + @type pool_options :: [pool_option()] @type publish_option :: {:mandatory, boolean()} | {:immediate, boolean()} @@ -95,7 +108,20 @@ defmodule Rabbit.Producer do @type publish_options :: [publish_option()] @doc """ - A callback executed when the producer is started. + A callback executed by each component of the producer. + + Two versions of the callback must be created. One for the pool, and one + for the producers. The first argument differentiates the callback. + + # Initialize the pool + def init(:producer_pool, opts) do + {:ok, opts} + end + + # Initialize a single producer + def init(:producer, opts) do + {:ok, opts} + end Returning `{:ok, opts}` - where `opts` is a keyword list of `t:option()` will, cause `start_link/3` to return `{:ok, pid}` and the process to enter its loop. @@ -103,7 +129,8 @@ defmodule Rabbit.Producer do Returning `:ignore` will cause `start_link/3` to return `:ignore` and the process will exit normally without entering the loop """ - @callback init(:producer, options()) :: {:ok, options()} | :ignore + @callback init(:producer_pool | :producer, options()) :: + {:ok, pool_options() | producer_options()} | :ignore ################################ # Public API diff --git a/lib/rabbit/producer/pool.ex b/lib/rabbit/producer/pool.ex index ee4e7f3..437ecd4 100644 --- a/lib/rabbit/producer/pool.ex +++ b/lib/rabbit/producer/pool.ex @@ -5,23 +5,34 @@ defmodule Rabbit.Producer.Pool do # Public API ################################ + @opts_schema %{ + pool_size: [type: :integer, required: true, default: 1], + max_overflow: [type: :integer, required: true, default: 0] + } @worker_opts [ :connection, - :publish_opts, - :async_connect + :publish_opts ] @doc false def start_link(module, opts \\ [], server_opts \\ []) do - pool_opts = get_pool_opts(opts, server_opts) worker_opts = get_worker_opts(module, opts) - :poolboy.start_link(pool_opts, worker_opts) + + with {:ok, opts} <- module.init(:producer_pool, opts), + {:ok, opts} <- validate_opts(opts) do + pool_opts = get_pool_opts(opts, server_opts) + :poolboy.start_link(pool_opts, worker_opts) + end end ################################ # Private API ################################ + defp validate_opts(opts) do + KeywordValidator.validate(opts, @opts_schema, strict: false) + end + defp get_pool_opts(opts, server_opts) do [ {:worker_module, Rabbit.Producer.Server}, diff --git a/lib/rabbit/producer/server.ex b/lib/rabbit/producer/server.ex index 5ba96b5..6373b44 100644 --- a/lib/rabbit/producer/server.ex +++ b/lib/rabbit/producer/server.ex @@ -34,8 +34,8 @@ defmodule Rabbit.Producer.Server do @doc false @impl GenServer def init({module, opts}) do - with {:ok, opts} <- module.init(:producer, opts) do - opts = KeywordValidator.validate!(opts, @opts_schema) + with {:ok, opts} <- module.init(:producer, opts), + {:ok, opts} <- validate_opts(opts, @opts_schema) do state = init_state(opts) {:ok, state, {:continue, :connection}} end diff --git a/lib/rabbit/utilities.ex b/lib/rabbit/utilities.ex index 73fa0a8..36e96d1 100644 --- a/lib/rabbit/utilities.ex +++ b/lib/rabbit/utilities.ex @@ -8,4 +8,13 @@ defmodule Rabbit.Utilities do |> Process.info() |> Keyword.get(:registered_name, self()) end + + @doc false + @spec validate_opts(keyword(), map()) :: {:ok, keyword()} | {:error, keyword()} + def validate_opts(opts, schema) do + case KeywordValidator.validate(opts, schema) do + {:ok, _} = result -> result + {:error, reason} -> {:stop, reason} + end + end end diff --git a/lib/rabbit/worker/executer.ex b/lib/rabbit/worker/executer.ex index 42bdffd..5821c52 100644 --- a/lib/rabbit/worker/executer.ex +++ b/lib/rabbit/worker/executer.ex @@ -3,6 +3,8 @@ defmodule Rabbit.Worker.Executer do use GenServer + import Rabbit.Utilities + require Logger @opts_schema %{ @@ -34,10 +36,11 @@ defmodule Rabbit.Worker.Executer do @doc false @impl GenServer def init({message, opts}) do - opts = KeywordValidator.validate!(opts, @opts_schema) - state = init_state(message, opts) - set_timeout(state.timeout) - {:ok, state, {:continue, :run}} + with {:ok, opts} <- validate_opts(opts, @opts_schema) do + state = init_state(message, opts) + set_timeout(state.timeout) + {:ok, state, {:continue, :run}} + end end @doc false diff --git a/mix.exs b/mix.exs index 7a3a194..76c7058 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Rabbit.MixProject do use Mix.Project - @version "0.4.0" + @version "0.5.0" def project do [ diff --git a/test/connection_test.exs b/test/connection_test.exs index ad13426..e342d90 100644 --- a/test/connection_test.exs +++ b/test/connection_test.exs @@ -29,6 +29,10 @@ defmodule Rabbit.ConnectionTest do assert {:ok, connection} = Connection.start_link(TestConnection, [], name: :foo) assert true = Connection.alive?(:foo) end + + test "returns error when given bad connection options" do + assert {:error, _} = Connection.start_link(TestConnection, uri: 1) + end end describe "stop/1" do diff --git a/test/consumer_test.exs b/test/consumer_test.exs index 76358ae..90264e7 100644 --- a/test/consumer_test.exs +++ b/test/consumer_test.exs @@ -16,7 +16,7 @@ defmodule Rabbit.ConsumerTest do use Rabbit.Producer @impl Rabbit.Producer - def init(:producer, opts) do + def init(_type, opts) do {:ok, opts} end end @@ -62,6 +62,10 @@ defmodule Rabbit.ConsumerTest do assert {:ok, _con} = Consumer.start_link(TestConsumer, connection: meta.connection, queue: "consumer") end + + test "returns error when given bad consumer options" do + assert {:error, _} = Consumer.start_link(TestConsumer, connection: 1) + end end describe "stop/1" do diff --git a/test/initializer_test.exs b/test/initializer_test.exs index 57055a8..c4a6754 100644 --- a/test/initializer_test.exs +++ b/test/initializer_test.exs @@ -43,7 +43,7 @@ defmodule Rabbit.InitializerTest do use Rabbit.Producer @impl Rabbit.Producer - def init(:producer, opts) do + def init(_type, opts) do {:ok, opts} end end @@ -114,6 +114,10 @@ defmodule Rabbit.InitializerTest do assert {:error, :no_connection} = Initializer.start_link(TestInitializer, connection: connection, retry_max: 1) end + + test "returns error when given bad initializer options" do + assert {:error, _} = Initializer.start_link(TestInitializer, connection: 1) + end end defp publish_message(meta, exchange, routing_key, opts \\ []) do diff --git a/test/producer_test.exs b/test/producer_test.exs index aaefe99..2f038da 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -16,7 +16,7 @@ defmodule Rabbit.ProducerTest do use Rabbit.Producer @impl Rabbit.Producer - def init(:producer, opts) do + def init(_type, opts) do {:ok, opts} end end @@ -40,6 +40,16 @@ defmodule Rabbit.ProducerTest do assert [_, _, _] = GenServer.call(producer, :get_avail_workers) end + + test "returns error when given bad pool options" do + assert {:error, _} = Producer.start_link(TestProducer, pool_size: "foo") + end + + test "returns error when given bad producer options" do + Process.flag(:trap_exit, true) + Producer.start_link(TestProducer, connection: "foo") + assert_receive {:EXIT, _, _} + end end describe "stop/1" do @@ -111,7 +121,7 @@ defmodule Rabbit.ProducerTest do use Rabbit.Producer @impl Rabbit.Producer - def init(:producer, opts) do + def init(_type, opts) do send(:producer_test, :init_callback) {:ok, opts} end