Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default partitioner #329

Merged
merged 1 commit into from
Jan 17, 2019
Merged

Add default partitioner #329

merged 1 commit into from
Jan 17, 2019

Conversation

barthez
Copy link
Contributor

@barthez barthez commented Dec 9, 2018

Add default partitioner that would assign partition if one isn't set for produce request. Partition number is calculated from the message key using murmur2 hashing algorithm, to align with Java implementation. Both Java and Elixir clients will partition messages by keys in the same, deterministic way.

In the case when the key is not provided, the partition is assigned randomly.

Partitioned behavior is available to allow users to reimplement partitioner in the desired way.

Fixes #206

@sourcelevel-bot
Copy link

Hello, @barthez! This is your first Pull Request that will be reviewed by Ebert, an automatic Code Review service. It will leave comments on this diff with potential issues and style violations found in the code as you push new commits. You can also see all the issues found on this Pull Request on its review page. Please check our documentation for more information.

Copy link
Member

@bjhaid bjhaid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this, looks mostly good to me, I left a question.

lib/kafka_ex/partitioner.ex Show resolved Hide resolved
fn
(%{key: key}, {:ok, nil}) when is_binary(key) -> {:ok, key}
(%{key: key}, {:ok, key}) -> {:ok, key}
(%{key: other_key}, {:ok, key}) when is_binary(other_key) -> {:error, "Inconsistent keys within messages", key}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a key is set in the first message, but the second message has a key of nil, this 3rd clause will not catch it, and will continue as if it were set to the previously detected key.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is fine. If key is set only on first message (or only in 1 message), it is ok to assume it is a key for all messages.

Don't you agree?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree; In that case, we probably don't need to check any beyond the first, so we can just look at the head of the list and get its key, rather than iterating over a potentially large list of messages.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, I'm unsure. Do you know how the Java client would handle this? Does it check all messages in the set, or just the first to determine partition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java client allows to send you 1 message at the time, so there is no problem with selecting the key ;).

I decided I will make get_key function to return key only when all messages in the set have the same key.

alias KafkaEx.Utils.Murmur

test "murmur2" do
assert Murmur.murmur2("rule") == -1673595344
Copy link
Member

@joshuawscott joshuawscott Dec 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the source for these expected values? I am seeing different results from online murmur2 (http://murmurhash.shorelabs.com/) and the ruby digest-murmurhash gem. For "rule" I get 2621371952

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, I'm seeing different results for every assert to negative numbers, but for redundancy or guest the expected value is the correct but the number are positives, also why are you using murmur2 instead of murmur3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expected values are calculated with java implementation and confirmed with C implementation.

You are getting a different number because the JS implementation returns result casted to uint32 (what umurmur2 is doing).

I chose murmur2 to align with Java implementation. If the same hashing algorithm will be used, it is ensured that messages with the same keys are routed to the same partition regardless of the client.

I'm going to add umurmur2 tests as well.

@barthez
Copy link
Contributor Author

barthez commented Jan 9, 2019

@bjhaid @joshuawscott You can take another look.

lib/kafka_ex/utils/murmur.ex Outdated Show resolved Hide resolved
lib/kafka_ex/utils/murmur.ex Show resolved Hide resolved
lib/kafka_ex/utils/murmur.ex Outdated Show resolved Hide resolved
@joshuawscott
Copy link
Member

Please also run mix format on the codebase to ensure we keep it consistent

@barthez
Copy link
Contributor Author

barthez commented Jan 14, 2019

@bjhaid @joshuawscott Thanks for your comments. I believe I addressed all of them. You can take a final look.

Copy link
Member

@bjhaid bjhaid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

lib/kafka_ex/utils/murmur.ex Outdated Show resolved Hide resolved
Copy link
Member

@joshuawscott joshuawscott left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@yoelfme
Copy link

yoelfme commented Jan 15, 2019

wow, that's nice, I was working in a mini lib to do that for my own, but now I can use these changes, thanks @barthez good work, when do you think that you are going to do a new release?, but again thanks!

Add default partitioner that would assign partition if one isn't set for
produce request. Partition number is calculated from the message key
using murmur2 hashing algorithm, to align with Java implementation. Both
Java and Elixir clients will partition messages by keys in the same,
deterministic way.

In the case when the key is not provided, the partition is assigned
randomly.

Partitioned behavior is available to allow users to reimplement
partitioner in the desired way.
@barthez
Copy link
Contributor Author

barthez commented Jan 17, 2019

@bjhaid Can we have this merged & released?

@bjhaid
Copy link
Member

bjhaid commented Jan 17, 2019

@bjhaid Can we have this merged & released?

@barthez I have not released in a while, so I'll defer doing the release to @joshuawscott, I'll merge though

@bjhaid bjhaid merged commit 403024e into kafkaex:master Jan 17, 2019
@joshuawscott joshuawscott mentioned this pull request Apr 11, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants