-
Notifications
You must be signed in to change notification settings - Fork 137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Migrating from AdminUtils with AdminClient #848
Migrating from AdminUtils with AdminClient #848
Conversation
@@ -21,4 +21,5 @@ ext { | |||
testngVersion = "7.1.0" | |||
zkclientVersion = "0.11" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I should probably delete this as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are no usages you should
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is still required. Will remove this in a the following PR where I fully remove and replace 101tec ZkClient with Helix ZkClient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@surajkn did a walkthrough of the changes and explained the differences in APIs. Waiting for him to confirm config changes for Kafka client before approving the PR.
90983f2
to
4348b9d
Compare
* @param topic the topic to wait for broker assignment | ||
* @param brokerList the brokers in the Kafka cluster | ||
* @throws IllegalStateException if the topic is not ready before the timeout ({@value #DEFAULT_TIMEOUT_MS} ms) | ||
*/ | ||
public static void waitForTopicCreation(ZkUtils zkUtils, String topic, String brokerList) throws IllegalStateException { | ||
Validate.notNull(zkUtils); | ||
public static void waitForTopicCreation(AdminClient adminClient, String topic, String brokerList) throws IllegalStateException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if we need this anymore. In this patch I replace calls to AdminUtils.createTopic with calls to AdminClient.createTopics method, and createTopics essentially returns a "KafkaFuture" object on which I currently do a blocking wait (to wait for topic to be created). So we probably don't need this anymore? I have currently removed all calls to this method in test cases and its working fine.
* @param adminClient AdminClient instance to check if topics exists | ||
* @param topic Topic name | ||
*/ | ||
public static boolean topicExists(AdminClient adminClient, String topic) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to import "topicExists" method from BaseKafkaZkTest class here but I kept getting "package com.linkedin.datastream.testutil does not exist" error so I copied the method here. Its probably some classpath issue but I am not sure how to resolve it in this repo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this because of the circular dependency problem we discussed offline?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's a circular dep issue, then could we reference this function from BaseKafkaZkTest class's call to avoid duplicate code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can check but I doubt that will work. I guess trying to do it either way will still result in circular dependency.
// Removing from topic config since its passed as a direct argument | ||
topicProperties.remove("replicationFactor"); | ||
// TopicExistsException is thrown if topic exists. Its a no-op. | ||
adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By doing ".all().get()" we have a blocking wait for the topic to be created. I am not sure what exactly was the behavior with AdminUtils and/or if doing a blocking wait is the correct thing to do here. I am guessing this is the correct way?
However, having said that. On repeated execution of all the unit tests, I noticed that the test "testCreateDatastreamHappyPathDefaultRetention" is a little flaky in that it randomly fails every now and then (very rarely though, hard to reproduce). This test calls "getRetention" which queries topic config and returns retention time if its exists in policy else null, and when this test fails its because getRetention returned null instead of actual expected value. I am not sure if this test was flaky even before. I am not sure how to explain this behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summarizing the offline discussion here:
There's no callbacks and returned futures with AdminUtils.createTopic
. No hints that it's an asynchronous operation. I'm not a scala expert, but from the looks of it it's a synchronous call.
As for the failing/flaky test:
- Was this test flaky even before or it became flaky as a result of the API changes? We need to rule out changes in behavior of the system after switching from
AdminUtils
toAdminClient
- Does adding a thread sleep after topic creation (and before querying the retention of the new topic) make the test pass?
For (1), we'll reach out to Kafka to confirm that there are no changes in behavior in createTopic APIs. For (2) we can try fixing the flaky test in a separate PR.
} | ||
} catch (Throwable e) { | ||
LOG.error("Creating topic {} failed with exception {}", topicName, e); | ||
throw e; | ||
} | ||
} | ||
|
||
private AdminClient getAdminClient(Datastream datastream) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to cache AdminClient instances based on the destinationBrokers ? Depends on how frequently this method is called and how expensive it is to re-instantiate AdminClient object, but I am not sure of it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In an offline discussion with Jhora we decided to not do it for now and do it later if the need arises.
|
||
KafkaTestUtils.waitForTopicCreation(_zkUtils, topicName, _kafkaCluster.getBrokers()); | ||
//KafkaTestUtils.waitForTopicCreation(_adminClient, topicName, _kafkaCluster.getBrokers()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove this in following update to this patch. Same for another instance below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
// TopicExistsException is thrown if topic exists. Its a no-op. | ||
adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); | ||
} catch (InterruptedException | ExecutionException e) { | ||
if (e instanceof TopicExistsException || e.getCause() instanceof TopicExistsException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may be missing something here, but org.apache.kafka.common.errors.TopicExistsException
is neither an InterruptedException
nor an ExecutionException
according to https://kafka.apache.org/28/javadoc//org/apache/kafka/common/errors/TopicExistsException.html.
How can the first condition of this if statement ever be true? (and I know this code was there even before)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I found this reference implementation in LiKafkaTransporterProviderAdmin implementation here
But let me double check this once anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So looks like TopicExistsException has an inheritance hierarchy (chain) that inherits from java.lang.Exception and both InterruptedException and ExecutionException also inherit from same java.lang.Excepption.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand that, my point was that TopicExistsException
is neither a descendant of InterruptedException
nor is it a descendant of ExecutionException
. That first condition is always false
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea so, we may need recursive calls to get the cause of the exceptions until we hit the topic exists right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I suspect TopicExistsException is raised as "ExecutionException(new TopicExistsException())" and that is why it needs to be handled this way. If I split it and add a separate catch block for TopicExistsException then it fails to catch that exception and that exception goes uncaught. I tried it and saw a few unit tests fail because of it.
* @param adminClient AdminClient instance to check if topics exists | ||
* @param topic Topic name | ||
*/ | ||
public static boolean topicExists(AdminClient adminClient, String topic) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this because of the circular dependency problem we discussed offline?
@@ -21,4 +21,5 @@ ext { | |||
testngVersion = "7.1.0" | |||
zkclientVersion = "0.11" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are no usages you should
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please split this PR and separate out the admin client change? AdminClient change can be merged as is.
For the zk client migration part, we can see if we have a jfrog branching. Otherwise, we can duplicate some of the code and have different classes called using config knob and then later get rid of the knob, if things are working fine. It may duplicate some code, but will definitely unblock you and remove your dependency from jfrog branching.
4348b9d
to
7eb32d2
Compare
Undid the ZkClient changes to revert back to extending 101tec ZkClient
* @param adminClient AdminClient instance to check if topics exists | ||
* @param topic Topic name | ||
*/ | ||
public static boolean topicExists(AdminClient adminClient, String topic) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's a circular dep issue, then could we reference this function from BaseKafkaZkTest class's call to avoid duplicate code?
...java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnector.java
Show resolved
Hide resolved
datastream-testcommon/src/main/java/com/linkedin/datastream/testutil/BaseKafkaZkTest.java
Outdated
Show resolved
Hide resolved
datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTransportProviderAdmin.java
Show resolved
Hide resolved
// TopicExistsException is thrown if topic exists. Its a no-op. | ||
adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); | ||
} catch (InterruptedException | ExecutionException e) { | ||
if (e instanceof TopicExistsException || e.getCause() instanceof TopicExistsException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea so, we may need recursive calls to get the cause of the exceptions until we hit the topic exists right?
7eb32d2
to
9a12243
Compare
Replace kafka.admin.AdminUtils usage with kafka.client.AdminClient because AdminUtils is deprecated. Also in a following effort 101tec ZkClient will be replaced by helix ZkClient and this is required for that.
9a12243
to
3aca65b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Replace kafka.admin.AdminUtils usage with kafka.client.AdminClient because AdminUtils is deprecated. Also in a following effort 101tec ZkClient will be replaced by helix ZkClient and this is required for that.
Replace kafka.admin.AdminUtils usage with kafka.client.AdminClient because AdminUtils is deprecated.
Also in a following effort 101tec ZkClient will be replaced by helix ZkClient and this is required for that.
Important: DO NOT REPORT SECURITY ISSUES DIRECTLY ON GITHUB.
For reporting security issues and contributing security fixes,
please, email security@linkedin.com instead, as described in
the contribution guidelines.
Please, take a minute to review the contribution guidelines at:
https://github.com/linkedin/Brooklin/blob/master/CONTRIBUTING.md