Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix compile warnings and credo #325

Merged
merged 5 commits into from
Nov 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions .credo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@
#
requires: [],
#
# Credo automatically checks for updates, like e.g. Hex does.
# You can disable this behaviour below:
#
check_for_updates: true,
#
# If you want to enforce a style guide and need a more traditional linting
# experience, you can change `strict` to `true` below:
#
Expand All @@ -60,13 +55,13 @@
{Credo.Check.Consistency.SpaceInParentheses},
{Credo.Check.Consistency.TabsOrSpaces},

# For some checks, like AliasUsage, you can only customize the priority
# You can customize the priority of any check
# Priority values are: `low, normal, high, higher`
#
{Credo.Check.Design.AliasUsage, priority: :low},

# For others you can set parameters

# For some checks, you can also set other parameters
#
# If you don't want the `setup` and `test` macro calls in ExUnit tests
# or the `schema` macro in Ecto schemas to trigger DuplicatedCode, just
# set the `excluded_macros` parameter to `[:schema, :setup, :test]`.
Expand Down Expand Up @@ -111,6 +106,7 @@
{Credo.Check.Refactor.UnlessWithElse},

{Credo.Check.Warning.BoolOperationOnSameValues},
{Credo.Check.Warning.ExpensiveEmptyEnumCheck},
{Credo.Check.Warning.IExPry},
{Credo.Check.Warning.IoInspect},
{Credo.Check.Warning.LazyLogging},
Expand Down
7 changes: 4 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ cache:
- _build
matrix:
include:
- elixir: 1.7.3
otp_release: 21.0
- elixir: 1.6.2
- elixir: 1.7.4
otp_release: 21.1
- elixir: 1.6.6
otp_release: 20.2
- elixir: 1.5.3
otp_release: 20.2
Expand All @@ -33,4 +33,5 @@ before_script:
- IP_IFACE=eth0 ./scripts/docker_up.sh
script:
- MIX_ENV=test mix deps.compile
- MIX_ENV=test mix compile --warnings-as-errors
- ./scripts/ci_tests.sh
31 changes: 19 additions & 12 deletions lib/kafka_ex/api_versions.ex
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
defmodule KafkaEx.ApiVersions do
@moduledoc false

def api_versions_map(api_versions) do
api_versions
|> Enum.reduce(%{}, fn version, version_map ->
version_map |> Map.put(version.api_key, version)
end)
version_map |> Map.put(version.api_key, version)
end)
end


def find_api_version(api_versions_map, message_type, {min_implemented_version, max_implemented_version}) do
def find_api_version(
api_versions_map,
message_type,
{min_implemented_version, max_implemented_version}
) do
if api_versions_map == [:unsupported] do
{:ok, min_implemented_version}
else
case KafkaEx.Protocol.api_key(message_type) do
nil -> :unknown_message_for_client
api_key -> case api_versions_map[api_key] do
%{min_version: min} when min > max_implemented_version -> :no_version_supported
%{max_version: max} when max < min_implemented_version -> :no_version_supported
%{max_version: max} -> {:ok, Enum.min([max_implemented_version, max])}
_ -> :unknown_message_for_server
end
nil ->
:unknown_message_for_client

api_key ->
case api_versions_map[api_key] do
%{min_version: min} when min > max_implemented_version -> :no_version_supported
%{max_version: max} when max < min_implemented_version -> :no_version_supported
%{max_version: max} -> {:ok, Enum.min([max_implemented_version, max])}
_ -> :unknown_message_for_server
end
end
end
end
end
end
4 changes: 2 additions & 2 deletions lib/kafka_ex/protocol/api_versions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ defmodule KafkaEx.Protocol.ApiVersions do
api_versions_count :: 32-signed,
rest :: binary
>>, this_api_version) do
%{ parse_rest_of_response(api_versions_count, rest, this_api_version) | error_code: Protocol.error(error_code) }
%{parse_rest_of_response(api_versions_count, rest, this_api_version) | error_code: Protocol.error(error_code)}
end

defp parse_rest_of_response(api_versions_count, data, this_api_version) do
Expand All @@ -69,7 +69,7 @@ defmodule KafkaEx.Protocol.ApiVersions do
end

defp parse_one_api_version(<< api_key :: 16-signed, min_version :: 16-signed, max_version :: 16-signed, rest :: binary >>) do
{%ApiVersion{ api_key: api_key, min_version: min_version, max_version: max_version }, rest}
{%ApiVersion{api_key: api_key, min_version: min_version, max_version: max_version}, rest}
end

