Skip to content

Commit

Permalink
Add default partitioner
Browse files Browse the repository at this point in the history
Add a default partitioner that assigns a partition when one isn't set on the
produce request. The partition number is calculated from the message key
using the murmur2 hashing algorithm, to align with the Java implementation, so
that Java and Elixir clients partition messages by key in the same,
deterministic way.

In the case when the key is not provided, the partition is assigned
randomly.

A Partitioner behaviour is provided so that users can implement their own
partitioner if the default one does not fit their needs.
  • Loading branch information
barthez committed Jan 14, 2019
1 parent b3c7f92 commit acdf81c
Show file tree
Hide file tree
Showing 7 changed files with 407 additions and 1 deletion.
78 changes: 78 additions & 0 deletions lib/kafka_ex/default_partitioner.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule KafkaEx.DefaultPartitioner do
  @moduledoc """
  Default partitioner implementation.

  What happens depends on what the produce request already carries:

    * partition set - the request is returned unchanged;
    * partition missing and all messages share one non-nil key - the partition
      is derived from the murmur2 hash of that key;
    * partition missing and no usable key (no key at all, or the key lookup
      returned an error) - the partition is selected randomly.

  When the metadata lists no partitions for the topic, partition `0` is
  assigned in both the random and the key-based case.

  NOTE(review): the key-based branch reduces the hash with
  `KafkaEx.Utils.Murmur.umurmur2/1` (mask `0xFFFFFFFF`). The Java client
  appears to use `murmur2(key) & 0x7fffffff` instead, which selects a different
  partition whenever the hash has its top bit set - confirm before relying on
  identical placement across clients.
  """
  use KafkaEx.Partitioner
  alias KafkaEx.Partitioner
  alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
  alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
  alias KafkaEx.Utils.Murmur, as: Murmur
  require Logger

  @doc """
  Returns `request` with its `partition` field filled in.

  A request whose partition is already a number is returned untouched. For a
  request with `partition: nil` the partition is chosen from the shared message
  key, or randomly when there is no such key. A key lookup error is logged as a
  warning and handled like a missing key.
  """
  @spec assign_partition(
          request :: %ProduceRequest{},
          metadata :: %MetadataResponse{}
        ) :: %ProduceRequest{}
  def assign_partition(%ProduceRequest{partition: partition} = request, _)
      when is_number(partition) do
    request
  end

  def assign_partition(%ProduceRequest{partition: nil} = request, metadata) do
    case Partitioner.get_key(request) do
      {:ok, nil} ->
        assign_partition_randomly(request, metadata)

      {:ok, key} ->
        assign_partition_with_key(request, metadata, key)

      {:error, reason} ->
        Logger.warn(
          "#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}"
        )

        # Best effort: the request is still produced, just without key affinity.
        assign_partition_randomly(request, metadata)
    end
  end

  # Picks one of the partition ids known for the topic at random, or 0 when
  # the metadata lists none.
  @spec assign_partition_randomly(
          request :: %ProduceRequest{},
          metadata :: %MetadataResponse{}
        ) :: %ProduceRequest{}
  defp assign_partition_randomly(
         %ProduceRequest{topic: topic} = request,
         metadata
       ) do
    partition_id =
      case MetadataResponse.partitions_for_topic(metadata, topic) do
        [] -> 0
        list -> Enum.random(list)
      end

    %{request | partition: partition_id}
  end

  # Derives the partition from the murmur2 hash of `key`, modulo the number of
  # partitions known for the topic.
  @spec assign_partition_with_key(
          request :: %ProduceRequest{},
          metadata :: %MetadataResponse{},
          key :: binary
        ) :: %ProduceRequest{}
  defp assign_partition_with_key(
         %ProduceRequest{topic: topic} = request,
         metadata,
         key
       ) do
    partition_id =
      case MetadataResponse.partitions_for_topic(metadata, topic) do
        # No partitions known for the topic: rem/2 with a zero divisor would
        # raise ArithmeticError, so use the same fallback as the random branch.
        [] ->
          0

        partitions ->
          key |> Murmur.umurmur2() |> rem(length(partitions))
      end

    %{request | partition: partition_id}
  end
end
45 changes: 45 additions & 0 deletions lib/kafka_ex/partitioner.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
defmodule KafkaEx.Partitioner do
  @moduledoc """
  Behaviour definition for partitioners, which assign partitions to produce
  requests.

  `use KafkaEx.Partitioner` declares the using module as an implementation of
  this behaviour; it must then define `assign_partition/2`.
  """

  alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
  alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse

  @callback assign_partition(
              request :: %ProduceRequest{},
              metadata :: %MetadataResponse{}
            ) :: %ProduceRequest{}

  defmacro __using__(_) do
    quote location: :keep do
      @behaviour KafkaEx.Partitioner
    end
  end

  @doc """
  Returns the message key shared by all messages of a produce request.

  The key of every message in the request's `messages` list is inspected:

    * `{:ok, key}` - every message carries the same key;
    * `{:ok, nil}` - no message carries a key;
    * `{:error, :inconsistent_keys}` - the messages do not agree on one key
      (this includes a mix of keyed and un-keyed messages);
    * `{:error, :no_messages}` - the argument has no non-empty message list.
  """
  @spec get_key(request :: %ProduceRequest{}) ::
          {:ok, nil | binary} | {:error, atom}
  # Matching on `[_ | _]` checks for a non-empty list without walking it the
  # way a `length/1` guard would.
  def get_key(%ProduceRequest{messages: [_ | _] = messages}) do
    case unique_keys(messages) do
      [key] -> {:ok, key}
      _ -> {:error, :inconsistent_keys}
    end
  end

  def get_key(_) do
    {:error, :no_messages}
  end

  # Distinct keys found across the messages; a message without a key counts
  # as `nil`.
  defp unique_keys(messages) do
    messages
    |> Enum.map(&Map.get(&1, :key))
    |> Enum.uniq()
  end
