diff --git a/pom.xml b/pom.xml
index 30a14f3efb7e3..6aff0c64e4c57 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,6 +90,7 @@ flexible messaging model and an intuitive client API.
pulsar-testclient
pulsar-broker-auth-athenz
pulsar-client-auth-athenz
+ pulsar-client-kafka-compat
pulsar-zookeeper
all
diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pom.xml
new file mode 100644
index 0000000000000..6c5cb27d95835
--- /dev/null
+++ b/pulsar-client-kafka-compat/pom.xml
@@ -0,0 +1,43 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.pulsar
+ pulsar
+ 1.20.0-incubating-SNAPSHOT
+ ..
+
+
+ pulsar-client-kafka-compat
+ Pulsar Kafka compatibility
+
+ pom
+
+
+ pulsar-client-kafka
+ pulsar-client-kafka-tests
+
+
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml
new file mode 100644
index 0000000000000..59c2b35eabe8a
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml
@@ -0,0 +1,70 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.pulsar
+ pulsar-client-kafka-compat
+ 1.20.0-incubating-SNAPSHOT
+ ..
+
+
+ pulsar-client-kafka-tests
+ Pulsar Kafka compatibility :: Tests
+
+ Tests to verify the correct shading configuration for the pulsar-client-kafka wrapper
+
+
+
+ ${project.groupId}
+ pulsar-client-kafka
+ ${project.version}
+
+
+
+ ${project.groupId}
+ pulsar-broker
+ ${project.version}
+ test
+
+
+
+ ${project.groupId}
+ pulsar-broker
+ ${project.version}
+ test
+ test-jar
+
+
+
+ ${project.groupId}
+ managed-ledger
+ ${project.version}
+ test
+ test-jar
+
+
+
+
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
new file mode 100644
index 0000000000000..59c459a51e628
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumerExample {
+ public static void main(String[] args) {
+ String topic = "persistent://sample/standalone/ns/my-topic";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "pulsar://localhost:6650");
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "false");
+ props.put("key.deserializer", IntegerDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ Consumer consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(topic));
+
+ while (true) {
+ ConsumerRecords records = consumer.poll(100);
+ records.forEach(record -> {
+ log.info("Received record: {}", record);
+ });
+
+ // Commit last offset
+ consumer.commitSync();
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ConsumerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
new file mode 100644
index 0000000000000..2cb5aacfaddcd
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProducerExample {
+ public static void main(String[] args) {
+ String topic = "persistent://sample/standalone/ns/my-topic";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "pulsar://localhost:6650");
+
+ props.put("key.serializer", IntegerSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ Producer producer = new KafkaProducer<>(props);
+
+ for (int i = 0; i < 10; i++) {
+ producer.send(new ProducerRecord(topic, i, Integer.toString(i)));
+ log.info("Message {} sent successfully", i);
+ }
+
+ producer.close();
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaApiTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaApiTest.java
new file mode 100644
index 0000000000000..034d2f2d1a1f5
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaApiTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.tests;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class KafkaApiTest extends BrokerTestBase {
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 30000)
+ public void testSimpleProducerConsumer() throws Exception {
+ String topic = "persistent://sample/standalone/ns/testSimpleProducerConsumer";
+
+ Properties producerProperties = new Properties();
+ producerProperties.put("bootstrap.servers", brokerUrl.toString());
+ producerProperties.put("key.serializer", IntegerSerializer.class.getName());
+ producerProperties.put("value.serializer", StringSerializer.class.getName());
+ Producer producer = new KafkaProducer<>(producerProperties);
+
+ Properties consumerProperties = new Properties();
+ consumerProperties.put("bootstrap.servers", brokerUrl.toString());
+ consumerProperties.put("group.id", "my-subscription-name");
+ consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
+ consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
+ consumerProperties.put("enable.auto.commit", "true");
+ Consumer consumer = new KafkaConsumer<>(consumerProperties);
+ consumer.subscribe(Arrays.asList(topic));
+
+ List offsets = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ RecordMetadata md = producer.send(new ProducerRecord(topic, i, "hello-" + i)).get();
+ offsets.add(md.offset());
+ log.info("Published message at {}", Long.toHexString(md.offset()));
+ }
+
+ producer.flush();
+ producer.close();
+
+ for (int i = 0; i < 10; i++) {
+ ConsumerRecords records = consumer.poll(1000);
+ assertEquals(records.count(), 1);
+
+ int idx = i;
+ records.forEach(record -> {
+ log.info("Received record: {}", record);
+ assertEquals(record.key().intValue(), idx);
+ assertEquals(record.value(), "hello-" + idx);
+ assertEquals(record.offset(), offsets.get(idx).longValue());
+ });
+ }
+
+ consumer.close();
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaApiTest.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
new file mode 100644
index 0000000000000..14fcd845af39b
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
@@ -0,0 +1,124 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.pulsar
+ pulsar-client-kafka-compat
+ 1.20.0-incubating-SNAPSHOT
+ ..
+
+
+ pulsar-client-kafka
+ Pulsar Kafka compatibility :: API
+
+ Drop-in replacement for Kafka client library that publishes and consumes
+ messages on Pulsar topics
+
+
+ 0.10.2.1
+
+
+
+
+ ${project.groupId}
+ pulsar-client
+ ${project.version}
+
+
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka.version}
+
+
+
+ ${project.groupId}
+ pulsar-broker
+ ${project.version}
+ test
+
+
+
+ ${project.groupId}
+ pulsar-broker
+ ${project.version}
+ test
+ test-jar
+
+
+
+ ${project.groupId}
+ managed-ledger
+ ${project.version}
+ test
+ test-jar
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ package
+
+ shade
+
+
+ true
+ true
+
+
+ org.apache.kafka:kafka-clients
+
+
+
+
+ org.apache.kafka.clients.producer.KafkaProducer
+ org.apache.kafka.clients.producer.OriginalKafkaProducer
+
+
+ org.apache.kafka.clients.producer.PulsarKafkaProducer
+ org.apache.kafka.clients.producer.KafkaProducer
+
+
+ org.apache.kafka.clients.consumer.KafkaConsumer
+ org.apache.kafka.clients.consumer.OriginalKafkaConsumer
+
+
+ org.apache.kafka.clients.consumer.PulsarKafkaConsumer
+ org.apache.kafka.clients.consumer.KafkaConsumer
+
+
+
+
+
+
+
+
+
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
new file mode 100644
index 0000000000000..f4a4c5d8551a3
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+
+import com.google.common.collect.Lists;
+
+public class PulsarKafkaConsumer implements Consumer, MessageListener {
+
+ private static final long serialVersionUID = 1L;
+
+ private final PulsarClient client;
+
+ private final Deserializer keyDeserializer;
+ private final Deserializer valueDeserializer;
+
+ private final String groupId;
+ private final boolean isAutoCommit;
+
+ private final ConcurrentMap consumers = new ConcurrentHashMap<>();
+
+ private final Map lastReceivedOffset = new ConcurrentHashMap<>();
+ private final Map lastCommittedOffset = new ConcurrentHashMap<>();
+
+ private static class QueueItem {
+ final org.apache.pulsar.client.api.Consumer consumer;
+ final Message message;
+
+ QueueItem(org.apache.pulsar.client.api.Consumer consumer, Message message) {
+ this.consumer = consumer;
+ this.message = message;
+ }
+ }
+
+ // Since a single Kafka consumer can receive from multiple topics, we need to multiplex all the different
+ // topics/partitions into a single queues
+ private final BlockingQueue receivedMessages = new ArrayBlockingQueue<>(1000);
+
+ public PulsarKafkaConsumer(Map configs) {
+ this(configs, null, null);
+ }
+
+ public PulsarKafkaConsumer(Map configs, Deserializer keyDeserializer,
+ Deserializer valueDeserializer) {
+ this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
+ keyDeserializer, valueDeserializer);
+ }
+
+ public PulsarKafkaConsumer(Properties properties) {
+ this(properties, null, null);
+ }
+
+ public PulsarKafkaConsumer(Properties properties, Deserializer keyDeserializer,
+ Deserializer valueDeserializer) {
+ this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
+ keyDeserializer, valueDeserializer);
+ }
+
+ @SuppressWarnings("unchecked")
+ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer,
+ Deserializer valueDeserializer) {
+
+ if (keyDeserializer == null) {
+ this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ Deserializer.class);
+ this.keyDeserializer.configure(config.originals(), true);
+ } else {
+ this.keyDeserializer = keyDeserializer;
+ config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ }
+
+ if (valueDeserializer == null) {
+ this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ Deserializer.class);
+ this.valueDeserializer.configure(config.originals(), true);
+ } else {
+ this.valueDeserializer = valueDeserializer;
+ config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ }
+
+ groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
+ isAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+
+ String serviceUrl = config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
+
+ Properties properties = new Properties();
+ config.originals().forEach((k, v) -> properties.put(k, v));
+ ClientConfiguration clientConf = PulsarKafkaConfig.getClientConfiguration(properties);
+ try {
+ client = PulsarClient.create(serviceUrl, clientConf);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void received(org.apache.pulsar.client.api.Consumer consumer, Message msg) {
+ // Block listener thread if the application is slowing down
+ try {
+ receivedMessages.put(new QueueItem(consumer, msg));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Set assignment() {
+ throw new UnsupportedOperationException("Cannot access the partitions assignements");
+ }
+
+ /**
+ * Get the current subscription. Will return the same topics used in the most recent call to
+ * {@link #subscribe(Collection, ConsumerRebalanceListener)}, or an empty set if no such call has been made.
+ *
+ * @return The set of topics currently subscribed to
+ */
+ @Override
+ public Set subscription() {
+ return consumers.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+ }
+
+ @Override
+ public void subscribe(Collection topics) {
+ List> futures = new ArrayList<>();
+ try {
+ for (String topic : topics) {
+ // Create individual subscription on each partition, that way we can keep using the
+ // acknowledgeCumulative()
+ PartitionedTopicMetadata partitionMetadata = ((PulsarClientImpl) client)
+ .getPartitionedTopicMetadata(topic).get();
+
+ ConsumerConfiguration conf = new ConsumerConfiguration();
+ conf.setSubscriptionType(SubscriptionType.Failover);
+ conf.setMessageListener(this);
+ if (partitionMetadata.partitions > 1) {
+ // Subscribe to each partition
+ conf.setConsumerName(ConsumerName.generateRandomName());
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ String partitionName = DestinationName.get(topic).getPartition(i).toString();
+ CompletableFuture future = client
+ .subscribeAsync(partitionName, groupId, conf);
+ int partitionIndex = i;
+ future.thenAccept(
+ consumer -> consumers.putIfAbsent(new TopicPartition(topic, partitionIndex), consumer));
+ futures.add(future);
+ }
+
+ } else {
+ // Topic has a single partition
+ CompletableFuture future = client.subscribeAsync(topic,
+ groupId, conf);
+ future.thenAccept(consumer -> consumers.putIfAbsent(new TopicPartition(topic, 0), consumer));
+ futures.add(future);
+ }
+ }
+
+ // Wait for all consumers to be ready
+ futures.forEach(CompletableFuture::join);
+
+ } catch (Exception e) {
+ // Close all consumer that might have been sucessfully created
+ futures.forEach(f -> {
+ try {
+ f.get().close();
+ } catch (Exception e1) {
+ // Ignore. Consumer already had failed
+ }
+ });
+
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void subscribe(Collection topics, ConsumerRebalanceListener callback) {
+ throw new UnsupportedOperationException("ConsumerRebalanceListener is not supported");
+ }
+
+ @Override
+ public void assign(Collection partitions) {
+ throw new UnsupportedOperationException("Cannot manually assign partitions");
+ }
+
+ @Override
+ public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
+ throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
+ }
+
+ @Override
+ public void unsubscribe() {
+ consumers.values().forEach(c -> {
+ try {
+ c.unsubscribe();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ConsumerRecords poll(long timeoutMillis) {
+ try {
+ QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (item == null) {
+ return (ConsumerRecords) ConsumerRecords.EMPTY;
+ }
+
+ if (isAutoCommit) {
+ // Commit the offset of previously dequeued messages
+ commitAsync();
+ }
+
+ DestinationName dn = DestinationName.get(item.consumer.getTopic());
+ String topic = dn.getPartitionedTopicName();
+ int partition = dn.isPartitioned() ? dn.getPartitionIndex() : 0;
+ Message msg = item.message;
+ MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+ long offset = MessageIdUtils.getOffset(msgId);
+
+ TopicPartition tp = new TopicPartition(topic, partition);
+
+ K key = getKey(topic, msg);
+ V value = valueDeserializer.deserialize(topic, msg.getData());
+
+ TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
+ long timestamp = msg.getPublishTime();
+
+ if (msg.getEventTime() > 0) {
+ // If we have Event time, use that in preference
+ timestamp = msg.getEventTime();
+ timestampType = TimestampType.CREATE_TIME;
+ }
+
+ ConsumerRecord consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
+ timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);
+
+ Map>> records = new HashMap<>();
+ records.put(tp, Lists.newArrayList(consumerRecord));
+
+ // Update last offset seen by application
+ lastReceivedOffset.put(tp, offset);
+ return new ConsumerRecords<>(records);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private K getKey(String topic, Message msg) {
+ if (!msg.hasKey()) {
+ return null;
+ }
+
+ if (keyDeserializer instanceof StringDeserializer) {
+ return (K) msg.getKey();
+ } else {
+ // Assume base64 encoding
+ byte[] data = Base64.getDecoder().decode(msg.getKey());
+ return keyDeserializer.deserialize(topic, data);
+ }
+ }
+
+ @Override
+ public void commitSync() {
+ try {
+ doCommitOffsets(getCurrentOffsetsMap()).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void commitSync(Map offsets) {
+ try {
+ doCommitOffsets(offsets).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void commitAsync() {
+ doCommitOffsets(getCurrentOffsetsMap());
+ }
+
+ @Override
+ public void commitAsync(OffsetCommitCallback callback) {
+ Map offsets = getCurrentOffsetsMap();
+ doCommitOffsets(offsets).handle((v, throwable) -> {
+ callback.onComplete(offsets, throwable != null ? new Exception(throwable) : null);
+ return null;
+ });
+ }
+
+ @Override
+ public void commitAsync(Map offsets, OffsetCommitCallback callback) {
+ doCommitOffsets(offsets).handle((v, throwable) -> {
+ callback.onComplete(offsets, throwable != null ? new Exception(throwable) : null);
+ return null;
+ });
+ }
+
+ private CompletableFuture doCommitOffsets(Map offsets) {
+ List> futures = new ArrayList<>();
+
+ offsets.forEach((topicPartition, offsetAndMetadata) -> {
+ org.apache.pulsar.client.api.Consumer consumer = consumers.get(topicPartition);
+
+ lastCommittedOffset.put(topicPartition, offsetAndMetadata);
+ futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
+ });
+
+ return FutureUtil.waitForAll(futures);
+ }
+
+ private Map getCurrentOffsetsMap() {
+ Map offsets = new HashMap<>();
+ lastReceivedOffset.forEach((topicPartition, offset) -> {
+ OffsetAndMetadata om = new OffsetAndMetadata(offset);
+ offsets.put(topicPartition, om);
+ });
+
+ return offsets;
+ }
+
+ @Override
+ public void seek(TopicPartition partition, long offset) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void seekToBeginning(Collection partitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void seekToEnd(Collection partitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long position(TopicPartition partition) {
+ return lastReceivedOffset.get(partition);
+ }
+
+ @Override
+ public OffsetAndMetadata committed(TopicPartition partition) {
+ return lastCommittedOffset.get(partition);
+ }
+
+ @Override
+ public Map metrics() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List partitionsFor(String topic) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map> listTopics() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set paused() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void pause(Collection partitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void resume(Collection partitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map offsetsForTimes(Map timestampsToSearch) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map beginningOffsets(Collection partitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map endOffsets(Collection partitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {
+ close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close(long timeout, TimeUnit unit) {
+ try {
+ if (isAutoCommit) {
+ commitAsync();
+ }
+ client.closeAsync().get(timeout, unit);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void wakeup() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
new file mode 100644
index 0000000000000..5d0da1c60c490
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig;
+
+public class PulsarKafkaProducer implements Producer {
+
+ private final PulsarClient client;
+ private final ProducerConfiguration pulsarProducerConf;
+
+ private final ConcurrentMap producers = new ConcurrentHashMap<>();
+
+ private final Serializer keySerializer;
+ private final Serializer valueSerializer;
+
+ /** Map that contains the last future for each producer */
+ private final ConcurrentMap> lastSendFuture = new ConcurrentHashMap<>();
+
+ public PulsarKafkaProducer(Map configs) {
+ this(configs, null, null);
+ }
+
+ public PulsarKafkaProducer(Map configs, Serializer keySerializer,
+ Serializer valueSerializer) {
+ this(configs, new Properties(), keySerializer, valueSerializer);
+ }
+
+ public PulsarKafkaProducer(Properties properties) {
+ this(properties, null, null);
+ }
+
+ public PulsarKafkaProducer(Properties properties, Serializer keySerializer, Serializer valueSerializer) {
+ this(new HashMap<>(), properties, keySerializer, valueSerializer);
+ }
+
+ @SuppressWarnings({ "unchecked", "deprecation" })
+ private PulsarKafkaProducer(Map conf, Properties properties, Serializer keySerializer,
+ Serializer valueSerializer) {
+ properties.forEach((k, v) -> conf.put((String) k, v));
+
+ ProducerConfig producerConfig = new ProducerConfig(conf);
+
+ if (keySerializer == null) {
+ this.keySerializer = producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ Serializer.class);
+ this.keySerializer.configure(producerConfig.originals(), true);
+ } else {
+ this.keySerializer = keySerializer;
+ producerConfig.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ }
+
+ if (valueSerializer == null) {
+ this.valueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ Serializer.class);
+ this.valueSerializer.configure(producerConfig.originals(), true);
+ } else {
+ this.valueSerializer = valueSerializer;
+ producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ }
+
+ String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
+ ClientConfiguration clientConf = PulsarKafkaConfig.getClientConfiguration(properties);
+ try {
+ client = PulsarClient.create(serviceUrl, clientConf);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+
+ pulsarProducerConf = new ProducerConfiguration();
+ pulsarProducerConf.setBatchingEnabled(true);
+
+ // To mimic the same batching mode as Kafka, we need to wait a very little amount of
+ // time to batch if the client is trying to send messages fast enough
+ long lingerMs = Long.parseLong(properties.getProperty(ProducerConfig.LINGER_MS_CONFIG, "1"));
+ pulsarProducerConf.setBatchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS);
+
+ String compressionType = properties.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG);
+ if ("gzip".equals(compressionType)) {
+ pulsarProducerConf.setCompressionType(CompressionType.ZLIB);
+ } else if ("lz4".equals(compressionType)) {
+ pulsarProducerConf.setCompressionType(CompressionType.LZ4);
+ }
+
+ pulsarProducerConf.setBlockIfQueueFull(
+ Boolean.parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")));
+ }
+
+ @Override
+ public Future send(ProducerRecord record) {
+ return send(record, null);
+ }
+
+ @Override
+ public Future send(ProducerRecord record, Callback callback) {
+ org.apache.pulsar.client.api.Producer producer;
+
+ try {
+ producer = producers.computeIfAbsent(record.topic(), topic -> createNewProducer(topic));
+ } catch (Exception e) {
+ callback.onCompletion(null, e);
+ CompletableFuture future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+
+ Message msg = getMessage(record);
+
+ CompletableFuture future = new CompletableFuture<>();
+ CompletableFuture sendFuture = producer.sendAsync(msg);
+ lastSendFuture.put(record.topic(), sendFuture);
+
+ sendFuture.thenAccept((messageId) -> {
+ future.complete(getRecordMetadata(record.topic(), msg, messageId));
+ }).exceptionally(ex -> {
+ future.completeExceptionally(ex);
+ return null;
+ });
+
+ future.handle((recordMetadata, exception) -> {
+ callback.onCompletion(recordMetadata, new Exception(exception));
+ return null;
+ });
+
+ return future;
+ }
+
+ @Override
+ public void flush() {
+ lastSendFuture.forEach((topic, future) -> {
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+
+ // Remove the futures to remove eventually failed operations in order to trigger errors only once
+ lastSendFuture.remove(topic, future);
+ });
+ }
+
+ @Override
+ public List partitionsFor(String topic) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map metrics() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void close() {
+ close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close(long timeout, TimeUnit unit) {
+ try {
+ client.closeAsync().get(timeout, unit);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private org.apache.pulsar.client.api.Producer createNewProducer(String topic) {
+ try {
+ return client.createProducer(topic);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Message getMessage(ProducerRecord record) {
+ if (record.partition() != null) {
+ throw new UnsupportedOperationException("");
+ }
+
+ MessageBuilder builder = MessageBuilder.create();
+ if (record.key() != null) {
+ builder.setKey(getKey(record.topic(), record.key()));
+ }
+ if (record.timestamp() != null) {
+ builder.setEventTime(record.timestamp());
+ }
+ builder.setContent(valueSerializer.serialize(record.topic(), record.value()));
+ return builder.build();
+ }
+
+ private String getKey(String topic, K key) {
+ // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
+ if (keySerializer instanceof StringSerializer) {
+ return (String) key;
+ } else {
+ byte[] keyBytes = keySerializer.serialize(topic, key);
+ return Base64.getEncoder().encodeToString(keyBytes);
+ }
+ }
+
+ private RecordMetadata getRecordMetadata(String topic, Message msg, MessageId messageId) {
+ MessageIdImpl msgId = (MessageIdImpl) messageId;
+
+ // Combine ledger id and entry id to form offset
+ long offset = MessageIdUtils.getOffset(msgId);
+ int partition = msgId.getPartitionIndex();
+
+ TopicPartition tp = new TopicPartition(topic, partition);
+
+ return new RecordMetadata(tp, offset, 0, msg.getPublishTime(), 0, msg.hasKey() ? msg.getKey().length() : 0,
+ msg.getData().length);
+
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
new file mode 100644
index 0000000000000..d8f76805b7d5b
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+public class MessageIdUtils {
+ public static final long getOffset(MessageId messageId) {
+ MessageIdImpl msgId = (MessageIdImpl) messageId;
+ long ledgerId = msgId.getLedgerId();
+ long entryId = msgId.getEntryId();
+
+ // Combine ledger id and entry id to form offset
+ // Use less than 32 bits to represent entry id since it will get
+ // rolled over way before overflowing the max int range
+ long offset = (ledgerId << 28) | entryId;
+ return offset;
+ }
+
+ public static final MessageId getMessageId(long offset) {
+ // Demultiplex ledgerId and entryId from offset
+ long ledgerId = offset >>> 28;
+ long entryId = offset & 0x0F_FF_FF_FFL;
+
+ return new MessageIdImpl(ledgerId, entryId, -1);
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaConfig.java
new file mode 100644
index 0000000000000..2396bac83c6a4
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaConfig.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Properties;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientConfiguration;
+
+public class PulsarKafkaConfig {
+
+ /// Config variables
+ public static final String AUTHENTICATION_CLASS = "pulsar.authentication.class";
+ public static final String USE_TLS = "pulsar.use.tls";
+ public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path";
+ public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection";
+
+ public static ClientConfiguration getClientConfiguration(Properties properties) {
+ ClientConfiguration conf = new ClientConfiguration();
+
+ if (properties.containsKey(AUTHENTICATION_CLASS)) {
+ String className = properties.getProperty(AUTHENTICATION_CLASS);
+ try {
+ @SuppressWarnings("unchecked")
+ Class clazz = (Class) Class.forName(className);
+ Authentication auth = clazz.newInstance();
+ conf.setAuthentication(auth);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ conf.setUseTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
+ conf.setUseTls(Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+ if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
+ conf.setTlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+ }
+
+ return conf;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
new file mode 100644
index 0000000000000..0766ea5357a9b
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.tests;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.PulsarKafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class KafkaConsumerTest extends BrokerTestBase {
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testSimpleConsumer() throws Exception {
+ String topic = "persistent://sample/standalone/ns/testSimpleConsumer";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokerUrl.toString());
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "false");
+ props.put("key.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ Consumer consumer = new PulsarKafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(topic));
+
+ Producer pulsarProducer = pulsarClient.createProducer(topic);
+
+ for (int i = 0; i < 10; i++) {
+ Message msg = MessageBuilder.create().setKey(Integer.toString(i)).setContent(("hello-" + i).getBytes())
+ .build();
+ pulsarProducer.send(msg);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ ConsumerRecords records = consumer.poll(100);
+ assertEquals(records.count(), 1);
+ int idx = i;
+ records.forEach(record -> {
+ assertEquals(record.key(), Integer.toString(idx));
+ assertEquals(record.value(), "hello-" + idx);
+ });
+
+ consumer.commitSync();
+ }
+
+ consumer.close();
+ }
+
+ @Test
+ public void testConsumerAutoCommit() throws Exception {
+ String topic = "persistent://sample/standalone/ns/testConsumerAutoCommit";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokerUrl.toString());
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "true");
+ props.put("key.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ Consumer consumer = new PulsarKafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(topic));
+
+ Producer pulsarProducer = pulsarClient.createProducer(topic);
+
+ for (int i = 0; i < 10; i++) {
+ Message msg = MessageBuilder.create().setKey(Integer.toString(i)).setContent(("hello-" + i).getBytes())
+ .build();
+ pulsarProducer.send(msg);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ ConsumerRecords records = consumer.poll(100);
+ assertEquals(records.count(), 1);
+ int idx = i;
+ records.forEach(record -> {
+ assertEquals(record.key(), Integer.toString(idx));
+ assertEquals(record.value(), "hello-" + idx);
+ });
+ }
+
+ consumer.close();
+
+ // Re-open consumer and verify every message was acknowledged
+ Consumer consumer2 = new PulsarKafkaConsumer<>(props);
+ consumer2.subscribe(Arrays.asList(topic));
+
+ ConsumerRecords records = consumer2.poll(100);
+ assertEquals(records.count(), 0);
+ consumer2.close();
+ }
+
+ @Test
+ public void testConsumerManualOffsetCommit() throws Exception {
+ String topic = "persistent://sample/standalone/ns/testConsumerManualOffsetCommit";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokerUrl.toString());
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "false");
+ props.put("key.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ Consumer consumer = new PulsarKafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(topic));
+
+ Producer pulsarProducer = pulsarClient.createProducer(topic);
+
+ for (int i = 0; i < 10; i++) {
+ Message msg = MessageBuilder.create().setKey(Integer.toString(i)).setContent(("hello-" + i).getBytes())
+ .build();
+ pulsarProducer.send(msg);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ ConsumerRecords records = consumer.poll(100);
+ assertEquals(records.count(), 1);
+ int idx = i;
+ records.forEach(record -> {
+ assertEquals(record.key(), Integer.toString(idx));
+ assertEquals(record.value(), "hello-" + idx);
+
+ Map offsets = new HashMap<>();
+ offsets.put(new TopicPartition(record.topic(), record.partition()),
+ new OffsetAndMetadata(record.offset()));
+ consumer.commitSync(offsets);
+ });
+ }
+
+ consumer.close();
+
+ // Re-open consumer and verify every message was acknowledged
+ Consumer consumer2 = new PulsarKafkaConsumer<>(props);
+ consumer2.subscribe(Arrays.asList(topic));
+
+ ConsumerRecords records = consumer2.poll(100);
+ assertEquals(records.count(), 0);
+ consumer2.close();
+ }
+
+ @Test
+ public void testPartitions() throws Exception {
+ String topic = "persistent://sample/standalone/ns/testPartitions";
+
+ // Create 8 partitions in topic
+ admin.persistentTopics().createPartitionedTopic(topic, 8);
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokerUrl.toString());
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "true");
+ props.put("key.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ ProducerConfiguration conf = new ProducerConfiguration();
+ conf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+ Producer pulsarProducer = pulsarClient.createProducer(topic);
+
+ // Create 2 Kakfa consumer and verify each gets half of the messages
+ List> consumers = new ArrayList<>();
+ for (int c = 0; c < 2; c++) {
+ Consumer consumer = new PulsarKafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(topic));
+ consumers.add(consumer);
+ }
+
+ int N = 8 * 3;
+
+ for (int i = 0; i < N; i++) {
+ Message msg = MessageBuilder.create().setKey(Integer.toString(i)).setContent(("hello-" + i).getBytes())
+ .build();
+ pulsarProducer.send(msg);
+ }
+
+ consumers.forEach(consumer -> {
+ int expectedMessaged = N / consumers.size();
+ for (int i = 0; i < expectedMessaged; i++) {
+ ConsumerRecords records = consumer.poll(100);
+ assertEquals(records.count(), 1);
+ }
+
+ // No more messages for this consumer
+ ConsumerRecords records = consumer.poll(100);
+ assertEquals(records.count(), 0);
+ });
+
+ consumers.forEach(Consumer::close);
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaProducerTest.java
new file mode 100644
index 0000000000000..f15624da10275
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaProducerTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.tests;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.PulsarKafkaProducer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class KafkaProducerTest extends BrokerTestBase {
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testSimpleProducer() throws Exception {
+ String topic = "persistent://sample/standalone/ns/testSimpleProducer";
+
+ Consumer pulsarConsumer = pulsarClient.subscribe(topic, "my-subscription");
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokerUrl.toString());
+
+ props.put("key.serializer", IntegerSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ Producer producer = new PulsarKafkaProducer<>(props);
+
+ for (int i = 0; i < 10; i++) {
+ producer.send(new ProducerRecord(topic, i, "hello-" + i));
+ }
+
+ producer.flush();
+ producer.close();
+
+ for (int i = 0; i < 10; i++) {
+ Message msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
+ assertEquals(new String(msg.getData()), "hello-" + i);
+ pulsarConsumer.acknowledge(msg);
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 0e09d4f368a56..5f17cb31074b8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -18,9 +18,7 @@
*/
package org.apache.pulsar.client.impl;
-import java.util.List;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -28,8 +26,14 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -59,8 +63,7 @@ protected ConsumerBase(PulsarClientImpl client, String topic, String subscriptio
this.maxReceiverQueueSize = receiverQueueSize;
this.subscription = subscription;
this.conf = conf;
- this.consumerName = conf.getConsumerName() == null
- ? DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 5) : conf.getConsumerName();
+ this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName();
this.subscribeFuture = subscribeFuture;
this.listener = conf.getMessageListener();
if (receiverQueueSize <= 1) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fc488b3292a0d..525c5dc0f3989 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -30,6 +30,7 @@
import java.util.BitSet;
import java.util.List;
import java.util.NavigableMap;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -1173,6 +1174,11 @@ public boolean hasReachedEndOfTopic() {
return hasReachedEndOfTopic;
}
+ @Override
+ public int hashCode() {
+ return Objects.hash(topic, subscription, consumerName);
+ }
+
private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 9b6f7ed353609..f32e66886bf6b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -59,7 +59,7 @@ public long getEntryId() {
return entryId;
}
- int getPartitionIndex() {
+ public int getPartitionIndex() {
return partitionIndex;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 33791d46ed0c5..5f1c0d32ae838 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -450,7 +450,7 @@ EventLoopGroup eventLoopGroup() {
return eventLoopGroup;
}
- private CompletableFuture getPartitionedTopicMetadata(String topic) {
+ public CompletableFuture getPartitionedTopicMetadata(String topic) {
CompletableFuture metadataFuture;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ConsumerName.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ConsumerName.java
new file mode 100644
index 0000000000000..b5ef45be76110
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ConsumerName.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.UUID;
+
+import org.apache.commons.codec.digest.DigestUtils;
+
+public class ConsumerName {
+ public static String generateRandomName() {
+ return DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 5);
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/DestinationName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/DestinationName.java
index 28b1408e1e2ba..69083c584355c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/DestinationName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/DestinationName.java
@@ -172,7 +172,7 @@ public String getEncodedLocalName() {
}
public DestinationName getPartition(int index) {
- if (this.toString().contains(PARTITIONED_TOPIC_SUFFIX)) {
+ if (index == -1 || this.toString().contains(PARTITIONED_TOPIC_SUFFIX)) {
return this;
}
String partitionName = this.toString() + PARTITIONED_TOPIC_SUFFIX + index;
@@ -186,6 +186,26 @@ public int getPartitionIndex() {
return partitionIndex;
}
+ public boolean isPartitioned() {
+ return partitionIndex != -1;
+ }
+
+ /**
+ * For partitions in a topic, return the base partitioned topic name
+ * Eg:
+ *
+ * persistent://prop/cluster/ns/my-topic-partition-1
--> persistent://prop/cluster/ns/my-topic
+ * persistent://prop/cluster/ns/my-topic
--> persistent://prop/cluster/ns/my-topic
+ *
+ */
+ public String getPartitionedTopicName() {
+ if (isPartitioned()) {
+ return destination.substring(0, destination.lastIndexOf("-partition-"));
+ } else {
+ return destination;
+ }
+ }
+
/**
* @return partition index of the destination. It returns -1 if the destination (topic) is not partitioned.
*/