diff --git a/.credo.exs b/.credo.exs index 7b89a30f..89090371 100644 --- a/.credo.exs +++ b/.credo.exs @@ -30,11 +30,6 @@ # requires: [], # - # Credo automatically checks for updates, like e.g. Hex does. - # You can disable this behaviour below: - # - check_for_updates: true, - # # If you want to enforce a style guide and need a more traditional linting # experience, you can change `strict` to `true` below: # @@ -60,13 +55,13 @@ {Credo.Check.Consistency.SpaceInParentheses}, {Credo.Check.Consistency.TabsOrSpaces}, - # For some checks, like AliasUsage, you can only customize the priority + # You can customize the priority of any check # Priority values are: `low, normal, high, higher` # {Credo.Check.Design.AliasUsage, priority: :low}, - # For others you can set parameters - + # For some checks, you can also set other parameters + # # If you don't want the `setup` and `test` macro calls in ExUnit tests # or the `schema` macro in Ecto schemas to trigger DuplicatedCode, just # set the `excluded_macros` parameter to `[:schema, :setup, :test]`. 
@@ -111,6 +106,7 @@ {Credo.Check.Refactor.UnlessWithElse}, {Credo.Check.Warning.BoolOperationOnSameValues}, + {Credo.Check.Warning.ExpensiveEmptyEnumCheck}, {Credo.Check.Warning.IExPry}, {Credo.Check.Warning.IoInspect}, {Credo.Check.Warning.LazyLogging}, diff --git a/.travis.yml b/.travis.yml index 71f402ff..478133ef 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,9 +8,9 @@ cache: - _build matrix: include: - - elixir: 1.7.3 - otp_release: 21.0 - - elixir: 1.6.2 + - elixir: 1.7.4 + otp_release: 21.1 + - elixir: 1.6.6 otp_release: 20.2 - elixir: 1.5.3 otp_release: 20.2 @@ -33,4 +33,5 @@ before_script: - IP_IFACE=eth0 ./scripts/docker_up.sh script: - MIX_ENV=test mix deps.compile + - MIX_ENV=test mix compile --warnings-as-errors - ./scripts/ci_tests.sh diff --git a/lib/kafka_ex/api_versions.ex b/lib/kafka_ex/api_versions.ex index f29cc3ec..2328eb3a 100644 --- a/lib/kafka_ex/api_versions.ex +++ b/lib/kafka_ex/api_versions.ex @@ -1,26 +1,33 @@ defmodule KafkaEx.ApiVersions do + @moduledoc false def api_versions_map(api_versions) do api_versions |> Enum.reduce(%{}, fn version, version_map -> - version_map |> Map.put(version.api_key, version) - end) + version_map |> Map.put(version.api_key, version) + end) end - - def find_api_version(api_versions_map, message_type, {min_implemented_version, max_implemented_version}) do + def find_api_version( + api_versions_map, + message_type, + {min_implemented_version, max_implemented_version} + ) do if api_versions_map == [:unsupported] do {:ok, min_implemented_version} else case KafkaEx.Protocol.api_key(message_type) do - nil -> :unknown_message_for_client - api_key -> case api_versions_map[api_key] do - %{min_version: min} when min > max_implemented_version -> :no_version_supported - %{max_version: max} when max < min_implemented_version -> :no_version_supported - %{max_version: max} -> {:ok, Enum.min([max_implemented_version, max])} - _ -> :unknown_message_for_server - end + nil -> + :unknown_message_for_client + + api_key -> + 
case api_versions_map[api_key] do + %{min_version: min} when min > max_implemented_version -> :no_version_supported + %{max_version: max} when max < min_implemented_version -> :no_version_supported + %{max_version: max} -> {:ok, Enum.min([max_implemented_version, max])} + _ -> :unknown_message_for_server + end end end end -end \ No newline at end of file +end diff --git a/lib/kafka_ex/protocol/api_versions.ex b/lib/kafka_ex/protocol/api_versions.ex index 8ccc4938..1c1cb58d 100644 --- a/lib/kafka_ex/protocol/api_versions.ex +++ b/lib/kafka_ex/protocol/api_versions.ex @@ -56,7 +56,7 @@ defmodule KafkaEx.Protocol.ApiVersions do api_versions_count :: 32-signed, rest :: binary >>, this_api_version) do - %{ parse_rest_of_response(api_versions_count, rest, this_api_version) | error_code: Protocol.error(error_code) } + %{parse_rest_of_response(api_versions_count, rest, this_api_version) | error_code: Protocol.error(error_code)} end defp parse_rest_of_response(api_versions_count, data, this_api_version) do @@ -69,7 +69,7 @@ defmodule KafkaEx.Protocol.ApiVersions do end defp parse_one_api_version(<< api_key :: 16-signed, min_version :: 16-signed, max_version :: 16-signed, rest :: binary >>) do - {%ApiVersion{ api_key: api_key, min_version: min_version, max_version: max_version }, rest} + {%ApiVersion{api_key: api_key, min_version: min_version, max_version: max_version}, rest} end end diff --git a/lib/kafka_ex/protocol/create_topics.ex b/lib/kafka_ex/protocol/create_topics.ex index 4a37ce51..315703cb 100644 --- a/lib/kafka_ex/protocol/create_topics.ex +++ b/lib/kafka_ex/protocol/create_topics.ex @@ -1,8 +1,6 @@ - defmodule KafkaEx.Protocol.CreateTopics do alias KafkaEx.Protocol @supported_versions_range {0, 0} - @default_api_version 0 @moduledoc """ Implementation of the Kafka CreateTopics request and response APIs @@ -24,16 +22,19 @@ defmodule KafkaEx.Protocol.CreateTopics do # timeout => INT32 defmodule ReplicaAssignment do + @moduledoc false defstruct partition: nil, 
replicas: nil - @type t :: %ReplicaAssignment{ partition: integer, replicas: [integer] } + @type t :: %ReplicaAssignment{partition: integer, replicas: [integer]} end defmodule ConfigEntry do + @moduledoc false defstruct config_name: nil, config_value: nil - @type t :: %ConfigEntry{ config_name: binary, config_value: binary | nil } + @type t :: %ConfigEntry{config_name: binary, config_value: binary | nil} end defmodule TopicRequest do + @moduledoc false defstruct topic: nil, num_partitions: -1, replication_factor: -1, @@ -55,8 +56,9 @@ defmodule KafkaEx.Protocol.CreateTopics do end defmodule TopicError do + @moduledoc false defstruct topic_name: nil, error_code: nil - @type t :: %TopicError{ topic_name: binary, error_code: atom } + @type t :: %TopicError{topic_name: binary, error_code: atom} end defmodule Response do @@ -127,7 +129,7 @@ defmodule KafkaEx.Protocol.CreateTopics do end defp map_encode(elems, function) do - if nil == elems or 0 == length(elems) do + if nil == elems or [] == elems do << 0 :: 32-signed >> else << length(elems) :: 32-signed >> <> @@ -138,15 +140,13 @@ defmodule KafkaEx.Protocol.CreateTopics do end @spec parse_response(binary, integer) :: [] | Response.t - def parse_response(message, api_version) - def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>, 0) do %Response{topic_errors: parse_topic_errors(topic_errors_count, topic_errors)} end + @spec parse_topic_errors(integer, binary) :: [TopicError.t] defp parse_topic_errors(0, _), do: [] - @spec parse_topic_errors(integer, binary) :: [TopicError.t] defp parse_topic_errors(topic_errors_count, << topic_name_size :: 16-signed, topic_name :: size(topic_name_size)-binary, error_code :: 16-signed, rest :: binary >>) do [%TopicError{topic_name: topic_name, error_code: Protocol.error(error_code)} | parse_topic_errors(topic_errors_count - 1, rest)] diff --git a/lib/kafka_ex/protocol/metadata.ex b/lib/kafka_ex/protocol/metadata.ex index 
d790d151..c51a4f61 100644 --- a/lib/kafka_ex/protocol/metadata.ex +++ b/lib/kafka_ex/protocol/metadata.ex @@ -20,7 +20,7 @@ defmodule KafkaEx.Protocol.Metadata do alias KafkaEx.Socket defstruct node_id: -1, host: "", port: 0, socket: nil, is_controller: nil - @type t :: %__MODULE__{node_id: integer, host: binary, port: integer, socket: KafkaEx.Socket.t, is_controller: boolean } + @type t :: %__MODULE__{node_id: integer, host: binary, port: integer, socket: KafkaEx.Socket.t, is_controller: boolean} def connected?(%Broker{} = broker) do broker.socket != nil && Socket.open?(broker.socket) @@ -197,7 +197,7 @@ defmodule KafkaEx.Protocol.Metadata do partition_metadatas_size :: 32-signed, rest :: binary >>) do {partition_metadatas, rest} = parse_partition_metadatas(partition_metadatas_size, [], rest) - [%TopicMetadata{error_code: Protocol.error(error_code), topic: topic, partition_metadatas: partition_metadatas, is_internal: is_internal == 1 } | + [%TopicMetadata{error_code: Protocol.error(error_code), topic: topic, partition_metadatas: partition_metadatas, is_internal: is_internal == 1} | parse_topic_metadatas_v1(topic_metadatas_size - 1, rest)] end diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 71413ffe..ba629cc2 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -160,7 +160,7 @@ defmodule KafkaEx.Server do {:noreply, new_state, timeout | :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term - @callback kafka_create_topics(CreateTopicsRequest.t, network_timeout :: integer, state :: State.t) :: + @callback kafka_create_topics([CreateTopicsRequest.t], network_timeout :: integer, state :: State.t) :: {:reply, reply, new_state} when reply: term, new_state: term @callback kafka_api_versions(state :: State.t) :: {:reply, reply, new_state} when reply: term, new_state: term @@ -312,6 +312,7 @@ defmodule KafkaEx.Server do end end + # credo:disable-for-next-line 
Credo.Check.Refactor.CyclomaticComplexity def kafka_server_produce_send_request(correlation_id, produce_request, produce_request_data, state) do {broker, state, corr_id} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition) do nil -> @@ -414,6 +415,12 @@ retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, error_code, api_version) end + # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity + def retrieve_metadata_with_version(_, correlation_id, _sync_timeout, topic, 0, error_code, _server_api_versions) do + Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}") + {correlation_id, %Metadata.Response{}} + end + # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity def retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, _error_code, api_version) do metadata_request = Metadata.create_request(correlation_id, @client_id, topic, api_version) @@ -434,12 +441,6 @@ end end - # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity - def retrieve_metadata_with_version(_, correlation_id, _sync_timeout, topic, 0, error_code, server_api_versions) do - Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}") - {correlation_id, %Metadata.Response{}} - end - defoverridable [ kafka_server_produce: 2, kafka_server_offset: 4, diff --git a/lib/kafka_ex/server_0_p_10_and_later.ex b/lib/kafka_ex/server_0_p_10_and_later.ex index 8d6d17d4..61f96879 100644 --- a/lib/kafka_ex/server_0_p_10_and_later.ex +++ b/lib/kafka_ex/server_0_p_10_and_later.ex @@ -65,7 +65,7 @@ defmodule KafkaEx.Server0P10AndLater do brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port, ssl_options, use_ssl)} end) - { _, 
%KafkaEx.Protocol.ApiVersions.Response{ api_versions: api_versions, error_code: :no_error }, state } = kafka_api_versions(%State{brokers: brokers}) + {_, %KafkaEx.Protocol.ApiVersions.Response{api_versions: api_versions, error_code: :no_error}, state} = kafka_api_versions(%State{brokers: brokers}) api_versions = KafkaEx.ApiVersions.api_versions_map(api_versions) {correlation_id, metadata} = retrieve_metadata(brokers, state.correlation_id, config_sync_timeout(), [], api_versions) @@ -116,7 +116,7 @@ defmodule KafkaEx.Server0P10AndLater do |> first_broker_response(state) |> ApiVersions.parse_response - {:reply, response, %{ state | correlation_id: state.correlation_id + 1 }} + {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end def kafka_create_topics(requests, network_timeout, state) do @@ -130,7 +130,7 @@ defmodule KafkaEx.Server0P10AndLater do timeout: network_timeout } - mainRequest = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request, api_version) + main_request = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request, api_version) broker = state.brokers |> Enum.find(&(&1.is_controller)) @@ -140,7 +140,7 @@ defmodule KafkaEx.Server0P10AndLater do {:topic_not_found, state} _ -> response = broker - |> NetworkClient.send_sync_request(mainRequest, config_sync_timeout()) + |> NetworkClient.send_sync_request(main_request, config_sync_timeout()) |> case do {:error, reason} -> {:error, reason} response -> CreateTopics.parse_response(response, api_version) diff --git a/lib/kafka_ex/server_0_p_8_p_0.ex b/lib/kafka_ex/server_0_p_8_p_0.ex index a359f63b..fc36edbe 100644 --- a/lib/kafka_ex/server_0_p_8_p_0.ex +++ b/lib/kafka_ex/server_0_p_8_p_0.ex @@ -15,6 +15,7 @@ defmodule KafkaEx.Server0P8P0 do {:nowarn_function, kafka_server_offset_commit: 2}, {:nowarn_function, kafka_server_offset_fetch: 2}, {:nowarn_function, kafka_create_topics: 3}, + {:nowarn_function, kafka_api_versions: 1} ] use 
KafkaEx.Server diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index ee4b97d1..ae000e5c 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -10,6 +10,7 @@ defmodule KafkaEx.Server0P8P2 do {:nowarn_function, kafka_server_join_group: 3}, {:nowarn_function, kafka_server_leave_group: 3}, {:nowarn_function, kafka_create_topics: 3}, + {:nowarn_function, kafka_api_versions: 1} ] use KafkaEx.Server diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index 20312e3b..87f6336a 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -7,6 +7,7 @@ defmodule KafkaEx.Server0P9P0 do # these functions aren't implemented for 0.9.0 @dialyzer [ {:nowarn_function, kafka_create_topics: 3}, + {:nowarn_function, kafka_api_versions: 1} ] use KafkaEx.Server @@ -46,7 +47,7 @@ defmodule KafkaEx.Server0P9P0 do defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2 defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2 defdelegate kafka_api_versions(state), to: Server0P8P2 - defdelegate kafka_create_topics(requests, network_timeout, state),to: Server0P8P2 + defdelegate kafka_create_topics(requests, network_timeout, state), to: Server0P8P2 def kafka_server_init([args]) do kafka_server_init([args, self()])