Kafka client Java API wrapper #753
Conversation
pulsar-client-kafka-compat/pom.xml (outdated)
</excludes>
</filter>
</filters>
<relocations>
Just out of curiosity - that means the PulsarKafkaProducer needs to have exactly the same constructors as the original KafkaProducer, and the same applies for the consumer, right? How hard is it to keep such compatibility going forward?
Yes, we need to use the same constructors as the original class. I think we can maintain compatibility up to a specific kafka-clients version; that should be noted in the documentation. So far, I haven't seen differences in the constructors from the 0.9 to 0.11 versions. In case Kafka adds a new constructor in future releases, the client app would still work unless it tries to use that new constructor.
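For illustration, a minimal sketch of what keeping that constructor parity could look like (the class body and the propertiesToMap helper are assumed, not the actual PR code); the real class would also implement org.apache.kafka.clients.producer.Producer<K, V>:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serializer;

public class PulsarKafkaProducer<K, V> {

    // Same public constructors as org.apache.kafka.clients.producer.KafkaProducer (0.9 - 0.11)
    public PulsarKafkaProducer(Map<String, Object> configs) {
        this(configs, null, null);
    }

    public PulsarKafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer,
            Serializer<V> valueSerializer) {
        // Translate the Kafka config keys into a Pulsar client/producer configuration here
    }

    public PulsarKafkaProducer(Properties properties) {
        this(propertiesToMap(properties), null, null);
    }

    public PulsarKafkaProducer(Properties properties, Serializer<K> keySerializer,
            Serializer<V> valueSerializer) {
        this(propertiesToMap(properties), keySerializer, valueSerializer);
    }

    private static Map<String, Object> propertiesToMap(Properties properties) {
        Map<String, Object> map = new HashMap<>();
        properties.forEach((k, v) -> map.put(k.toString(), v));
        return map;
    }
}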
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
callback is not used?
Fixed
}

@Override
public void flush() {
The flush semantics here are a bit problematic: the flush might potentially wait for records sent after the flush call. If you have a high send rate, this might potentially spin for a long time.
If the Pulsar producer has the ordering property on producing messages, you might be able to just keep track of the last outstanding send and wait for its callback.
Good point. We can just keep a hashmap with the future associated with the last send operation. Then, in flush(), we can iterate over a snapshot of that map.
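A rough sketch of that approach (field names and surrounding class are assumed; it relies on java.util.concurrent and org.apache.pulsar.client.api.MessageId):

// Future of the last send() per partition/producer, overwritten on every send()
private final ConcurrentMap<String, CompletableFuture<MessageId>> lastSendFuture = new ConcurrentHashMap<>();

@Override
public void flush() {
    // Snapshot of the currently outstanding sends; sends issued after this point do not extend the wait
    List<CompletableFuture<MessageId>> pending = new ArrayList<>(lastSendFuture.values());
    try {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
    }
}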
@Override
public List<PartitionInfo> partitionsFor(String topic) {
    return Collections.emptyList();
Are you planning to implement this in the future? Add an issue and a TODO item here?
We can easily get the partition list info, though most of the other things won't apply here:

public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    ...

I think we should just throw UnsupportedOperationException here for now.
yeah sounds good to me
try {
    outstandingWrites.wait();
} catch (InterruptedException e) {
    throw new RuntimeException(e);
flush() is expecting an InterruptedException, so you can just re-throw e?
Signature for flush() doesn't have any checked exceptions:

/**
 * Flush any accumulated records from the producer. Blocks until all sends are complete.
 */
public void flush();
Ah I see. The Kafka producer was written in Scala, so it is able to throw InterruptedException without declaring it in the method signature.
V value = valueDeserializer.deserialize(topic, msg.getData());

ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, msg.getPublishTime(),
        TimestampType.CREATE_TIME, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key,
we can support EVENT_TIME as well?
Yes, fixed that on both producer and consumer, to pass and retrieve the event time.
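One possible consumer-side mapping (a sketch only; it assumes the Pulsar Message exposes getEventTime(), with 0 meaning that no event time was attached by the producer):

long timestamp;
TimestampType timestampType;
if (msg.getEventTime() > 0) {
    // Producer attached an explicit event time (e.g. taken from ProducerRecord.timestamp())
    timestamp = msg.getEventTime();
    timestampType = TimestampType.CREATE_TIME;
} else {
    // Fall back to the broker publish time
    timestamp = msg.getPublishTime();
    timestampType = TimestampType.LOG_APPEND_TIME;
}
ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
        timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);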
@Override
public ConsumerRecords<K, V> poll(long timeoutMillis) {
    try {
        QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
This can be improved in the future: you can pull all the data already in the buffer, and only wait if there is no data in the buffer.
Yes, that is the current behavior on the blocking queue.
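A sketch of the suggested improvement, reusing the QueueItem / receivedMessages names from this PR (the record-building part is omitted):

// Block only for the first message, then drain whatever else is already buffered
List<QueueItem> items = new ArrayList<>();
QueueItem first = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
if (first != null) {
    items.add(first);
    // drainTo() is non-blocking: it only takes what is currently in the queue
    receivedMessages.drainTo(items);
}
// ... build a single ConsumerRecords<K, V> out of all drained items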
records.put(tp, Lists.newArrayList(consumerRecord));

// Update last message id seen by application
lastMessageId.put(item.consumer, msgId);
If the broker redelivers the messages, how does that impact things here?
The lastMessageId will roll back as well, and the next commit() operation will acknowledge cumulatively on that earlier message id. If it was already acknowledged, it becomes a no-op in the broker.
}

@Override
public void commitSync() {
Can commitSync be a shortcut of commitAsync that waits for the callback?
I'll try to refactor it. As it is, commitAsync() doesn't return anything or take a callback, so I'll add a private method and use it from the API methods.
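A sketch of that refactor (doCommitOffsets(), offsetToMessageId() and currentOffsets() are hypothetical names; consumers is the TopicPartition-to-Consumer map used elsewhere in this PR):

private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    offsets.forEach((topicPartition, offsetAndMetadata) -> {
        // Cumulative ack up to the message id that corresponds to the committed offset
        futures.add(consumers.get(topicPartition)
                .acknowledgeCumulativeAsync(offsetToMessageId(offsetAndMetadata.offset())));
    });
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

@Override
public void commitSync() {
    // Shortcut: same code path as the async commit, then wait for completion
    doCommitOffsets(currentOffsets()).join();
}

@Override
public void commitAsync() {
    doCommitOffsets(currentOffsets());
}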
@Override
public long position(TopicPartition partition) {
    throw new UnsupportedOperationException();
I think this can be implemented, no? you kept the lastMessageId?
Yes, it can be implemented, and the committed() method as well. I just need to change the map, since the key is currently a Consumer instance.
Addressed comments. Still a few more unit tests to add.
LGTM +1 well done
Updated. I have added a separate module to verify the shaded library within tests. @rdhabalia Please take a look.
retest this please
    throw new RuntimeException(e);
}

pulsarProducerConf = new ProducerConfiguration();
How about setting compression.type? Pulsar and Kafka both support LZ4 and GZIP.
Good point, we can easily do that.
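A possible mapping, assuming the Kafka compression.type property values and the Pulsar ProducerConfiguration.setCompressionType() setter (gzip is approximated with ZLIB here):

String compressionType = properties.getProperty("compression.type", "none");
switch (compressionType) {
case "gzip":
    pulsarProducerConf.setCompressionType(CompressionType.ZLIB);
    break;
case "lz4":
    pulsarProducerConf.setCompressionType(CompressionType.LZ4);
    break;
default:
    pulsarProducerConf.setCompressionType(CompressionType.NONE);
    break;
}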
pulsarProducerConf = new ProducerConfiguration();
pulsarProducerConf.setBatchingEnabled(true);
pulsarProducerConf.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
Instead of a default of 1, how about getting the linger.ms property?
The semantics of linger.ms are a bit different, in the sense that 0 doesn't mean "no batching" in Kafka. We could use the linger.ms value if provided, and otherwise use a very low setting by default, e.g. 500µs.
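A sketch of that configuration logic (the property handling is assumed; the 500 µs fallback is the value suggested above):

pulsarProducerConf.setBatchingEnabled(true);
String lingerMs = properties.getProperty("linger.ms");
if (lingerMs != null) {
    // Honor the Kafka setting when it is provided
    pulsarProducerConf.setBatchingMaxPublishDelay(Long.parseLong(lingerMs), TimeUnit.MILLISECONDS);
} else {
    // Otherwise use a very low delay, close to "no batching"
    pulsarProducerConf.setBatchingMaxPublishDelay(500, TimeUnit.MICROSECONDS);
}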
private Message getMessage(ProducerRecord<K, V> record) {
    if (record.partition() != null) {
        throw new UnsupportedOperationException("");
Does this mean it doesn't support Kafka topics with partitions?
No, it's just that you cannot manually specify which partition to publish to. That would require some gymnastics to support. We could do it later.
Ok.. I saw PulsarKafkaConsumer, where we create an individual Consumer for each partition. In a similar way, here we could have created a separate producer for each partition and then sent the message using the key.
Yes, that would be an option as well. I first did the producer part and used the regular partitioned producer, though it might make sense to deal with the partitions manually.
I tried to do manual partition management on the producer end, but it gets complicated to deal with the Kafka Partitioner interface. The trick is that it relies on a Cluster view which is overly Kafka-specific. I'll leave it for later if there is a need to support these features too.
// Combine ledger id and entry id to form offset
// Use less than 32 bits to represent entry id since it will get
// rolled over way before overflowing the max int range
long offset = (ledgerId << 28) | entryId;
Now that we have started using 64-bit ledgerIds in BK, will a ledgerId > 2^36 give an incorrect value?
That's correct, though 2^36 is 68 billion ledgers.
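For reference, a small sketch of the packing and its inverse (helper names are illustrative): the ledger id occupies the upper bits and the entry id the lower 28 bits, so the scheme holds as long as entryId < 2^28 and ledgerId < 2^36.

static long getOffset(long ledgerId, long entryId) {
    return (ledgerId << 28) | entryId;
}

static long getLedgerId(long offset) {
    return offset >>> 28;
}

static long getEntryId(long offset) {
    return offset & 0x0FFFFFFFL;
}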
LGTM.. just a few minor comments..
@rdhabalia updated
// Wait for all consumers to be ready
for (int i = 0; i < partitionMetadata.partitions; i++) {
    consumers.putIfAbsent(new TopicPartition(topic, i), futures.get(i).get());
What if one of the partitioned consumers fails to subscribe? E.g., out of c1, c2, c3, if c2 fails, do we have to close c1 and c3? And should we remove c1 from consumers, because we already added c1?
retest this please
// Wait for all consumers to be ready
for (int i = 0; i < partitionMetadata.partitions; i++) {
    consumers.putIfAbsent(new TopicPartition(topic, i), futures.get(i).get());
I think the previous comment is not showing up, so adding it here: what if one of the partitioned consumers fails to subscribe? E.g., out of c1, c2, c3, if c2 fails, do we have to close c1 and c3? And should we remove c1 from consumers, because we already added c1?
Oh yes, let me refactor that to make sure we clean up all intermediate consumers.
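A sketch of that cleanup (assuming futures is the List<CompletableFuture<Consumer>> created when subscribing to each partition):

try {
    // Wait for all consumers to be ready
    for (int i = 0; i < partitionMetadata.partitions; i++) {
        consumers.putIfAbsent(new TopicPartition(topic, i), futures.get(i).get());
    }
} catch (Exception e) {
    // Close every consumer that was already created, then fail the subscribe
    futures.forEach(f -> f.thenAccept(consumer -> consumer.closeAsync()));
    consumers.clear();
    throw new RuntimeException("Failed to subscribe to all partitions of " + topic, e);
}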
@rdhabalia Addressed the last comment
👍
retest this please
For some reason the Jenkins build is not getting triggered again. I'll create a new PR.
Motivation
Add a Kafka API wrapper based on the Pulsar client library.
This will allow existing applications using the Kafka client library to publish or subscribe to a Pulsar topic without any code change.
This first iteration is targeting the Kafka high-level consumer with managed offsets, with or without auto-commit.
Examples and documentation on the website will follow this PR.
Modifications
- Added Producer and Consumer interfaces that internally use the Pulsar client library
- Replaced the KafkaProducer class with PulsarKafkaProducer in the jar
Result
The org.apache.pulsar:pulsar-client-kafka-compact artifact will be a drop-in replacement for org.apache.kafka:kafka-clients.
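For illustration, a hedged usage sketch of what the drop-in replacement enables (the service URL and the topic name format are indicative and depend on the Pulsar deployment): the application keeps using the Kafka API unchanged, and only the broker address and topic name point at Pulsar.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Pulsar broker service URL instead of a Kafka bootstrap server
        props.put("bootstrap.servers", "pulsar://localhost:6650");
        props.put("key.serializer", IntegerSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // With the relocated jar, KafkaProducer resolves to the Pulsar-backed implementation
        Producer<Integer, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("persistent://sample/standalone/ns/my-topic", 1, "hello"));
        producer.close();
    }
}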