Migrating from 101tec ZkClient to Helix ZkClient and replacing AdminUtils with AdminClient

Helix ZkClient optimizes getChildren by implementing pagination, which the Brooklin server needs in order to list a large number of child nodes scalably.
As an extension of this effort, we also replace kafka.admin.AdminUtils usage with org.apache.kafka.clients.admin.AdminClient, because AdminUtils depends on the older 101tec library.
Suraj Kasi committed Aug 20, 2021
1 parent ddde7ae commit 4348b9d
Showing 19 changed files with 264 additions and 177 deletions.
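
The updated tests below call createTopic(_adminClient, ...) and deleteTopic(_adminClient, ...) helpers whose implementations live in the shared test base class and are not part of this diff. As a rough sketch of how such helpers can be written against Kafka's AdminClient (the class and method names here are illustrative assumptions, not the actual Brooklin code):

import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

// Illustrative sketch only; the real helpers in the test base class may differ.
final class AdminClientTopicHelpers {
  private AdminClientTopicHelpers() {
  }

  // Replaces AdminUtils.createTopic(zkUtils, topic, ...): the broker performs the
  // operation, so no ZooKeeper handle is needed on the client side.
  static void createTopic(AdminClient adminClient, String topic, int partitions)
      throws ExecutionException, InterruptedException {
    NewTopic newTopic = new NewTopic(topic, partitions, (short) 1);
    adminClient.createTopics(Collections.singleton(newTopic)).all().get();
  }

  // Replaces AdminUtils.deleteTopic(zkUtils, topic).
  static void deleteTopic(AdminClient adminClient, String topic)
      throws ExecutionException, InterruptedException {
    adminClient.deleteTopics(Collections.singleton(topic)).all().get();
  }
}

The diffs that follow swap the ZkUtils-based calls for AdminClient-based equivalents and pass the same AdminClient through to produceEvents.
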
build.gradle: 5 changes (2 additions, 3 deletions)
@@ -157,7 +157,7 @@ project(':datastream-server-api') {
project(':datastream-utils') {
dependencies {
compile project(':datastream-common')
compile "com.101tec:zkclient:$zkclientVersion"
compile "org.apache.helix:zookeeper-api:$helixZkclientVersion"
compile "com.google.guava:guava:$guavaVersion"
testCompile project(":datastream-kafka_$scalaSuffix")
testCompile project(":datastream-testcommon_$scalaSuffix")
@@ -319,7 +319,7 @@ project(':datastream-client') {
project(':datastream-server') {

dependencies {
compile "com.101tec:zkclient:$zkclientVersion"
compile "org.apache.helix:zookeeper-api:$helixZkclientVersion"
compile "org.codehaus.jackson:jackson-core-asl:$jacksonVersion"
compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"

@@ -355,7 +355,6 @@ project(':datastream-server-restli') {
compile "com.linkedin.pegasus:restli-netty-standalone:$pegasusVersion"
compile "com.linkedin.pegasus:r2-jetty:$pegasusVersion"
compile "com.linkedin.parseq:parseq:$parseqVersion"
compile "com.101tec:zkclient:$zkclientVersion"
compile "org.codehaus.jackson:jackson-core-asl:$jacksonVersion"
compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
compile "org.apache.commons:commons-lang3:$commonslang3Version"
@@ -19,8 +19,8 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -69,9 +69,8 @@ private Datastream createDatastream(String name, String topicName) {
@Test
public void testConnectorWithStartPosition() throws UnsupportedEncodingException, DatastreamValidationException {
String topicName = "testConnectorWithStartPosition";
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _zkUtils, topicName, 0, 100);
long ts = System.currentTimeMillis();
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _zkUtils, topicName, 100, 100);
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _adminClient, topicName, 0, 100);
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _adminClient, topicName, 100, 100);
Datastream ds = createDatastream("testConnectorPopulatesPartitions", topicName);
Map<Integer, Long> offsets = Collections.singletonMap(0, 100L);
KafkaConnector connector =
@@ -92,8 +91,8 @@ public void testInitializeDatastreamWithNonexistentTopic() throws DatastreamVali
@Test
public void testPopulatingDefaultSerde() throws Exception {
String topicName = "testPopulatingDefaultSerde";
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _zkUtils, topicName, 0, 100);
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _zkUtils, topicName, 100, 100);
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _adminClient, topicName, 0, 100);
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _adminClient, topicName, 100, 100);
Datastream ds = createDatastream("testPopulatingDefaultSerde", topicName);
KafkaConnector connector =
new KafkaConnector("test", getDefaultConfig(null), "testCluster");
@@ -107,7 +106,7 @@ public void testPopulatingDefaultSerde() throws Exception {
@Test
public void testConnectorPopulatesPartitions() throws UnsupportedEncodingException, DatastreamValidationException {
String topicName = "testConnectorPopulatesPartitions";
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _zkUtils, topicName, 0, 10);
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _adminClient, topicName, 0, 10);

Datastream ds = createDatastream("testConnectorPopulatesPartitions", topicName);
KafkaConnector connector =
@@ -147,9 +146,9 @@ private void executeTestGroupIdAssignment(boolean isGroupIdHashingEnabled) throw
String topicName2 = "topic2";
String topicName3 = "topic3";

TestKafkaConnectorTask.produceEvents(_kafkaCluster, _zkUtils, topicName1, 0, 100);
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _zkUtils, topicName2, 0, 100);
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _zkUtils, topicName3, 0, 100);
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _adminClient, topicName1, 0, 100);
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _adminClient, topicName2, 0, 100);
TestKafkaConnectorTask.produceEvents(_kafkaCluster, _adminClient, topicName3, 0, 100);

Properties properties = getDefaultConfig(null);
properties.put(AbstractKafkaConnector.IS_GROUP_ID_HASHING_ENABLED, Boolean.toString(isGroupIdHashingEnabled));
@@ -18,6 +18,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -37,7 +38,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import kafka.utils.ZkUtils;

import com.linkedin.data.template.StringMap;
import com.linkedin.datastream.common.Datastream;
@@ -77,8 +77,8 @@ public class TestKafkaConnectorTask extends BaseKafkaZkTest {
private static final int POLL_TIMEOUT_MS = 25000;
private static final long CONNECTOR_AWAIT_STOP_TIMEOUT_MS = 30000;

protected static void produceEvents(DatastreamEmbeddedZookeeperKafkaCluster cluster, ZkUtils zkUtils, String topic, int index, int numEvents)
throws UnsupportedEncodingException {
protected static void produceEvents(DatastreamEmbeddedZookeeperKafkaCluster cluster, AdminClient adminClient,
String topic, int index, int numEvents) throws UnsupportedEncodingException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBrokers());
props.put(ProducerConfig.ACKS_CONFIG, "all");
@@ -89,7 +89,7 @@ protected static void produceEvents(DatastreamEmbeddedZookeeperKafkaCluster clus
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());

createTopic(zkUtils, topic);
createTopic(adminClient, topic);
try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
for (int i = 0; i < numEvents; i++) {
final int finalIndex = index;
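
The diff does not show where the _adminClient handed to produceEvents and createTopic is built; in the shared test base it would plausibly be created once from the embedded cluster's broker list, roughly as below (a hedged sketch; only cluster.getBrokers() is taken from the diff above, the helper name is an assumption):

import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Hypothetical setup: one AdminClient per embedded cluster, reused by the tests.
static AdminClient createAdminClient(DatastreamEmbeddedZookeeperKafkaCluster cluster) {
  Properties adminProps = new Properties();
  adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBrokers());
  return AdminClient.create(adminProps);
}
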
Expand Down Expand Up @@ -149,18 +149,18 @@ public void testKafkaGroupId() throws Exception {
@Test
public void testConsumeWithStartingOffsetAndNoResetStrategy() throws Exception {
String topic = "pizza1";
createTopic(_zkUtils, topic);
createTopic(_adminClient, topic);

LOG.info("Sending first set of events");

//produce 100 msgs to topic before start
produceEvents(_kafkaCluster, _zkUtils, topic, 0, 100);
produceEvents(_kafkaCluster, _adminClient, topic, 0, 100);
Map<Integer, Long> startOffsets = Collections.singletonMap(0, 100L);

LOG.info("Sending second set of events");

//produce 100 msgs to topic before start
produceEvents(_kafkaCluster, _zkUtils, topic, 100, 100);
produceEvents(_kafkaCluster, _adminClient, topic, 100, 100);

//start
MockDatastreamEventProducer datastreamProducer = new MockDatastreamEventProducer();
@@ -180,7 +180,7 @@ public void testConsumeWithStartingOffsetAndNoResetStrategy() throws Exception {
LOG.info("Sending third set of events");

//send 100 more msgs
produceEvents(_kafkaCluster, _zkUtils, topic, 1000, 100);
produceEvents(_kafkaCluster, _adminClient, topic, 1000, 100);

if (!PollUtils.poll(() -> datastreamProducer.getEvents().size() == 200, 100, POLL_TIMEOUT_MS)) {
Assert.fail("did not transfer 200 msgs within timeout. transferred " + datastreamProducer.getEvents().size());
@@ -194,18 +194,18 @@
@Test
public void testConsumeWithStartingOffsetAndResetStrategy() throws Exception {
String topic = "pizza1";
createTopic(_zkUtils, topic);
createTopic(_adminClient, topic);

LOG.info("Sending first set of events");

//produce 100 msgs to topic before start
produceEvents(_kafkaCluster, _zkUtils, topic, 0, 100);
produceEvents(_kafkaCluster, _adminClient, topic, 0, 100);
Map<Integer, Long> startOffsets = Collections.singletonMap(0, 100L);

LOG.info("Sending second set of events");

//produce 100 msgs to topic before start
produceEvents(_kafkaCluster, _zkUtils, topic, 100, 100);
produceEvents(_kafkaCluster, _adminClient, topic, 100, 100);

//start
MockDatastreamEventProducer datastreamProducer = new MockDatastreamEventProducer();
@@ -226,7 +226,7 @@ public void testConsumeWithStartingOffsetAndResetStrategy() throws Exception {
LOG.info("Sending third set of events");

//send 100 more msgs
produceEvents(_kafkaCluster, _zkUtils, topic, 1000, 100);
produceEvents(_kafkaCluster, _adminClient, topic, 1000, 100);

if (!PollUtils.poll(() -> datastreamProducer.getEvents().size() == 200, 100, POLL_TIMEOUT_MS)) {
Assert.fail("did not transfer 200 msgs within timeout. transferred " + datastreamProducer.getEvents().size());
@@ -240,7 +240,7 @@ public void testConsumeWithStartingOffsetAndResetStrategy() throws Exception {
@Test
public void testCommittingOffsetRegularly() throws Exception {
String topic = "pizza1";
createTopic(_zkUtils, topic);
createTopic(_adminClient, topic);

//start
MockDatastreamEventProducer datastreamProducer = new MockDatastreamEventProducer();
@@ -278,10 +278,10 @@ public Consumer<byte[], byte[]> createConsumer(Properties properties) {
@Test
public void testConsumerBaseCase() throws Exception {
String topic = "Pizza2";
createTopic(_zkUtils, topic);
createTopic(_adminClient, topic);

LOG.info("Sending first event, to avoid an empty topic.");
produceEvents(_kafkaCluster, _zkUtils, topic, 0, 1);
produceEvents(_kafkaCluster, _adminClient, topic, 0, 1);

LOG.info("Creating and Starting KafkaConnectorTask");
Datastream datastream = getDatastream(_broker, topic);
@@ -292,7 +292,7 @@ public void testConsumerBaseCase() throws Exception {
KafkaConnectorTask connectorTask = createKafkaConnectorTask(task);

LOG.info("Producing 100 msgs to topic: " + topic);
produceEvents(_kafkaCluster, _zkUtils, topic, 1000, 100);
produceEvents(_kafkaCluster, _adminClient, topic, 1000, 100);

if (!PollUtils.poll(() -> datastreamProducer.getEvents().size() == 100, 100, POLL_TIMEOUT_MS)) {
Assert.fail("did not transfer 100 msgs within timeout. transferred " + datastreamProducer.getEvents().size());
@@ -308,10 +308,10 @@ public void testConsumerPositionTracking() throws Exception {
final KafkaBasedConnectorConfig config = new KafkaBasedConnectorConfigBuilder().build();

final String topic = "ChicagoStylePizza";
createTopic(_zkUtils, topic);
createTopic(_adminClient, topic);

LOG.info("Sending first event, to avoid an empty topic.");
produceEvents(_kafkaCluster, _zkUtils, topic, 0, 1);
produceEvents(_kafkaCluster, _adminClient, topic, 0, 1);

LOG.info("Creating and Starting KafkaConnectorTask");
final Datastream datastream = getDatastream(_broker, topic);
@@ -322,7 +322,7 @@ public void testConsumerPositionTracking() throws Exception {
final KafkaConnectorTask connectorTask = createKafkaConnectorTask(task, config);

LOG.info("Producing 100 msgs to topic: " + topic);
produceEvents(_kafkaCluster, _zkUtils, topic, 1000, 100);
produceEvents(_kafkaCluster, _adminClient, topic, 1000, 100);

if (!PollUtils.poll(() -> datastreamProducer.getEvents().size() == 100, 100, POLL_TIMEOUT_MS)) {
Assert.fail("did not transfer 100 msgs within timeout. transferred " + datastreamProducer.getEvents().size());
@@ -336,7 +336,7 @@ public void testConsumerPositionTracking() throws Exception {
@Test
public void testRewindWhenSkippingMessage() throws Exception {
String topic = "pizza1";
createTopic(_zkUtils, topic);
createTopic(_adminClient, topic);
AtomicInteger i = new AtomicInteger(0);

//Throw exactly one error message when sending the messages, causing partition to be paused for exactly once
@@ -414,10 +414,10 @@ public void testSslConsumerProperties() {
@Test
public void testFlakyProducer() throws Exception {
String topic = "pizza3";
createTopic(_zkUtils, topic);
createTopic(_adminClient, topic);

LOG.info("Sending first event, to avoid an empty topic.");
produceEvents(_kafkaCluster, _zkUtils, topic, 0, 1);
produceEvents(_kafkaCluster, _adminClient, topic, 0, 1);

class State {
int messagesProcessed = 0;
@@ -443,7 +443,7 @@ class State {
KafkaConnectorTask connectorTask = createKafkaConnectorTask(task);

LOG.info("Producing 100 msgs to topic: " + topic);
produceEvents(_kafkaCluster, _zkUtils, topic, 1000, 100);
produceEvents(_kafkaCluster, _adminClient, topic, 1000, 100);

if (!PollUtils.poll(() -> state.messagesProcessed >= 100, 100, POLL_TIMEOUT_MS)) {
Assert.fail("did not transfer 100 msgs within timeout. transferred " + state.messagesProcessed);
@@ -458,10 +458,10 @@ class State {
@SuppressWarnings("rawtypes")
public void testFlakyConsumer() throws Exception {
String topic = "Pizza2";
createTopic(_zkUtils, topic);
createTopic(_adminClient, topic);

LOG.info("Sending first event, to avoid an empty topic.");
produceEvents(_kafkaCluster, _zkUtils, topic, 0, 1);
produceEvents(_kafkaCluster, _adminClient, topic, 0, 1);

LOG.info("Creating and Starting KafkaConnectorTask");
Datastream datastream = getDatastream(_broker, topic);
@@ -46,7 +46,7 @@ public void testConsumerOffsetsDiagEndpoint() {
// create topics
List<String> topics = new ArrayList<>();
IntStream.range(0, TOPIC_COUNT).forEach(i -> topics.add("topic" + i));
topics.forEach(topic -> createTopic(_zkUtils, topic, PARTITION_COUNT));
topics.forEach(topic -> createTopic(_adminClient, topic, PARTITION_COUNT));

// setup datastream and connector
Datastream datastream = KafkaMirrorMakerConnectorTestUtils.createDatastream("topicStream", _broker, "topic\\d+");
@@ -30,7 +30,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import kafka.admin.AdminUtils;

import com.linkedin.data.template.StringMap;
import com.linkedin.datastream.common.Datastream;
@@ -221,9 +220,7 @@ public void testValidateDatastreamUpdatePausedPartitions() throws Exception {
coordinator.initializeDatastream(datastream);

// create topic
if (!AdminUtils.topicExists(_zkUtils, topic)) {
AdminUtils.createTopic(_zkUtils, topic, 2, 1, new Properties(), null);
}
createTopic(_adminClient, topic, 2);

// Make sure "*" is converted to a list of partitions
pausedPartitions.put(topic, new HashSet<>(Collections.singletonList("*")));
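
The hunk above is the most direct AdminUtils-to-AdminClient swap in this change: the old ZooKeeper-based existence check plus create collapses into a single createTopic(_adminClient, topic, 2) call. If an explicit guard were still wanted, the AdminClient equivalent would look roughly like this (illustrative only, not the helper the test actually uses):

// Illustrative AdminClient equivalent of the removed AdminUtils.topicExists/createTopic pair.
if (!adminClient.listTopics().names().get().contains(topic)) {
  adminClient.createTopics(
      Collections.singleton(new NewTopic(topic, 2, (short) 1))).all().get();
}
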
@@ -274,9 +271,9 @@ public void testMirrorMakerConnectorBasics() {
String saltyTopic = "SaltyPizza";
String saladTopic = "HealthySalad";

createTopic(_zkUtils, saladTopic);
createTopic(_zkUtils, yummyTopic);
createTopic(_zkUtils, saltyTopic);
createTopic(_adminClient, saladTopic);
createTopic(_adminClient, yummyTopic);
createTopic(_adminClient, saltyTopic);

// create a datastream to consume from topics ending in "Pizza"
Datastream datastream = KafkaMirrorMakerConnectorTestUtils.createDatastream("pizzaStream", _broker, "\\w+Pizza");
@@ -378,7 +375,7 @@ public void testStartConsumingFromNewTopic() {
String yummyTopic = "YummyPizza";
String saltyTopic = "SaltyPizza";

createTopic(_zkUtils, yummyTopic);
createTopic(_adminClient, yummyTopic);

// create a datastream to consume from topics ending in "Pizza"
Datastream datastream = KafkaMirrorMakerConnectorTestUtils.createDatastream("pizzaStream", _broker, "\\w+Pizza");
@@ -409,7 +406,7 @@ public void testStartConsumingFromNewTopic() {
"Expected record from YummyPizza topic but got record from " + record.getDestination().get());

// create a new topic ending in "Pizza" and produce an event to it
createTopic(_zkUtils, saltyTopic);
createTopic(_adminClient, saltyTopic);
KafkaMirrorMakerConnectorTestUtils.produceEvents(saltyTopic, 1, _kafkaCluster);

if (!PollUtils.poll(() -> datastreamProducer.getEvents().size() == 2, POLL_PERIOD_MS, POLL_TIMEOUT_MS)) {
@@ -429,8 +426,8 @@ public void testConsumptionUnaffectedByDeleteTopic() {
String yummyTopic = "YummyPizza";
String saltyTopic = "SaltyPizza";

createTopic(_zkUtils, yummyTopic);
createTopic(_zkUtils, saltyTopic);
createTopic(_adminClient, yummyTopic);
createTopic(_adminClient, saltyTopic);

// create a datastream to consume from topics ending in "Pizza"
Datastream datastream = KafkaMirrorMakerConnectorTestUtils.createDatastream("pizzaStream", _broker, "\\w+Pizza");
@@ -460,7 +457,7 @@ public void testConsumptionUnaffectedByDeleteTopic() {
datastreamProducer.getEvents().clear();

// delete the topic
deleteTopic(_zkUtils, yummyTopic);
deleteTopic(_adminClient, yummyTopic);

// produce another event to SaltyPizza
KafkaMirrorMakerConnectorTestUtils.produceEvents(saltyTopic, 1, _kafkaCluster);
@@ -611,7 +608,7 @@ public void testFetchPartitionChange() throws Exception {

String yummyTopic = "YummyPizza";

createTopic(_zkUtils, yummyTopic, 1);
createTopic(_adminClient, yummyTopic, 1);

// create a datastream to consume from topics ending in "Pizza"
Datastream datastream = KafkaMirrorMakerConnectorTestUtils.createDatastream("pizzaStream", _broker, "\\w+Pizza");
@@ -637,7 +634,7 @@ public void testFetchPartitionChange() throws Exception {
ImmutableSet.of(yummyTopic + "-0"));

String saltyTopic = "SaltyPizza";
createTopic(_zkUtils, saltyTopic, 2);
createTopic(_adminClient, saltyTopic, 2);

Assert.assertTrue(PollUtils.poll(() -> partitionChangeCalls.get() == 2, POLL_PERIOD_MS, POLL_TIMEOUT_MS));
partitionInfo = connector.getDatastreamPartitions();
@@ -716,7 +713,7 @@ public void testChangeDatatstreamAssignment() throws Exception {
@Test
public void testGetStateForMultipleTasks() throws Exception {
List<String> topics = Arrays.asList("YummyPizza", "SaltyPizza", "HealthySalad");
topics.forEach(t -> createTopic(_zkUtils, t));
topics.forEach(t -> createTopic(_adminClient, t));

// create a datastream to consume from topics ending in "Pizza"
Datastream datastream = KafkaMirrorMakerConnectorTestUtils.createDatastream("pizzaStream", _broker, "\\w+Pizza");
