Skip to content

Commit

Permalink
Allow extra_consumer_args option to pass arguments to consumer modu…
Browse files Browse the repository at this point in the history
…le init
  • Loading branch information
mtrudel committed Feb 6, 2019
1 parent 29cd7a2 commit 75c8a95
Showing 1 changed file with 44 additions and 2 deletions.
46 changes: 44 additions & 2 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ defmodule KafkaEx.GenConsumer do
{:ok, %State{}}
end
def init(_topic, _partition, extra_args) do
{:ok, %State{}}
end
def handle_message_set(message_set, state) do
{:async_commit, %{state | messages: state.messages ++ message_set}}
end
Expand Down Expand Up @@ -204,6 +208,7 @@ defmodule KafkaEx.GenConsumer do
{:commit_interval, non_neg_integer}
| {:commit_threshold, non_neg_integer}
| {:auto_offset_reset, :none | :earliest | :latest}
| {:extra_consumer_args, map()}

@typedoc """
Options used when starting a `KafkaEx.GenConsumer`.
Expand All @@ -227,6 +232,28 @@ defmodule KafkaEx.GenConsumer do
@callback init(topic :: binary, partition :: non_neg_integer) ::
{:ok, state :: term}

@doc """
Invoked when the server is started. `start_link/5` will block until it
returns.
`topic` and `partition` are the arguments passed to `start_link/5`. They
identify the Kafka partition that the `KafkaEx.GenConsumer` will consume from.
`extra_args` is the value of the `extra_consumer_args` option to `start_link/5`.
The default implementation of this function calls `init/2`.
Returning `{:ok, state}` will cause `start_link/5` to return `{:ok, pid}` and
the process to start consuming from its assigned partition. `state` becomes
the consumer's state.
Any other return value will cause the `start_link/5` to return `{:error,
error}` and the process to exit.
"""
@callback init(topic :: binary, partition :: non_neg_integer, extra_args :: map()) ::
{:ok, state :: term}


@doc """
Invoked for each message set consumed from a Kafka topic partition.
Expand Down Expand Up @@ -285,6 +312,10 @@ defmodule KafkaEx.GenConsumer do
{:ok, nil}
end

def init(topic, partition, _extra_args) do
init(topic, partition)
end

def handle_call(msg, _from, consumer_state) do
# taken from the GenServer handle_call implementation
proc =
Expand Down Expand Up @@ -336,7 +367,7 @@ defmodule KafkaEx.GenConsumer do
{:noreply, consumer_state}
end

defoverridable init: 2, handle_call: 3, handle_cast: 2, handle_info: 2
defoverridable init: 2, init: 3, handle_call: 3, handle_cast: 2, handle_info: 2
end
end

Expand Down Expand Up @@ -392,6 +423,11 @@ defmodule KafkaEx.GenConsumer do
* `:fetch_options` - Optional keyword list that is passed along to the
`KafkaEx.fetch` call.
* `:extra_consumer_args` - Optional parameter that is passed along to the
`GenConsumer.init` call in the consumer module. Note that if `init/3` is not
implemented, the default implementation calls to `init/2`, dropping the extra
arguments.
Both `:commit_interval` and `:commit_threshold` default to the application
config (e.g., `Application.get_env/2`) if that value is present, or the
stated default if the application config is not present.
Expand Down Expand Up @@ -482,7 +518,13 @@ defmodule KafkaEx.GenConsumer do
Application.get_env(:kafka_ex, :auto_offset_reset, @auto_offset_reset)
)

{:ok, consumer_state} = consumer_module.init(topic, partition)
extra_consumer_args =
Keyword.get(
opts,
:extra_consumer_args
)

{:ok, consumer_state} = consumer_module.init(topic, partition, extra_consumer_args)
worker_opts = Keyword.take(opts, [:uris])

{:ok, worker_name} =
Expand Down

0 comments on commit 75c8a95

Please sign in to comment.