Skip to content

Commit

Permalink
Add default partitioner
Browse files Browse the repository at this point in the history
Add default partitioner that would assign partition if one isn't set for
produce request. Partition number is calculated from the message key
using murmur2 hashing algorithm, to align with Java implementation. Both
Java and Elixir clients will partition messages by keys in the same,
deterministic way.

In the case when the key is not provided, the partition is assigned
randomly.

Partitioned behavior is available to allow users to reimplement
partitioner in the desired way.
  • Loading branch information
barthez committed Dec 9, 2018
1 parent b3c7f92 commit 40e09e3
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 1 deletion.
55 changes: 55 additions & 0 deletions lib/kafka_ex/default_partitioner.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule KafkaEx.DefaultPartitioner do
@moduledoc """
Default partitioner implementation.
When message key is set and partition isn't, partition is decided based
on murmur2 hash of a key to provide Java implementation consistency. When
message key and partition is missing, partition is selected randomly.
When partition is provided nothing changes.
"""
use KafkaEx.Partitioner
alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
alias KafkaEx.Utils.Murmur, as: Murmur
require Logger

@spec assign_partition(request :: %ProduceRequest{}, metadata :: %MetadataResponse{}) ::
%ProduceRequest{}
def assign_partition(request = %ProduceRequest{partition: partition}, _) when is_number(partition) do
request
end

def assign_partition(request = %ProduceRequest{partition: nil}, metadata) do
case get_key(request) do
{:ok, nil} -> assign_partition_randomly(request, metadata)
{:ok, key} -> assign_partition_with_key(request, metadata, key)
{:error, message, nil} ->
Logger.warn("Partitioner: " <> message)
assign_partition_randomly(request, metadata)
{:error, message, key} ->
Logger.warn("Partitioner: " <> message)
assign_partition_with_key(request, metadata, key)
end
end

@spec assign_partition_randomly(request :: %ProduceRequest{}, metadata :: %MetadataResponse{}) ::
%ProduceRequest{}
defp assign_partition_randomly(request = %ProduceRequest{topic: topic}, metadata) do
partition_id =
case MetadataResponse.partitions_for_topic(metadata, topic) do
[] -> 1
list -> Enum.random(list)
end

%{request | partition: partition_id}
end

@spec assign_partition_with_key(request :: %ProduceRequest{}, metadata :: %MetadataResponse{}, key :: binary) ::
%ProduceRequest{}
defp assign_partition_with_key(request = %ProduceRequest{topic: topic}, metadata, key) do
hash = Murmur.murmur2(key)
partitions_count = metadata |> MetadataResponse.partitions_for_topic(topic) |> length()
partition_id = Integer.mod(hash, partitions_count)
%{request | partition: partition_id}
end
end
43 changes: 43 additions & 0 deletions lib/kafka_ex/partitioner.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule KafkaEx.Partitioner do
@moduledoc """
Behaviour definition for partitioners, that assigns partitions for requests.
"""

alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse

@callback assign_partition(request :: %ProduceRequest{}, metadata :: %MetadataResponse{}) ::
%ProduceRequest{}

defmacro __using__(_) do
quote location: :keep do
@behaviour KafkaEx.Partitioner

@doc """
Returns key for given messages
Function looks for message key in messages list of {ProduceRequest}. It may return
either `{:ok, nil}` if no key was found, `{:ok, key}` when key was found,
or `{:error, message, nil | key}` when error happens while looking for the key.
"""
@spec get_key(request :: %ProduceRequest{}) ::
{:ok, nil} | {:ok, binary} | {:error, binary, binary | nil}
defp get_key(%ProduceRequest{messages: messages}) when is_list(messages) and length(messages) > 0 do
Enum.reduce(
messages,
{:ok, nil},
fn
(%{key: key}, {:ok, nil}) when is_binary(key) -> {:ok, key}
(%{key: key}, {:ok, key}) -> {:ok, key}
(%{key: other_key}, {:ok, key}) when is_binary(other_key) -> {:error, "Inconsistent keys within messages", key}
(_, response) -> response
end
)
end

defp get_key(_) do
{:error, "No messages", nil}
end
end
end
end
7 changes: 6 additions & 1 deletion lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,10 @@ defmodule KafkaEx.Server do
# KakfaEx.Server behavior default implementations
# This needs a refactor, but for now make credo pass:
# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
def kafka_server_produce(produce_request, state) do
def kafka_server_produce(produce_request, state = %State{metadata: metadata}) do
correlation_id = state.correlation_id + 1

produce_request = default_partitioner().assign_partition(produce_request, metadata)
produce_request_data =
try do
Produce.create_request(correlation_id, @client_id, produce_request)
Expand Down Expand Up @@ -932,6 +933,10 @@ defmodule KafkaEx.Server do
defp config_sync_timeout(timeout \\ nil) do
timeout || Application.get_env(:kafka_ex, :sync_timeout, @sync_timeout)
end

