From d766a022b4433ce74e8b0ab095244d87fc9a4c14 Mon Sep 17 00:00:00 2001 From: Michael Oliver Date: Thu, 14 Dec 2017 15:35:24 -0800 Subject: [PATCH 1/3] Add auto_offset_reset option to consumer implementation --- lib/kafka_ex/gen_consumer.ex | 58 +++++++++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index 9df2874a..3717a594 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -297,11 +297,13 @@ defmodule KafkaEx.GenConsumer do :committed_offset, :acked_offset, :last_commit, + :auto_offset_reset, ] end @commit_interval 5_000 @commit_threshold 100 + @auto_offset_reset :none # Client API @@ -323,6 +325,11 @@ defmodule KafkaEx.GenConsumer do * `:commit_threshold` - Threshold number of messages consumed to commit offsets to the broker. Default 100. + * `:auto_offset_reset` - The policy for resetting offsets when an + `:offset_out_of_range` error occurs. `:earliest` will move the offset to + the oldest available, `:latest` moves to the most recent. If anything else + is specified, the error will simply be raised. + Both `:commit_interval` and `:commit_threshold` default to the application config (e.g., `Application.get_env/2`) if that value is present, or the stated default if the application config is not present. @@ -389,6 +396,11 @@ defmodule KafkaEx.GenConsumer do :commit_threshold, Application.get_env(:kafka_ex, :commit_threshold, @commit_threshold) ) + auto_offset_reset = Keyword.get( + opts, + :auto_offset_reset, + Application.get_env(:kafka_ex, :auto_offset_reset, @auto_offset_reset) + ) {:ok, consumer_state} = consumer_module.init(topic, partition) worker_opts = Keyword.take(opts, [:uris]) @@ -402,6 +414,7 @@ defmodule KafkaEx.GenConsumer do consumer_state: consumer_state, commit_interval: commit_interval, commit_threshold: commit_threshold, + auto_offset_reset: auto_offset_reset, worker_name: worker_name, group: group_name, topic: topic, @@ -475,7 +488,7 @@ defmodule KafkaEx.GenConsumer do %FetchResponse{ topic: ^topic, partitions: [ - response = %{error_code: :no_error, partition: ^partition} + response = %{error_code: error_code, partition: ^partition} ] } ] = KafkaEx.fetch( @@ -486,6 +499,14 @@ defmodule KafkaEx.GenConsumer do worker_name: worker_name ) + state = + case error_code do + :offset_out_of_range -> + handle_offset_out_of_range(state) + :no_error -> + state + end + case response do %{last_offset: nil, message_set: []} -> handle_commit(:async_commit, state) @@ -515,6 +536,41 @@ defmodule KafkaEx.GenConsumer do handle_commit(sync_status, state_out) end + defp handle_offset_out_of_range( + %State{ + worker_name: worker_name, + topic: topic, + partition: partition, + current_offset: current_offset, + committed_offset: committed_offset, + acked_offset: acked_offset, + auto_offset_reset: auto_offset_reset + } = state + ) do + [ + %OffsetResponse{ + topic: ^topic, + partition_offsets: [ + %{partition: ^partition, error_code: :no_error, offset: [offset]} + ] + } + ] = case auto_offset_reset do + :earliest -> + KafkaEx.earliest_offset(topic, partition, worker_name) + :latest -> + KafkaEx.latest_offset(topic, partition, worker_name) + _ -> + raise "Offset out of range while consuming topic #{topic}, partition #{partition}." + end + + %State{ + state | + current_offset: offset, + committed_offset: offset, + acked_offset: offset + } + end + defp handle_commit(:sync_commit, %State{} = state), do: commit(state) defp handle_commit( :async_commit, From ad316bb36f30ca1796a6ac78b019a3c973a5a64f Mon Sep 17 00:00:00 2001 From: Michael Oliver Date: Thu, 14 Dec 2017 16:13:49 -0800 Subject: [PATCH 2/3] Update version --- README.md | 2 +- mix.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a5a8b63c..43596630 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ defmodule MyApp.Mixfile do defp deps do [ # add to your existing deps - {:kafka_ex, "~> 0.8.1"}, + {:kafka_ex, "~> 0.8.2"}, # if using snappy compression {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif"} ] diff --git a/mix.exs b/mix.exs index cfbf42a0..b64151a3 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule KafkaEx.Mixfile do def project do [ app: :kafka_ex, - version: "0.8.1", + version: "0.8.2", elixir: "~> 1.1", dialyzer: [ plt_add_deps: :transitive, From 05d1b9c8d91dd58fb702ba90d5876b2daaf02d75 Mon Sep 17 00:00:00 2001 From: Michael Oliver Date: Sat, 16 Dec 2017 23:56:41 -0700 Subject: [PATCH 3/3] Undo version bump, add new option to @type --- README.md | 2 +- lib/kafka_ex/gen_consumer.ex | 1 + mix.exs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 43596630..a5a8b63c 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ defmodule MyApp.Mixfile do defp deps do [ # add to your existing deps - {:kafka_ex, "~> 0.8.2"}, + {:kafka_ex, "~> 0.8.1"}, # if using snappy compression {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif"} ] diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index 3717a594..c659eea6 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -200,6 +200,7 @@ defmodule KafkaEx.GenConsumer do """ @type option :: {:commit_interval, non_neg_integer} | {:commit_threshold, non_neg_integer} + | {:auto_offset_reset, :none | :earliest | :latest} @typedoc """ Options used when starting a `KafkaEx.GenConsumer`. diff --git a/mix.exs b/mix.exs index b64151a3..cfbf42a0 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule KafkaEx.Mixfile do def project do [ app: :kafka_ex, - version: "0.8.2", + version: "0.8.1", elixir: "~> 1.1", dialyzer: [ plt_add_deps: :transitive,