diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index 9df2874a..c659eea6 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -200,6 +200,7 @@ defmodule KafkaEx.GenConsumer do """ @type option :: {:commit_interval, non_neg_integer} | {:commit_threshold, non_neg_integer} + | {:auto_offset_reset, :none | :earliest | :latest} @typedoc """ Options used when starting a `KafkaEx.GenConsumer`. @@ -297,11 +298,13 @@ defmodule KafkaEx.GenConsumer do :committed_offset, :acked_offset, :last_commit, + :auto_offset_reset, ] end @commit_interval 5_000 @commit_threshold 100 + @auto_offset_reset :none # Client API @@ -323,6 +326,11 @@ defmodule KafkaEx.GenConsumer do * `:commit_threshold` - Threshold number of messages consumed to commit offsets to the broker. Default 100. + * `:auto_offset_reset` - The policy for resetting offsets when an + `:offset_out_of_range` error occurs. `:earliest` will move the offset to + the oldest available, `:latest` moves to the most recent. If anything else + is specified, the error will simply be raised. + Both `:commit_interval` and `:commit_threshold` default to the application config (e.g., `Application.get_env/2`) if that value is present, or the stated default if the application config is not present. @@ -389,6 +397,11 @@ defmodule KafkaEx.GenConsumer do :commit_threshold, Application.get_env(:kafka_ex, :commit_threshold, @commit_threshold) ) + auto_offset_reset = Keyword.get( + opts, + :auto_offset_reset, + Application.get_env(:kafka_ex, :auto_offset_reset, @auto_offset_reset) + ) {:ok, consumer_state} = consumer_module.init(topic, partition) worker_opts = Keyword.take(opts, [:uris]) @@ -402,6 +415,7 @@ defmodule KafkaEx.GenConsumer do consumer_state: consumer_state, commit_interval: commit_interval, commit_threshold: commit_threshold, + auto_offset_reset: auto_offset_reset, worker_name: worker_name, group: group_name, topic: topic, @@ -475,7 +489,7 @@ defmodule KafkaEx.GenConsumer do %FetchResponse{ topic: ^topic, partitions: [ - response = %{error_code: :no_error, partition: ^partition} + response = %{error_code: error_code, partition: ^partition} ] } ] = KafkaEx.fetch( @@ -486,6 +500,14 @@ defmodule KafkaEx.GenConsumer do worker_name: worker_name ) + state = + case error_code do + :offset_out_of_range -> + handle_offset_out_of_range(state) + :no_error -> + state + end + case response do %{last_offset: nil, message_set: []} -> handle_commit(:async_commit, state) @@ -515,6 +537,41 @@ defmodule KafkaEx.GenConsumer do handle_commit(sync_status, state_out) end + defp handle_offset_out_of_range( + %State{ + worker_name: worker_name, + topic: topic, + partition: partition, + current_offset: current_offset, + committed_offset: committed_offset, + acked_offset: acked_offset, + auto_offset_reset: auto_offset_reset + } = state + ) do + [ + %OffsetResponse{ + topic: ^topic, + partition_offsets: [ + %{partition: ^partition, error_code: :no_error, offset: [offset]} + ] + } + ] = case auto_offset_reset do + :earliest -> + KafkaEx.earliest_offset(topic, partition, worker_name) + :latest -> + KafkaEx.latest_offset(topic, partition, worker_name) + _ -> + raise "Offset out of range while consuming topic #{topic}, partition #{partition}." + end + + %State{ + state | + current_offset: offset, + committed_offset: offset, + acked_offset: offset + } + end + defp handle_commit(:sync_commit, %State{} = state), do: commit(state) defp handle_commit( :async_commit,