Skip to content

Commit

Permalink
Make Consumer queue opt optional
Browse files Browse the repository at this point in the history
The queue name might be set dynamically in `handle_setup` instead of
supplying the value on start. If there is no queue name by the time we
start consuming then stop the server.
  • Loading branch information
Odaeus committed Jan 14, 2021
1 parent 15d5a6c commit 26c3d69
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
4 changes: 3 additions & 1 deletion lib/rabbit/consumer/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Rabbit.Consumer.Server do

@opts_schema %{
connection: [type: [:tuple, :pid, :atom], required: true],
queue: [type: :binary, required: true],
queue: [type: :binary, required: false],
prefetch_count: [type: :integer, default: 1],
prefetch_size: [type: :integer, default: 0],
consumer_tag: [type: :binary, default: ""],
Expand Down Expand Up @@ -91,6 +91,7 @@ defmodule Rabbit.Consumer.Server do
def handle_continue(:consume, state) do
case consume(state) do
{:ok, state} -> {:noreply, state}
{:error, :no_queue_given} -> {:stop, :no_queue_given, state}
{:error, state} -> {:noreply, state, {:continue, :restart_delay}}
end
end
Expand Down Expand Up @@ -239,6 +240,7 @@ defmodule Rabbit.Consumer.Server do
end

defp consume(%{consuming: true} = state), do: {:ok, state}
defp consume(%{queue: nil}), do: {:error, :no_queue_given}

defp consume(state) do
with :ok <- AMQP.Basic.qos(state.channel, state.qos_opts),
Expand Down
8 changes: 8 additions & 0 deletions test/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ defmodule Rabbit.ConsumerTest do
assert {:ok, _, _} = start_consumer(MinimalTestConsumer, meta, queue: queue)
end

test "stops the server if the queue is not specified", meta do
Process.flag(:trap_exit, true)
{:ok, consumer} = Consumer.start_link(MinimalTestConsumer, connection: meta.connection)
Process.monitor(consumer)

assert_receive {:EXIT, _pid, :no_queue_given}
end

test "will ack messages based on return value", meta do
state = connection_state(meta.connection)
{:ok, channel} = AMQP.Channel.open(state.connection)
Expand Down

0 comments on commit 26c3d69

Please sign in to comment.