Add a "create topics" action #297
Add a "create topics" action #297
Conversation
Hello, @jbruggem! This is your first Pull Request that will be reviewed by Ebert, an automatic Code Review service. It will leave comments on this diff with potential issues and style violations found in the code as you push new commits. You can also see all the issues found on this Pull Request on its review page. Please check our documentation for more information.
Force-pushed from 5a5494d to d0300b4
@jbruggem are you ready for review on this yet? Feel free to ping me when you are.
Thanks for offering! It's actually probably relevant to get a first review. There's probably already a lot of stuff you can point out that will help push this through.
I should probably rebase first, though?
Rebasing would be ideal, yes. I can go ahead and look it over once the conflict is resolved. Thanks!
Force-pushed from 82acb5b to f374cfb
I've rebased it. I'm not sure I've managed to follow the spirit of the existing code correctly in all circumstances. In particular, I had to add the notion of API version in the parsing of metadata, and it felt like I didn't do it cleanly enough. Thanks again for the review.
Thanks for this! Can you confirm whether 0.9.0 supports create topic? If it does, then you probably don't need a new server type. Also, you need to add something like this:
kafka_ex/lib/kafka_ex/server_0_p_8_p_0.ex (line 56 in f374cfb):
def kafka_server_offset_fetch(_, _state), do: raise "Offset Fetch is not supported in 0.8.0 version of kafka"
to all the servers not supporting create_topic, so a user gets a useful error message if they try using it for a wrongly configured server
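A hedged sketch of what such a clause could look like in the 0.8.0 server module (the function name kafka_server_create_topics and its arity are assumptions based on the naming convention above, not taken from the final diff):

  # Hypothetical clause mirroring the offset_fetch example; one such clause would
  # go in each server module that predates CreateTopics support.
  def kafka_server_create_topics(_, _state), do: raise "CreateTopic is not supported in 0.8.0 version of kafka"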
def update_metadata(state, api_version) do
  {correlation_id, metadata} = retrieve_metadata_with_version(state.brokers, state.correlation_id, config_sync_timeout(), api_version)
  metadata_brokers = metadata.brokers |> Enum.map(&(%{&1 | is_controller: &1.node_id == metadata.controller_id}))
Is this change necessary for your PR? Can you indicate why?
We had to add the api_version argument to the functions handling metadata because version 0 of the Metadata API response object does not provide the controller_id value, which is needed to create a topic.
Therefore, it was necessary to support version 1 of the Metadata API, hence the need to specify the version in the function call.
A new server version was added because CreateTopics exists only since 0.10.1 (see also the release notes).
I already did that, but maybe not the way I should have?
Sorry, my bad, you did the right thing!
@@ -2,6 +2,8 @@ defmodule KafkaEx.Protocol.Metadata do
  alias KafkaEx.Protocol
  import KafkaEx.Protocol.Common

  @default_api_version 0
Following up on your previous comment: I'm wondering if we should create a new metadata_v1 module? I have not thought deeply about this; I'm just throwing it out there, as I imagine that the more versions of things we support, the clunkier constructing requests and parsing responses may become...
I think it's a relevant question. That's indeed the kind of question that requires a deep understanding of this project, which I'm not sure I have yet.
Especially since 0.10.0.0, the client is supposed to ask the broker which API versions are supported (see "Retrieving Supported API versions"). The "broker returns its full list of supported ApiKeys and versions", meaning KafkaEx should use that information to decide which API message and version to use (and allow).
This means that for versions >= 0.10.0.0, KafkaEx's behaviour should be quite different from the current hard-coded mapping of versions -> code.
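A purely illustrative sketch of that negotiation (none of these module or function names exist in KafkaEx, and the version ranges are invented): the broker's ApiVersions response lists a min and max supported version per ApiKey, and the client picks the newest version both sides implement.

  # Hypothetical helper: @client_supported maps each API message to the versions
  # this client knows how to encode and decode.
  defmodule ApiVersionNegotiation do
    @client_supported %{metadata: 0..1, create_topics: 0..0}

    def pick_version(api_key, broker_min, broker_max) do
      client_range = Map.fetch!(@client_supported, api_key)
      candidate = min(broker_max, client_range.last)

      if candidate >= max(broker_min, client_range.first) do
        {:ok, candidate}
      else
        {:error, :no_common_version}
      end
    end
  end

  # ApiVersionNegotiation.pick_version(:metadata, 0, 5) #=> {:ok, 1}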
I'll leave the decision on how to proceed to @joshuawscott and @dantswain, both of whom have been more involved in the project recently than I have. However, I think we should have different version modules and, after looking up the supported API version, either reject the api-version provided by the user or pick the newest one we have a module for and use that module... Last time I looked, I think there are a couple of versions of the requests that will require some amount of work to implement, so we need as much help as we can get :)
I'm wondering if maybe this discussion should happen in another issue, so that we can move forward with this merge request. It seems the scale is much bigger than just adding support for a new message in the protocol. I'd be happy to help, of course!
Would that be OK?
@dantswain @joshuawscott what do y'all think?
So, it wouldn't be accurate to have a metadata_v1 module, since the API version is per individual API message. For example, CreateTopics could be v3, but the same server could accept SyncGroup v2. Each of the message types needs to be able to generate messages based on the API version(s) that the broker indicates are available.
@joshuawscott agreed.
Force-pushed from 78cb301 to 16e9a0f
* all tests pass locally
* added a docker_up script for ubuntu
* fix mistakes in kafka_create_topics definition
* fix test command in ci_tests
Force-pushed from 95e91ad to db9b487
All tests pass locally (142 tests, 0 failures -- I've run them many times), but not on the CI. I suppose there are race conditions which become visible in that context. I tried to increase the number of retries, to no avail.
Force-pushed from 061ac11 to f9bf497
Ebert has finished reviewing this Pull Request and has found:
You can see more details about this review at https://ebertapp.io/github/kafkaex/kafka_ex/pulls/297.
Force-pushed from ae47859 to c6c4b6e
I restarted the two tests that failed (elixir 1.1 & elixir 1.5).
Thanks!
I left some comments, but I did not get all the way through the PR unfortunately. I might not be able to touch this for a couple of days, but feel free to disagree with any of the points I raised.
Cheers!
  end

  @spec encode_nullable_string(String.t) :: binary
  defp encode_nullable_string(text) do
nullable string could be a nil, but the spec doesn't allow that. This function could use pattern matching instead of an if as well.
nullable string could be a nil, but the spec doesn't allow that.

So I should leave it as it is?

This function could use pattern matching instead of an if as well.

I'll replace the if with pattern matching.
By 'spec' I meant the typespec of the function: encode_nullable_string should take nil | String.t
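A minimal sketch of the suggested change (the clause shapes are an assumption, not the final diff; the length of -1 for a null string follows the Kafka protocol's nullable-string encoding):

  # Hypothetical revision: the typespec allows nil, and pattern matching replaces
  # the if. Kafka encodes a null string as a signed 16-bit length of -1.
  @spec encode_nullable_string(nil | String.t) :: binary
  defp encode_nullable_string(nil), do: << -1 :: 16-signed >>
  defp encode_nullable_string(text), do: << byte_size(text) :: 16-signed, text :: binary >>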
lib/kafka_ex/protocol/metadata.ex (outdated)
@@ -93,18 +97,44 @@ defmodule KafkaEx.Protocol.Metadata do
    }
  end

  def create_request(correlation_id, client_id, ""), do: KafkaEx.Protocol.create_request(:metadata, correlation_id, client_id) <> << 0 :: 32-signed >>
  def valid_api_version(v) do
I'm not sure that we need this function, since we could just have the api_version arguments default to @default_api_version
Ok, I'll fix that
lib/kafka_ex/protocol/metadata.ex (outdated)
@@ -113,13 +143,55 @@ defmodule KafkaEx.Protocol.Metadata do
    parse_brokers(brokers_size - 1, rest, [%Broker{node_id: node_id, host: host, port: port} | brokers])
  end

  defp parse_brokers_v1(0, rest, brokers), do: {brokers, rest}
Attaching version numbers to the functions will become cumbersome, I think. Many of the APIs have several versions, but those versions might not have any differences, which might lend itself better to having a function parameter that is the API version.
For example, Metadata Response has no differences in the brokers field from versions 1-6.
Indeed. I think it's going to become cumbersome either way, to be honest :).
I've switched to using a function parameter instead. Tell me what you think!
lib/kafka_ex/server.ex (outdated)
@@ -378,17 +396,27 @@ defmodule KafkaEx.Server do

  # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
  def retrieve_metadata(brokers, correlation_id, sync_timeout, topic \\ []), do: retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, 0)

  # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
  def retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, api_version, topic \\ []), do: retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, api_version, 0)
We should just add api_version as the last argument to retrieve_metadata and default it to @default_api_version (0). No need to add another function.
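A minimal sketch of that suggestion (the exact signature is an assumption, and @default_api_version is assumed to be defined or imported in this module):

  # Hypothetical: api_version becomes the last argument with a default, so the
  # separate retrieve_metadata_with_version entry point is no longer needed.
  # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
  def retrieve_metadata(brokers, correlation_id, sync_timeout, topic \\ [], api_version \\ @default_api_version), do: retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, api_version, 0)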
done.
lib/kafka_ex/server.ex (outdated)
  # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
  def retrieve_metadata(_, correlation_id, _sync_timeout, topic, 0, error_code) do
    Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}")
    {correlation_id, %Metadata.Response{}}
  end

  # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
  def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, error_code) do
this could be removed by using a default parameter as suggested above
done.
Force-pushed from c6c4b6e to d3f384e
Ok, I addressed all comments the best I could. Tests pass locally (not always on Travis).
* add a new target to Travis CI's config
* fix an error in docker_up_ubuntu
… in parse_brokers
Force-pushed from 8f4cd86 to af4a79a
Just a couple more minor things that I see, and then it looks good
Thanks so much for taking this on!
    resp = KafkaEx.create_topics([request], timeout: 2000)
    # error = TOPIC_ALREADY_EXISTS
    assert {36, name} == parse_create_topic_resp(resp)
We should be able to add 36 to the @error_map in lib/kafka_ex/protocol.ex and then refer to this as the error code (:topic_already_exists) rather than 36.
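A hedged sketch of that change (the map entries shown are only illustrative, and the error/1 lookup helper is an assumption about how @error_map is exposed, not verified against lib/kafka_ex/protocol.ex):

  # Hypothetical excerpt of lib/kafka_ex/protocol.ex: extend the error-code map
  # and look codes up through it when parsing responses.
  @error_map %{
    0 => :no_error,
    36 => :topic_already_exists
  }

  def error(error_code), do: Map.get(@error_map, error_code, error_code)

  # The test from the diff above could then assert on the atom instead of 36:
  # assert {:topic_already_exists, name} == parse_create_topic_resp(resp)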
Good idea, thanks! I've added a few error codes, might as well :).
OK, I think this is now ready for merge!
I'm going to wait until the end of the day to let @bjhaid or @dantswain weigh in if they wish, then I'll merge. Thanks again!
My pleasure! We were happy to be able to contribute to a nice open-source project, and it was very useful for us to have this feature implemented. I'm open to the idea of further contributions, of course.
Awesome 🎉. Thanks for all your help!
TODO:
A new server version was added because CreateTopics was added in 0.10.1 (see also the release notes).
Closes #168