diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index 9dcd4263..1eac4519 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -137,6 +137,10 @@ defmodule KafkaEx.GenConsumer do {:ok, %State{}} end + def init(_topic, _partition, extra_args) do + {:ok, %State{}} + end + def handle_message_set(message_set, state) do {:async_commit, %{state | messages: state.messages ++ message_set}} end @@ -204,6 +208,7 @@ defmodule KafkaEx.GenConsumer do {:commit_interval, non_neg_integer} | {:commit_threshold, non_neg_integer} | {:auto_offset_reset, :none | :earliest | :latest} + | {:extra_consumer_args, map()} @typedoc """ Options used when starting a `KafkaEx.GenConsumer`. @@ -227,6 +232,28 @@ defmodule KafkaEx.GenConsumer do @callback init(topic :: binary, partition :: non_neg_integer) :: {:ok, state :: term} + @doc """ + Invoked when the server is started. `start_link/5` will block until it + returns. + + `topic` and `partition` are the arguments passed to `start_link/5`. They + identify the Kafka partition that the `KafkaEx.GenConsumer` will consume from. + + `extra_args` is the value of the `extra_consumer_args` option to `start_link/5`. + + The default implementation of this function calls `init/2`. + + Returning `{:ok, state}` will cause `start_link/5` to return `{:ok, pid}` and + the process to start consuming from its assigned partition. `state` becomes + the consumer's state. + + Any other return value will cause the `start_link/5` to return `{:error, + error}` and the process to exit. + """ + @callback init(topic :: binary, partition :: non_neg_integer, extra_args :: map()) :: + {:ok, state :: term} + + @doc """ Invoked for each message set consumed from a Kafka topic partition. @@ -285,6 +312,10 @@ defmodule KafkaEx.GenConsumer do {:ok, nil} end + def init(topic, partition, _extra_args) do + init(topic, partition) + end + def handle_call(msg, _from, consumer_state) do # taken from the GenServer handle_call implementation proc = @@ -336,7 +367,7 @@ defmodule KafkaEx.GenConsumer do {:noreply, consumer_state} end - defoverridable init: 2, handle_call: 3, handle_cast: 2, handle_info: 2 + defoverridable init: 2, init: 3, handle_call: 3, handle_cast: 2, handle_info: 2 end end @@ -392,6 +423,11 @@ defmodule KafkaEx.GenConsumer do * `:fetch_options` - Optional keyword list that is passed along to the `KafkaEx.fetch` call. + * `:extra_consumer_args` - Optional parameter that is passed along to the + `GenConsumer.init` call in the consumer module. Note that if `init/3` is not + implemented, the default implementation calls to `init/2`, dropping the extra + arguments. + Both `:commit_interval` and `:commit_threshold` default to the application config (e.g., `Application.get_env/2`) if that value is present, or the stated default if the application config is not present. @@ -482,7 +518,13 @@ defmodule KafkaEx.GenConsumer do Application.get_env(:kafka_ex, :auto_offset_reset, @auto_offset_reset) ) - {:ok, consumer_state} = consumer_module.init(topic, partition) + extra_consumer_args = + Keyword.get( + opts, + :extra_consumer_args + ) + + {:ok, consumer_state} = consumer_module.init(topic, partition, extra_consumer_args) worker_opts = Keyword.take(opts, [:uris]) {:ok, worker_name} =