-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix Intermittent test failure #280
Merged
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
merlimat
reviewed
Mar 6, 2017
@@ -265,6 +272,15 @@ public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exceptio | |||
pulsarClient.close(); | |||
} | |||
|
|||
private boolean isAllConsumersConnected(List<Consumer> consumers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
areAllConsumersConnected
merlimat
approved these changes
Mar 7, 2017
hrsakai
pushed a commit
to hrsakai/pulsar
that referenced
this pull request
Dec 10, 2020
Previously, the producer maximum batch size was hard-coded to 128 KB. Now, the produdcer maximum batch size is exposed via ProducerOptions and defaults to 128 KB Co-authored-by: Daniel Ferstay <dferstay@splunk.com>
hangc0276
pushed a commit
to hangc0276/pulsar
that referenced
this pull request
May 26, 2021
…uf (apache#280) Currently, KoP has some significant performance overhead in the conversion between Kafka's records and Pulsar's messages. For pure Kafka users, i.e. no Pulsar client is involved, the conversion is unnecessary. So this PR provides a config `entry.format` to specify if the entry's format can be accepted by Pulsar clients. And a `EntryFormatter` is used to do the format conversion instead of static methods of `MessageRecordUtils`. In future, we'll implement a new `EntryFormatter` to avoid this conversion so that the performance can be improved a lot. After that, users can choose whether to support Pulsar clients for KoP.
hangc0276
pushed a commit
to hangc0276/pulsar
that referenced
this pull request
May 26, 2021
It's a follow PR of [apache#280](streamnative/kop#280). This PR adds an optional value to `entry.format` config so that user can improve the performance because there's little entry conversion between `MemoryRecords` and entry's `ByteBuf`, while the cost is that Kafka client won't be compatible with Pulsar client. Because the entry format of `kafka` still adds a `MessageMetadata` header to record the `entry.format`, it will be possible for Pulsar broker/client to decode these messages later. This PR adds a `KafkaEntryFormatter` class associated to `entry.format=kafka` and apply this config to existed tests that have relation to Kafka client but not Pulsar client. An exception is `SaslPlainTest`, it will fail when multiple test instances are created because `LOOKUP_CACHE` is a static variable, the `findBroker` call of other test instances may be influenced. So we create two separated derived classes for test. Finally this PR adds a performance test that can be run to see the time cost of different `EntryFormatter`s, a simple run result is: ``` --- fixed records for 1000 times --- PulsarEntryFormatter encode time: 7287 ms KafkaEntryFormatter encode time: 79 ms NoHeaderKafkaEntryFormatter encode time: 0 ms --- random records for 1000 times --- PulsarEntryFormatter encode time: 6947 ms KafkaEntryFormatter encode time: 78 ms NoHeaderKafkaEntryFormatter encode time: 1 ms ``` The default batch is 2048 messages and each message contains 1024 bytes. We can see the behavior of `KafkaEntryFormatter` has some overhead caused by adding header and copying the original bytes from `MemoryRecords`. However it's not significant much. But `PulsarEntryFormatter` takes about 7 ms for each batch, which may be not acceptable. Another performance test that compares `PulsarEntryFormatter` and `KafkaEntryFormatter` is done by running KoP with Pulsar 2.8.0-SNAPSHOT (commit 11b9359) standalone on my LapTop. ```bash $ ./bin/kafka-producer-perf-test.sh --topic my-topic --producer-props bootstrap.servers=127.0.0.1:9092 \ --num-records 100000000 --record-size 128 --throughput -1 ``` Sampling from 5 lines from the output after the throughput/latency are stable, and calculating the average value. The result is: | entryFormat | Throughput (MB/sec) | Avg. Latency (ms) | Max Latency (ms) | | - | - | - | - | | pulsar | 39.768 | 740.74 | 802.8 | | kafka | 44.212 | 667.98 | 756.6 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Motivation
Fix: #279 Intermittent test failure in BrokerServiceThrottlingTest.testLookupThrottlingForClientByBrokerInternalRetry
Modifications
Wait strategically for consumers to get reconnect with broker