This repository has been archived by the owner on Jan 24, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 137
Add entry formatter to do conversions between MemoryRecords and ByteBuf #280
Merged
BewareMyPower
merged 7 commits into
streamnative:master
from
BewareMyPower:bewaremypower/config-entry-format
Dec 24, 2020
Merged
Add entry formatter to do conversions between MemoryRecords and ByteBuf #280
BewareMyPower
merged 7 commits into
streamnative:master
from
BewareMyPower:bewaremypower/config-entry-format
Dec 24, 2020
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
BewareMyPower
force-pushed
the
bewaremypower/config-entry-format
branch
from
December 22, 2020 02:50
0780706
to
bf82cdf
Compare
BewareMyPower
force-pushed
the
bewaremypower/config-entry-format
branch
from
December 22, 2020 10:05
f6cd48b
to
0b6833b
Compare
BewareMyPower
changed the title
[WIP] Add entry formatter to do conversions between MemoryRecords and ByteBuf
Add entry formatter to do conversions between MemoryRecords and ByteBuf
Dec 22, 2020
Since this PR already includes a lot of changes and the existed tests need refactors as well to apply the new config, I'll open a new PR for the |
BewareMyPower
force-pushed
the
bewaremypower/config-entry-format
branch
from
December 23, 2020 08:41
a921597
to
bcac5b8
Compare
codelipenghui
approved these changes
Dec 24, 2020
This was referenced Dec 24, 2020
Merged
jiazhai
pushed a commit
that referenced
this pull request
Dec 27, 2020
It's a follow PR of [#280](#280). This PR adds an optional value to `entry.format` config so that user can improve the performance because there's little entry conversion between `MemoryRecords` and entry's `ByteBuf`, while the cost is that Kafka client won't be compatible with Pulsar client. Because the entry format of `kafka` still adds a `MessageMetadata` header to record the `entry.format`, it will be possible for Pulsar broker/client to decode these messages later. This PR adds a `KafkaEntryFormatter` class associated to `entry.format=kafka` and apply this config to existed tests that have relation to Kafka client but not Pulsar client. An exception is `SaslPlainTest`, it will fail when multiple test instances are created because `LOOKUP_CACHE` is a static variable, the `findBroker` call of other test instances may be influenced. So we create two separated derived classes for test. Finally this PR adds a performance test that can be run to see the time cost of different `EntryFormatter`s, a simple run result is: ``` --- fixed records for 1000 times --- PulsarEntryFormatter encode time: 7287 ms KafkaEntryFormatter encode time: 79 ms NoHeaderKafkaEntryFormatter encode time: 0 ms --- random records for 1000 times --- PulsarEntryFormatter encode time: 6947 ms KafkaEntryFormatter encode time: 78 ms NoHeaderKafkaEntryFormatter encode time: 1 ms ``` The default batch is 2048 messages and each message contains 1024 bytes. We can see the behavior of `KafkaEntryFormatter` has some overhead caused by adding header and copying the original bytes from `MemoryRecords`. However it's not significant much. But `PulsarEntryFormatter` takes about 7 ms for each batch, which may be not acceptable. Another performance test that compares `PulsarEntryFormatter` and `KafkaEntryFormatter` is done by running KoP with Pulsar 2.8.0-SNAPSHOT (commit 11b9359) standalone on my LapTop. ```bash $ ./bin/kafka-producer-perf-test.sh --topic my-topic --producer-props bootstrap.servers=127.0.0.1:9092 \ --num-records 100000000 --record-size 128 --throughput -1 ``` Sampling from 5 lines from the output after the throughput/latency are stable, and calculating the average value. The result is: | entryFormat | Throughput (MB/sec) | Avg. Latency (ms) | Max Latency (ms) | | - | - | - | - | | pulsar | 39.768 | 740.74 | 802.8 | | kafka | 44.212 | 667.98 | 756.6 |
BewareMyPower
added a commit
that referenced
this pull request
Jan 14, 2021
…uf (#280) Currently, KoP has some significant performance overhead in the conversion between Kafka's records and Pulsar's messages. For pure Kafka users, i.e. no Pulsar client is involved, the conversion is unnecessary. So this PR provides a config `entry.format` to specify if the entry's format can be accepted by Pulsar clients. And a `EntryFormatter` is used to do the format conversion instead of static methods of `MessageRecordUtils`. In future, we'll implement a new `EntryFormatter` to avoid this conversion so that the performance can be improved a lot. After that, users can choose whether to support Pulsar clients for KoP.
BewareMyPower
added a commit
that referenced
this pull request
Jan 14, 2021
It's a follow PR of [#280](#280). This PR adds an optional value to `entry.format` config so that user can improve the performance because there's little entry conversion between `MemoryRecords` and entry's `ByteBuf`, while the cost is that Kafka client won't be compatible with Pulsar client. Because the entry format of `kafka` still adds a `MessageMetadata` header to record the `entry.format`, it will be possible for Pulsar broker/client to decode these messages later. This PR adds a `KafkaEntryFormatter` class associated to `entry.format=kafka` and apply this config to existed tests that have relation to Kafka client but not Pulsar client. An exception is `SaslPlainTest`, it will fail when multiple test instances are created because `LOOKUP_CACHE` is a static variable, the `findBroker` call of other test instances may be influenced. So we create two separated derived classes for test. Finally this PR adds a performance test that can be run to see the time cost of different `EntryFormatter`s, a simple run result is: ``` --- fixed records for 1000 times --- PulsarEntryFormatter encode time: 7287 ms KafkaEntryFormatter encode time: 79 ms NoHeaderKafkaEntryFormatter encode time: 0 ms --- random records for 1000 times --- PulsarEntryFormatter encode time: 6947 ms KafkaEntryFormatter encode time: 78 ms NoHeaderKafkaEntryFormatter encode time: 1 ms ``` The default batch is 2048 messages and each message contains 1024 bytes. We can see the behavior of `KafkaEntryFormatter` has some overhead caused by adding header and copying the original bytes from `MemoryRecords`. However it's not significant much. But `PulsarEntryFormatter` takes about 7 ms for each batch, which may be not acceptable. Another performance test that compares `PulsarEntryFormatter` and `KafkaEntryFormatter` is done by running KoP with Pulsar 2.8.0-SNAPSHOT (commit 11b9359) standalone on my LapTop. ```bash $ ./bin/kafka-producer-perf-test.sh --topic my-topic --producer-props bootstrap.servers=127.0.0.1:9092 \ --num-records 100000000 --record-size 128 --throughput -1 ``` Sampling from 5 lines from the output after the throughput/latency are stable, and calculating the average value. The result is: | entryFormat | Throughput (MB/sec) | Avg. Latency (ms) | Max Latency (ms) | | - | - | - | - | | pulsar | 39.768 | 740.74 | 802.8 | | kafka | 44.212 | 667.98 | 756.6 |
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Currently, KoP has some significant performance overhead in the conversion between Kafka's records and Pulsar's messages. For pure Kafka users, i.e. no Pulsar client is involved, the conversion is unnecessary.
So this PR provides a config
entry.format
to specify if the entry's format can be accepted by Pulsar clients. And aEntryFormatter
is used to do the format conversion instead of static methods ofMessageRecordUtils
.In future, we'll implement a new
EntryFormatter
to avoid this conversion so that the performance can be improved a lot. After that, users can choose whether to support Pulsar clients for KoP.