end
12 changes: 11 additions & 1 deletion lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,15 @@ defmodule KafkaEx.Server do
# KafkaEx.Server behaviour default implementations
# This needs a refactor, but for now make credo pass:
# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
def kafka_server_produce(produce_request, state) do
def kafka_server_produce(
produce_request,
%State{metadata: metadata} = state
) do
correlation_id = state.correlation_id + 1

produce_request =
default_partitioner().assign_partition(produce_request, metadata)

produce_request_data =
try do
Produce.create_request(correlation_id, @client_id, produce_request)
Expand Down Expand Up @@ -932,6 +938,10 @@ defmodule KafkaEx.Server do
# Chooses the sync timeout: a truthy `timeout` argument wins; otherwise the
# `:sync_timeout` entry of the `:kafka_ex` application environment is read,
# with `@sync_timeout` as its default.
defp config_sync_timeout(timeout \\ nil) do
  if timeout do
    timeout
  else
    Application.get_env(:kafka_ex, :sync_timeout, @sync_timeout)
  end
end

# Module used to assign partitions to produce requests: the `:partitioner`
# entry of the `:kafka_ex` application environment, or
# `KafkaEx.DefaultPartitioner` when that entry is not set.
defp default_partitioner,
  do: Application.get_env(:kafka_ex, :partitioner, KafkaEx.DefaultPartitioner)
end
end
end
70 changes: 70 additions & 0 deletions lib/kafka_ex/utils/murmur.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
defmodule KafkaEx.Utils.Murmur do
  @moduledoc """
  Utility module that provides the murmur2 hashing algorithm.

  All intermediate values are kept as signed 32-bit integers, so the
  arithmetic wraps around the way 32-bit integer arithmetic does.
  """

  # `import Bitwise` (rather than the deprecated `use Bitwise`) brings in
  # `bxor/2`, `band/2` and the `&&&`, `<<<`, `>>>` operators.
  import Bitwise

  # Arbitrary constant for murmur2 hashing
  # https://github.com/aappleby/smhasher/blob/master/src/MurmurHash2.cpp#L39-L43
  @m 0x5BD1E995
  @r 24

  # Default seed for hashing, copied from the Java implementation
  # https://github.com/apache/kafka/blob/809be928f1ae004e11d65c307ea322bef126c834/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L382
  @seed 0x9747B28C

  @doc """
  Calculates the murmur2 hash of the given binary.

  The result is a signed 32-bit integer and may therefore be negative.
  """
  @spec murmur2(key :: binary) :: integer
  def murmur2(key) do
    # Reinterpret the seed constant as a signed 32-bit value.
    <<seed::signed-size(32)>> = <<@seed::size(32)>>
    len = byte_size(key)
    do_murmur2(key, bxor(seed, len))
  end

  @doc """
  Calculates the murmur2 hash of the given binary as an unsigned 32-bit
  integer, i.e. the low 32 bits of `murmur2/1` read as a non-negative number.
  """
  @spec umurmur2(key :: binary) :: integer
  def umurmur2(key) do
    key |> murmur2() |> band(0xFFFFFFFF)
  end

  # Truncates `num` to 32 bits and reads the result as a signed integer.
  defp mask32(num) do
    <<signed::signed-size(32)>> = <<num &&& 0xFFFFFFFF::size(32)>>
    signed
  end

  # Unsigned bitwise right shift on 32 bits
  defp ubsr32(num, shift) do
    (num &&& 0xFFFFFFFF) >>> shift
  end

  # Main loop: consume the input four bytes at a time, each chunk read as a
  # little-endian 32-bit integer.
  defp do_murmur2(<<a::little-size(32), rest::binary>>, h) do
    k = mask32(a * @m)
    k = bxor(k, ubsr32(k, @r))
    k = mask32(k * @m)
    h = mask32(h * @m)
    do_murmur2(rest, bxor(h, k))
  end

  # Tail of three, two or one byte(s): each clause mixes in its last byte and
  # hands the remainder to the next clause; only the one-byte clause multiplies.
  defp do_murmur2(<<a1::size(8), a2::size(8), a3::size(8)>>, h) do
    do_murmur2(<<a1, a2>>, bxor(h, mask32(a3 <<< 16)))
  end

  defp do_murmur2(<<a1::size(8), a2::size(8)>>, h) do
    do_murmur2(<<a1>>, bxor(h, mask32(a2 <<< 8)))
  end

  defp do_murmur2(<<a1::size(8)>>, h) do
    do_murmur2("", mask32(bxor(h, a1) * @m))
  end

  # Final mixing step once every input byte has been consumed.
  defp do_murmur2("", h) do
    h = bxor(h, ubsr32(h, 13))
    h = mask32(h * @m)
    bxor(h, ubsr32(h, 15))
  end