defp default_partitioner() do
Application.get_env(:kafka_ex, :partitioner, KafkaEx.DefaultPartitioner)
end
end
end
end
68 changes: 68 additions & 0 deletions lib/kafka_ex/utils/murmur.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
defmodule KafkaEx.Utils.Murmur do
@moduledoc """
Utility module that provides Murmur hashing algorithm.
"""

use Bitwise

# Arbitrary constant for murmur2 hashing
@m 0x5bd1e995

# Default seed to hashing, copied form Java implementation
@seed 0x9747b28c

@doc """
Calculates murmur2 hash for given binary
"""
@spec murmur2(key :: binary) :: integer
def murmur2(key) do
<<seed :: signed-size(32)>> = <<@seed :: size(32) >>
len = byte_size(key)
_murmur2(key, seed ^^^ len)
end

@doc """
Calculates murmur2 hash for given binary as unsigned 32-bit integer
"""
@spec murmur2(key :: binary) :: integer
def umurmur2(key) do
key |> murmur2() |> band(0xFFFFFFFF)
end

defmacrop mask32(num) do
quote do
<<signed :: signed-size(32)>> = << unquote(num) &&& 0xFFFFFFFF :: size(32)>>
signed
end
end

# Unsigned bitwise right shift on 32 bits
defmacrop ubsr32(num, shift), do: quote(do: (unquote(num) &&& 0xFFFFFFFF) >>> unquote(shift))

defp _murmur2(<<a :: little-size(32), rest :: binary>>, h) do
k = mask32(a * @m)
k = k ^^^ ubsr32(k, 24)
k = mask32(k * @m)
h = mask32(h * @m)
_murmur2(rest, h ^^^ k)
end

defp _murmur2(<<a1 :: size(8), a2 :: size(8), a3 :: size(8)>>, h) do
_murmur2(<<a1, a2>>, h ^^^ mask32(a3 <<< 16))
end

defp _murmur2(<<a1 :: size(8), a2 :: size(8)>>, h) do
_murmur2(<<a1>>, h ^^^ mask32(a2 <<< 8))
end

defp _murmur2(<<a1 :: size(8)>>, h) do
_murmur2("", mask32((h ^^^ a1) * @m))
end

defp _murmur2("", h) do
h = h ^^^ ubsr32(h, 13)
h = mask32(h * @m)
h ^^^ ubsr32(h, 15)
end

end
77 changes: 77 additions & 0 deletions test/kafka_ex/default_partitioner_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
defmodule KafkaEx.DefaultPartitionerTest do
alias KafkaEx.DefaultPartitioner

alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
alias KafkaEx.Protocol.Produce.Message, as: ProduceMessage
alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
alias KafkaEx.Protocol.Metadata.TopicMetadata
alias KafkaEx.Protocol.Metadata.PartitionMetadata

use ExUnit.Case

def metadata(partitions \\ 5) do
%MetadataResponse{
topic_metadatas: [
%TopicMetadata{
topic: "test_topic",
partition_metadatas: Enum.map(1..partitions, fn n ->
%PartitionMetadata{
partition_id: n
}
end)
}
]
}
end

test "no assignment" do
request = %ProduceRequest{
topic: "test_topic",
partition: 2,
messages: [
%ProduceMessage{key: nil, value: "message"}
]
}

%{partition: 2} = DefaultPartitioner.assign_partition(request, metadata(5))
end

test "random assignment" do
request = %ProduceRequest{
topic: "test_topic",
partition: nil,
messages: [
%ProduceMessage{key: nil, value: "message"}
]
}

%{partition: partition} = DefaultPartitioner.assign_partition(request, metadata(5))
assert is_number(partition)
assert partition > 0
assert partition <= 5
end

test "key based assignment" do
request = %ProduceRequest{
topic: "test_topic",
partition: nil,
messages: [
%ProduceMessage{key: "key", value: "message"}
]
}

%{partition: 3} = DefaultPartitioner.assign_partition(request, metadata(5))
%{partition: 5} = DefaultPartitioner.assign_partition(request, metadata(6))

second_request = %ProduceRequest{
topic: "test_topic",
partition: nil,
messages: [
%ProduceMessage{key: "key2", value: "message"}
]
}

%{partition: 1} = DefaultPartitioner.assign_partition(second_request, metadata(5))
%{partition: 1} = DefaultPartitioner.assign_partition(second_request, metadata(10))
end
end
18 changes: 18 additions & 0 deletions test/kafka_ex/utils/murmur_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule KafkaEx.Utils.MurmurTest do
use ExUnit.Case

alias KafkaEx.Utils.Murmur

test "murmur2" do
assert Murmur.murmur2("rule") == -1673595344
assert Murmur.murmur2("monkey") == 385264353
assert Murmur.murmur2("hover") == -1982829826
assert Murmur.murmur2("guest") == 1235690374
assert Murmur.murmur2("necklace") == -515547202
assert Murmur.murmur2("storm") == 393248174
assert Murmur.murmur2("paint") == -1653751818
assert Murmur.murmur2("agony") == -1158062389
assert Murmur.murmur2("strategic") == -758786001
assert Murmur.murmur2("redundancy") == 451414978
end
end

0 comments on commit 40e09e3

Please sign in to comment.