Commit

Merge pull request #325 from kafkaex/fix_compile_warnings
Fix compile warnings and credo
joshuawscott authored Nov 29, 2018
2 parents cb590ed + e5a69e3 commit f194d29
Showing 11 changed files with 56 additions and 48 deletions.
12 changes: 4 additions & 8 deletions .credo.exs
@@ -30,11 +30,6 @@
#
requires: [],
#
# Credo automatically checks for updates, like e.g. Hex does.
# You can disable this behaviour below:
#
check_for_updates: true,
#
# If you want to enforce a style guide and need a more traditional linting
# experience, you can change `strict` to `true` below:
#
@@ -60,13 +55,13 @@
{Credo.Check.Consistency.SpaceInParentheses},
{Credo.Check.Consistency.TabsOrSpaces},

# For some checks, like AliasUsage, you can only customize the priority
# You can customize the priority of any check
# Priority values are: `low, normal, high, higher`
#
{Credo.Check.Design.AliasUsage, priority: :low},

# For others you can set parameters

# For some checks, you can also set other parameters
#
# If you don't want the `setup` and `test` macro calls in ExUnit tests
# or the `schema` macro in Ecto schemas to trigger DuplicatedCode, just
# set the `excluded_macros` parameter to `[:schema, :setup, :test]`.
@@ -111,6 +106,7 @@
{Credo.Check.Refactor.UnlessWithElse},

{Credo.Check.Warning.BoolOperationOnSameValues},
{Credo.Check.Warning.ExpensiveEmptyEnumCheck},
{Credo.Check.Warning.IExPry},
{Credo.Check.Warning.IoInspect},
{Credo.Check.Warning.LazyLogging},
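
The newly enabled Credo.Check.Warning.ExpensiveEmptyEnumCheck is what drives the map_encode/2 change in lib/kafka_ex/protocol/create_topics.ex further down. A minimal sketch of the pattern it flags (module name is illustrative, not from this diff):

    defmodule EmptinessExample do
      # length/1 walks the entire list, so this is O(n) just to test emptiness.
      def slow_empty?(list), do: length(list) == 0

      # Comparing against [] (or using Enum.empty?/1) answers in constant time.
      def fast_empty?(list), do: list == []
    end
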
7 changes: 4 additions & 3 deletions .travis.yml
@@ -8,9 +8,9 @@ cache:
- _build
matrix:
include:
- elixir: 1.7.3
otp_release: 21.0
- elixir: 1.6.2
- elixir: 1.7.4
otp_release: 21.1
- elixir: 1.6.6
otp_release: 20.2
- elixir: 1.5.3
otp_release: 20.2
@@ -33,4 +33,5 @@ before_script:
- IP_IFACE=eth0 ./scripts/docker_up.sh
script:
- MIX_ENV=test mix deps.compile
- MIX_ENV=test mix compile --warnings-as-errors
- ./scripts/ci_tests.sh
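
The added MIX_ENV=test mix compile --warnings-as-errors step turns any compiler warning into a build failure. A hypothetical module showing the kind of warning CI now rejects:

    defmodule WarningExample do
      def add(a, b) do
        # The compiler warns that `unused` is unused; with --warnings-as-errors
        # that warning aborts compilation instead of merely printing.
        unused = a * b
        a + b
      end
    end
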
31 changes: 19 additions & 12 deletions lib/kafka_ex/api_versions.ex
@@ -1,26 +1,33 @@
defmodule KafkaEx.ApiVersions do
@moduledoc false

  def api_versions_map(api_versions) do
    api_versions
    |> Enum.reduce(%{}, fn version, version_map ->
-        version_map |> Map.put(version.api_key, version)
-      end)
+      version_map |> Map.put(version.api_key, version)
+    end)
  end

- def find_api_version(api_versions_map, message_type, {min_implemented_version, max_implemented_version}) do
+ def find_api_version(
+       api_versions_map,
+       message_type,
+       {min_implemented_version, max_implemented_version}
+     ) do
    if api_versions_map == [:unsupported] do
      {:ok, min_implemented_version}
    else
      case KafkaEx.Protocol.api_key(message_type) do
-       nil -> :unknown_message_for_client
-       api_key -> case api_versions_map[api_key] do
-         %{min_version: min} when min > max_implemented_version -> :no_version_supported
-         %{max_version: max} when max < min_implemented_version -> :no_version_supported
-         %{max_version: max} -> {:ok, Enum.min([max_implemented_version, max])}
-         _ -> :unknown_message_for_server
-       end
+       nil ->
+         :unknown_message_for_client
+
+       api_key ->
+         case api_versions_map[api_key] do
+           %{min_version: min} when min > max_implemented_version -> :no_version_supported
+           %{max_version: max} when max < min_implemented_version -> :no_version_supported
+           %{max_version: max} -> {:ok, Enum.min([max_implemented_version, max])}
+           _ -> :unknown_message_for_server
+         end
      end
    end
  end
end
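
Read against a concrete call, find_api_version/3 picks the highest version both sides support. A usage sketch with made-up numbers, assuming KafkaEx.Protocol.api_key(:metadata) resolves to the Kafka Metadata API key (3):

    # Broker advertises Metadata versions 0..5; this client implements 0..1.
    versions_map = %{3 => %{api_key: 3, min_version: 0, max_version: 5}}

    KafkaEx.ApiVersions.find_api_version(versions_map, :metadata, {0, 1})
    # => {:ok, 1} (the smaller of the client's and the broker's maximums)

    # A broker whose maximum version is below the client's minimum would
    # yield :no_version_supported instead.
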
4 changes: 2 additions & 2 deletions lib/kafka_ex/protocol/api_versions.ex
@@ -56,7 +56,7 @@ defmodule KafkaEx.Protocol.ApiVersions do
api_versions_count :: 32-signed,
rest :: binary
>>, this_api_version) do
%{ parse_rest_of_response(api_versions_count, rest, this_api_version) | error_code: Protocol.error(error_code) }
%{parse_rest_of_response(api_versions_count, rest, this_api_version) | error_code: Protocol.error(error_code)}
end

defp parse_rest_of_response(api_versions_count, data, this_api_version) do
@@ -69,7 +69,7 @@ defmodule KafkaEx.Protocol.ApiVersions do
end

defp parse_one_api_version(<< api_key :: 16-signed, min_version :: 16-signed, max_version :: 16-signed, rest :: binary >>) do
{%ApiVersion{ api_key: api_key, min_version: min_version, max_version: max_version }, rest}
{%ApiVersion{api_key: api_key, min_version: min_version, max_version: max_version}, rest}
end

end
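
The struct-literal edits above only drop the spaces inside the braces; the decoding itself is plain binary pattern matching. A self-contained illustration of the pattern used by parse_one_api_version/1:

    # Each api-version entry is three 16-bit signed integers followed by the
    # remaining bytes, exactly as matched in parse_one_api_version/1.
    entry = <<3::16-signed, 0::16-signed, 5::16-signed, "rest">>

    <<api_key::16-signed, min_version::16-signed, max_version::16-signed, rest::binary>> = entry

    {api_key, min_version, max_version, rest}
    # => {3, 0, 5, "rest"}
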
18 changes: 9 additions & 9 deletions lib/kafka_ex/protocol/create_topics.ex
@@ -1,8 +1,6 @@

defmodule KafkaEx.Protocol.CreateTopics do
alias KafkaEx.Protocol
@supported_versions_range {0, 0}
@default_api_version 0

@moduledoc """
Implementation of the Kafka CreateTopics request and response APIs
@@ -24,16 +22,19 @@ defmodule KafkaEx.Protocol.CreateTopics do
# timeout => INT32

defmodule ReplicaAssignment do
@moduledoc false
defstruct partition: nil, replicas: nil
@type t :: %ReplicaAssignment{ partition: integer, replicas: [integer] }
@type t :: %ReplicaAssignment{partition: integer, replicas: [integer]}
end

defmodule ConfigEntry do
@moduledoc false
defstruct config_name: nil, config_value: nil
@type t :: %ConfigEntry{ config_name: binary, config_value: binary | nil }
@type t :: %ConfigEntry{config_name: binary, config_value: binary | nil}
end

defmodule TopicRequest do
@moduledoc false
defstruct topic: nil,
num_partitions: -1,
replication_factor: -1,
@@ -55,8 +56,9 @@ defmodule KafkaEx.Protocol.CreateTopics do
end

defmodule TopicError do
@moduledoc false
defstruct topic_name: nil, error_code: nil
@type t :: %TopicError{ topic_name: binary, error_code: atom }
@type t :: %TopicError{topic_name: binary, error_code: atom}
end

defmodule Response do
@@ -127,7 +129,7 @@
end

defp map_encode(elems, function) do
if nil == elems or 0 == length(elems) do
if nil == elems or [] == elems do
<< 0 :: 32-signed >>
else
<< length(elems) :: 32-signed >> <>
@@ -138,15 +140,13 @@
end

@spec parse_response(binary, integer) :: [] | Response.t
def parse_response(message, api_version)

def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>, 0) do
%Response{topic_errors: parse_topic_errors(topic_errors_count, topic_errors)}
end

@spec parse_topic_errors(integer, binary) :: [TopicError.t]
defp parse_topic_errors(0, _), do: []

@spec parse_topic_errors(integer, binary) :: [TopicError.t]
defp parse_topic_errors(topic_errors_count,
<< topic_name_size :: 16-signed, topic_name :: size(topic_name_size)-binary, error_code :: 16-signed, rest :: binary >>) do
[%TopicError{topic_name: topic_name, error_code: Protocol.error(error_code)} | parse_topic_errors(topic_errors_count - 1, rest)]
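
The map_encode/2 change replaces 0 == length(elems) with [] == elems, satisfying ExpensiveEmptyEnumCheck without touching the wire format: nil or an empty list still encodes as a zero length prefix. A standalone sketch of that encoding (names are illustrative, not the module's API):

    defmodule ArrayEncodeSketch do
      # nil or [] becomes a 32-bit zero; otherwise a 32-bit count followed by
      # each element run through the supplied encoder function.
      def encode(elems, _fun) when elems == nil or elems == [], do: <<0::32-signed>>

      def encode(elems, fun) when is_list(elems) do
        <<length(elems)::32-signed>> <> Enum.map_join(elems, "", fun)
      end
    end

    # ArrayEncodeSketch.encode(["a", "bc"], fn s -> <<byte_size(s)::16-signed>> <> s end)
    # => <<0, 0, 0, 2, 0, 1, 97, 0, 2, 98, 99>>
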
4 changes: 2 additions & 2 deletions lib/kafka_ex/protocol/metadata.ex
@@ -20,7 +20,7 @@ defmodule KafkaEx.Protocol.Metadata do
alias KafkaEx.Socket

defstruct node_id: -1, host: "", port: 0, socket: nil, is_controller: nil
@type t :: %__MODULE__{node_id: integer, host: binary, port: integer, socket: KafkaEx.Socket.t, is_controller: boolean }
@type t :: %__MODULE__{node_id: integer, host: binary, port: integer, socket: KafkaEx.Socket.t, is_controller: boolean}

def connected?(%Broker{} = broker) do
broker.socket != nil && Socket.open?(broker.socket)
@@ -197,7 +197,7 @@ defmodule KafkaEx.Protocol.Metadata do
partition_metadatas_size :: 32-signed,
rest :: binary >>) do
{partition_metadatas, rest} = parse_partition_metadatas(partition_metadatas_size, [], rest)
[%TopicMetadata{error_code: Protocol.error(error_code), topic: topic, partition_metadatas: partition_metadatas, is_internal: is_internal == 1 } |
[%TopicMetadata{error_code: Protocol.error(error_code), topic: topic, partition_metadatas: partition_metadatas, is_internal: is_internal == 1} |
parse_topic_metadatas_v1(topic_metadatas_size - 1, rest)]
end

15 changes: 8 additions & 7 deletions lib/kafka_ex/server.ex
@@ -160,7 +160,7 @@ defmodule KafkaEx.Server do
{:noreply, new_state, timeout | :hibernate} |
{:stop, reason, reply, new_state} |
{:stop, reason, new_state} when reply: term, new_state: term, reason: term
@callback kafka_create_topics(CreateTopicsRequest.t, network_timeout :: integer, state :: State.t) ::
@callback kafka_create_topics([CreateTopicsRequest.t], network_timeout :: integer, state :: State.t) ::
{:reply, reply, new_state} when reply: term, new_state: term
@callback kafka_api_versions(state :: State.t) ::
{:reply, reply, new_state} when reply: term, new_state: term
@@ -312,6 +312,7 @@
end
end

# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
def kafka_server_produce_send_request(correlation_id, produce_request, produce_request_data, state) do
{broker, state, corr_id} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition) do
nil ->
@@ -414,6 +415,12 @@
retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, error_code, api_version)
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata_with_version(_, correlation_id, _sync_timeout, topic, 0, error_code, server_api_versions) do
Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}")
{correlation_id, %Metadata.Response{}}
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, _error_code, api_version) do
metadata_request = Metadata.create_request(correlation_id, @client_id, topic, api_version)
@@ -434,12 +441,6 @@
end
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata_with_version(_, correlation_id, _sync_timeout, topic, 0, error_code, server_api_versions) do
Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}")
{correlation_id, %Metadata.Response{}}
end


defoverridable [
kafka_server_produce: 2, kafka_server_offset: 4,
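
Moving the zero-retry clause of retrieve_metadata_with_version/7 ahead of the general clause removes a compiler warning of the "this clause cannot match" kind: the general clause matches every retry count, so a literal-0 clause defined after it was unreachable. A minimal sketch of the same ordering rule (names are illustrative):

    defmodule ClauseOrderSketch do
      # The specific clause comes first and stops the recursion.
      def fetch(_request, 0), do: {:error, :retries_exhausted}

      # The general clause matches any remaining retry count. Swapping the two
      # clauses would make the 0 clause unreachable and trigger the warning.
      def fetch(request, retries), do: fetch(request, retries - 1)
    end
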
8 changes: 4 additions & 4 deletions lib/kafka_ex/server_0_p_10_and_later.ex
@@ -65,7 +65,7 @@ defmodule KafkaEx.Server0P10AndLater do

brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port, ssl_options, use_ssl)} end)

{ _, %KafkaEx.Protocol.ApiVersions.Response{ api_versions: api_versions, error_code: :no_error }, state } = kafka_api_versions(%State{brokers: brokers})
{_, %KafkaEx.Protocol.ApiVersions.Response{api_versions: api_versions, error_code: :no_error}, state} = kafka_api_versions(%State{brokers: brokers})
api_versions = KafkaEx.ApiVersions.api_versions_map(api_versions)

{correlation_id, metadata} = retrieve_metadata(brokers, state.correlation_id, config_sync_timeout(), [], api_versions)
@@ -116,7 +116,7 @@ defmodule KafkaEx.Server0P10AndLater do
|> first_broker_response(state)
|> ApiVersions.parse_response

{:reply, response, %{ state | correlation_id: state.correlation_id + 1 }}
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end

def kafka_create_topics(requests, network_timeout, state) do
@@ -130,7 +130,7 @@
timeout: network_timeout
}

mainRequest = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request, api_version)
main_request = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request, api_version)

broker = state.brokers |> Enum.find(&(&1.is_controller))

@@ -140,7 +140,7 @@
{:topic_not_found, state}
_ ->
response = broker
|> NetworkClient.send_sync_request(mainRequest, config_sync_timeout())
|> NetworkClient.send_sync_request(main_request, config_sync_timeout())
|> case do
{:error, reason} -> {:error, reason}
response -> CreateTopics.parse_response(response, api_version)
1 change: 1 addition & 0 deletions lib/kafka_ex/server_0_p_8_p_0.ex
@@ -15,6 +15,7 @@ defmodule KafkaEx.Server0P8P0 do
{:nowarn_function, kafka_server_offset_commit: 2},
{:nowarn_function, kafka_server_offset_fetch: 2},
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
]

use KafkaEx.Server
1 change: 1 addition & 0 deletions lib/kafka_ex/server_0_p_8_p_2.ex
@@ -10,6 +10,7 @@ defmodule KafkaEx.Server0P8P2 do
{:nowarn_function, kafka_server_join_group: 3},
{:nowarn_function, kafka_server_leave_group: 3},
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
]

use KafkaEx.Server
3 changes: 2 additions & 1 deletion lib/kafka_ex/server_0_p_9_p_0.ex
@@ -7,6 +7,7 @@ defmodule KafkaEx.Server0P9P0 do
# these functions aren't implemented for 0.9.0
@dialyzer [
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
]

use KafkaEx.Server
@@ -46,7 +47,7 @@ defmodule KafkaEx.Server0P9P0 do
defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2
defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2
defdelegate kafka_api_versions(state), to: Server0P8P2
defdelegate kafka_create_topics(requests, network_timeout, state),to: Server0P8P2
defdelegate kafka_create_topics(requests, network_timeout, state), to: Server0P8P2

def kafka_server_init([args]) do
kafka_server_init([args, self()])
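
All three legacy server modules gain a {:nowarn_function, kafka_api_versions: 1} entry so Dialyzer stays quiet about the callback they only stub or delegate. A minimal sketch of the attribute's shape (module name and bodies are illustrative):

    defmodule LegacyServerSketch do
      # Each tuple silences Dialyzer for one function/arity pair, mirroring the
      # entries added in the three files above.
      @dialyzer [
        {:nowarn_function, kafka_create_topics: 3},
        {:nowarn_function, kafka_api_versions: 1}
      ]

      def kafka_create_topics(_requests, _timeout, state), do: {:reply, :unsupported, state}
      def kafka_api_versions(state), do: {:reply, :unsupported, state}
    end
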