end
101 changes: 101 additions & 0 deletions test/kafka_ex/default_partitioner_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
defmodule KafkaEx.DefaultPartitionerTest do
  use ExUnit.Case

  import ExUnit.CaptureLog

  alias KafkaEx.DefaultPartitioner

  alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
  alias KafkaEx.Protocol.Produce.Message, as: ProduceMessage
  alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
  alias KafkaEx.Protocol.Metadata.TopicMetadata
  alias KafkaEx.Protocol.Metadata.PartitionMetadata

  # Builds a metadata response for "test_topic" whose partitions have the ids
  # 1..partitions.
  def metadata(partitions \\ 5) do
    %MetadataResponse{
      topic_metadatas: [
        %TopicMetadata{
          topic: "test_topic",
          partition_metadatas:
            Enum.map(1..partitions, fn n ->
              %PartitionMetadata{
                partition_id: n
              }
            end)
        }
      ]
    }
  end

  test "no assignment" do
    request = %ProduceRequest{
      topic: "test_topic",
      partition: 2,
      messages: [
        %ProduceMessage{key: nil, value: "message"}
      ]
    }

    # `assert` around the match gives an ExUnit failure report instead of a
    # bare MatchError.
    assert %{partition: 2} =
             DefaultPartitioner.assign_partition(request, metadata(5))
  end

  test "random assignment" do
    request = %ProduceRequest{
      topic: "test_topic",
      partition: nil,
      messages: [
        %ProduceMessage{key: nil, value: "message"}
      ]
    }

    assert %{partition: partition} =
             DefaultPartitioner.assign_partition(request, metadata(5))

    assert is_number(partition)
    assert partition > 0
    assert partition <= 5
  end

  test "key based assignment" do
    request = %ProduceRequest{
      topic: "test_topic",
      partition: nil,
      messages: [
        %ProduceMessage{key: "key", value: "message"}
      ]
    }

    assert %{partition: 4} =
             DefaultPartitioner.assign_partition(request, metadata(5))

    assert %{partition: 3} =
             DefaultPartitioner.assign_partition(request, metadata(6))

    second_request = %ProduceRequest{
      topic: "test_topic",
      partition: nil,
      messages: [
        %ProduceMessage{key: "key2", value: "message"}
      ]
    }

    assert %{partition: 1} =
             DefaultPartitioner.assign_partition(second_request, metadata(5))

    assert %{partition: 5} =
             DefaultPartitioner.assign_partition(second_request, metadata(6))
  end

  test "produce request with inconsistent keys" do
    request = %ProduceRequest{
      topic: "test_topic",
      partition: nil,
      messages: [
        %ProduceMessage{key: "key-1", value: "message-1"},
        %ProduceMessage{key: "key-2", value: "message-2"}
      ]
    }

    log =
      capture_log(fn ->
        # Inconsistent keys still end in a (random) assignment.
        assert %{partition: partition} =
                 DefaultPartitioner.assign_partition(request, metadata(5))

        assert partition in 1..5
      end)

    assert log =~
             "KafkaEx.DefaultPartitioner: couldn't assign partition due to :inconsistent_keys"
  end
end
65 changes: 65 additions & 0 deletions test/kafka_ex/partitioner_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
defmodule KafkaEx.PartitionerTest do
  use ExUnit.Case

  alias KafkaEx.Partitioner

  alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
  alias KafkaEx.Protocol.Produce.Message, as: ProduceMessage

  # Builds a produce request for "test_topic" from `{key, value}` pairs.
  @spec request(messages :: [{binary | nil, binary}]) :: ProduceRequest.t()
  def request(messages) do
    produce_messages =
      for {key, value} <- messages do
        %ProduceMessage{key: key, value: value}
      end

    %ProduceRequest{topic: "test_topic", messages: produce_messages}
  end

  test "key detection" do
    # Each entry pairs the `{key, value}` messages of a request with the
    # result `get_key/1` is expected to return for it.
    expectations = [
      {[
         {"key", "message_1"},
         {"key", "message_2"},
         {"key", "message_3"},
         {"key", "message_3"}
       ], {:ok, "key"}},
      {[{"key", "message"}, {"key2", "message"}], {:error, :inconsistent_keys}},
      {[{"key", "message"}, {nil, "message"}], {:error, :inconsistent_keys}},
      {[{nil, "message"}, {"key", "message"}], {:error, :inconsistent_keys}},
      {[{nil, "message"}, {nil, "message"}], {:ok, nil}},
      {[], {:error, :no_messages}}
    ]

    Enum.each(expectations, fn {messages, expected} ->
      assert expected == Partitioner.get_key(request(messages))
    end)
  end
end
Loading

0 comments on commit acdf81c

Please sign in to comment.