end
18 changes: 9 additions & 9 deletions lib/kafka_ex/protocol/create_topics.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@

defmodule KafkaEx.Protocol.CreateTopics do
alias KafkaEx.Protocol
@supported_versions_range {0, 0}
@default_api_version 0

@moduledoc """
Implementation of the Kafka CreateTopics request and response APIs
Expand All @@ -24,16 +22,19 @@ defmodule KafkaEx.Protocol.CreateTopics do
# timeout => INT32

defmodule ReplicaAssignment do
@moduledoc false
defstruct partition: nil, replicas: nil
@type t :: %ReplicaAssignment{ partition: integer, replicas: [integer] }
@type t :: %ReplicaAssignment{partition: integer, replicas: [integer]}
end

defmodule ConfigEntry do
@moduledoc false
defstruct config_name: nil, config_value: nil
@type t :: %ConfigEntry{ config_name: binary, config_value: binary | nil }
@type t :: %ConfigEntry{config_name: binary, config_value: binary | nil}
end

defmodule TopicRequest do
@moduledoc false
defstruct topic: nil,
num_partitions: -1,
replication_factor: -1,
Expand All @@ -55,8 +56,9 @@ defmodule KafkaEx.Protocol.CreateTopics do
end

defmodule TopicError do
@moduledoc false
defstruct topic_name: nil, error_code: nil
@type t :: %TopicError{ topic_name: binary, error_code: atom }
@type t :: %TopicError{topic_name: binary, error_code: atom}
end

defmodule Response do
Expand Down Expand Up @@ -127,7 +129,7 @@ defmodule KafkaEx.Protocol.CreateTopics do
end

defp map_encode(elems, function) do
if nil == elems or 0 == length(elems) do
if nil == elems or [] == elems do
<< 0 :: 32-signed >>
else
<< length(elems) :: 32-signed >> <>
Expand All @@ -138,15 +140,13 @@ defmodule KafkaEx.Protocol.CreateTopics do
end

@spec parse_response(binary, integer) :: [] | Response.t
def parse_response(message, api_version)

def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>, 0) do
%Response{topic_errors: parse_topic_errors(topic_errors_count, topic_errors)}
end

@spec parse_topic_errors(integer, binary) :: [TopicError.t]
defp parse_topic_errors(0, _), do: []

@spec parse_topic_errors(integer, binary) :: [TopicError.t]
defp parse_topic_errors(topic_errors_count,
<< topic_name_size :: 16-signed, topic_name :: size(topic_name_size)-binary, error_code :: 16-signed, rest :: binary >>) do
[%TopicError{topic_name: topic_name, error_code: Protocol.error(error_code)} | parse_topic_errors(topic_errors_count - 1, rest)]
Expand Down
4 changes: 2 additions & 2 deletions lib/kafka_ex/protocol/metadata.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule KafkaEx.Protocol.Metadata do
alias KafkaEx.Socket

defstruct node_id: -1, host: "", port: 0, socket: nil, is_controller: nil
@type t :: %__MODULE__{node_id: integer, host: binary, port: integer, socket: KafkaEx.Socket.t, is_controller: boolean }
@type t :: %__MODULE__{node_id: integer, host: binary, port: integer, socket: KafkaEx.Socket.t, is_controller: boolean}

def connected?(%Broker{} = broker) do
broker.socket != nil && Socket.open?(broker.socket)
Expand Down Expand Up @@ -197,7 +197,7 @@ defmodule KafkaEx.Protocol.Metadata do
partition_metadatas_size :: 32-signed,
rest :: binary >>) do
{partition_metadatas, rest} = parse_partition_metadatas(partition_metadatas_size, [], rest)
[%TopicMetadata{error_code: Protocol.error(error_code), topic: topic, partition_metadatas: partition_metadatas, is_internal: is_internal == 1 } |
[%TopicMetadata{error_code: Protocol.error(error_code), topic: topic, partition_metadatas: partition_metadatas, is_internal: is_internal == 1} |
parse_topic_metadatas_v1(topic_metadatas_size - 1, rest)]
end

Expand Down
15 changes: 8 additions & 7 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ defmodule KafkaEx.Server do
{:noreply, new_state, timeout | :hibernate} |
{:stop, reason, reply, new_state} |
{:stop, reason, new_state} when reply: term, new_state: term, reason: term
@callback kafka_create_topics(CreateTopicsRequest.t, network_timeout :: integer, state :: State.t) ::
@callback kafka_create_topics([CreateTopicsRequest.t], network_timeout :: integer, state :: State.t) ::
{:reply, reply, new_state} when reply: term, new_state: term
@callback kafka_api_versions(state :: State.t) ::
{:reply, reply, new_state} when reply: term, new_state: term
Expand Down Expand Up @@ -312,6 +312,7 @@ defmodule KafkaEx.Server do
end
end

# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
def kafka_server_produce_send_request(correlation_id, produce_request, produce_request_data, state) do
{broker, state, corr_id} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition) do
nil ->
Expand Down Expand Up @@ -414,6 +415,12 @@ defmodule KafkaEx.Server do
retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, error_code, api_version)
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata_with_version(_, correlation_id, _sync_timeout, topic, 0, error_code, server_api_versions) do
Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}")
{correlation_id, %Metadata.Response{}}
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, _error_code, api_version) do
metadata_request = Metadata.create_request(correlation_id, @client_id, topic, api_version)
Expand All @@ -434,12 +441,6 @@ defmodule KafkaEx.Server do
end
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata_with_version(_, correlation_id, _sync_timeout, topic, 0, error_code, server_api_versions) do
Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}")
{correlation_id, %Metadata.Response{}}
end


defoverridable [
kafka_server_produce: 2, kafka_server_offset: 4,
Expand Down
8 changes: 4 additions & 4 deletions lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ defmodule KafkaEx.Server0P10AndLater do

brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port, ssl_options, use_ssl)} end)

{ _, %KafkaEx.Protocol.ApiVersions.Response{ api_versions: api_versions, error_code: :no_error }, state } = kafka_api_versions(%State{brokers: brokers})
{_, %KafkaEx.Protocol.ApiVersions.Response{api_versions: api_versions, error_code: :no_error}, state} = kafka_api_versions(%State{brokers: brokers})
api_versions = KafkaEx.ApiVersions.api_versions_map(api_versions)

{correlation_id, metadata} = retrieve_metadata(brokers, state.correlation_id, config_sync_timeout(), [], api_versions)
Expand Down Expand Up @@ -116,7 +116,7 @@ defmodule KafkaEx.Server0P10AndLater do
|> first_broker_response(state)
|> ApiVersions.parse_response

{:reply, response, %{ state | correlation_id: state.correlation_id + 1 }}
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end

def kafka_create_topics(requests, network_timeout, state) do
Expand All @@ -130,7 +130,7 @@ defmodule KafkaEx.Server0P10AndLater do
timeout: network_timeout
}

mainRequest = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request, api_version)
main_request = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request, api_version)

broker = state.brokers |> Enum.find(&(&1.is_controller))

Expand All @@ -140,7 +140,7 @@ defmodule KafkaEx.Server0P10AndLater do
{:topic_not_found, state}
_ ->
response = broker
|> NetworkClient.send_sync_request(mainRequest, config_sync_timeout())
|> NetworkClient.send_sync_request(main_request, config_sync_timeout())
|> case do
{:error, reason} -> {:error, reason}
response -> CreateTopics.parse_response(response, api_version)
Expand Down
1 change: 1 addition & 0 deletions lib/kafka_ex/server_0_p_8_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule KafkaEx.Server0P8P0 do
{:nowarn_function, kafka_server_offset_commit: 2},
{:nowarn_function, kafka_server_offset_fetch: 2},
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
]

use KafkaEx.Server
Expand Down
1 change: 1 addition & 0 deletions lib/kafka_ex/server_0_p_8_p_2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule KafkaEx.Server0P8P2 do
{:nowarn_function, kafka_server_join_group: 3},
{:nowarn_function, kafka_server_leave_group: 3},
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
]

use KafkaEx.Server
Expand Down
3 changes: 2 additions & 1 deletion lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule KafkaEx.Server0P9P0 do
# these functions aren't implemented for 0.9.0
@dialyzer [
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
]

use KafkaEx.Server
Expand Down Expand Up @@ -46,7 +47,7 @@ defmodule KafkaEx.Server0P9P0 do
defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2
defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2
defdelegate kafka_api_versions(state), to: Server0P8P2
defdelegate kafka_create_topics(requests, network_timeout, state),to: Server0P8P2
defdelegate kafka_create_topics(requests, network_timeout, state), to: Server0P8P2

def kafka_server_init([args]) do
kafka_server_init([args, self()])
Expand Down