-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client] moving get sequenceId into the sync code segment #17836
[fix][client] moving get sequenceId into the sync code segment #17836
Conversation
### Motivation When the producer send message in the multiple thread, the message with the smaller sequenceId can be push later than the message with the bigger sequenceId. Then there will be exception thrown at persistentTopic::publishTxnMessage ### Modification Move getting sequenceId in the sync code.
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
Outdated
Show resolved
Hide resolved
The pr had no activity for 30 days, mark with Stale label. |
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
Outdated
Show resolved
Hide resolved
For your example, is this helpful to reproduce the issue you mentioned? producer.newMessage(transaction).value("msg1".getBytes()).sequenceId(1).send();
producer.newMessage(transaction).value("msg2".getBytes()).sequenceId(3).send();
producer.newMessage(transaction).value("msg3".getBytes()).sequenceId(2).send();
producer.newMessage(transaction).value("msg4".getBytes()).sequenceId(4).send(); |
No, it is a sync method. You can reproduce it using the test I provided. |
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
Outdated
Show resolved
Hide resolved
## Motivation 1. fix flaky test #18466 caused by txn async send method 2. decrease run time by optimizing receive method ## Modification 1. fix flaky test * modify `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();` This also can be resolved by #17836 and #18486 later. 2. decrease run time by optimizing receive method * modify ` Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS); Assert.assertNull(message);` to ` Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message);` * modify `message = consumer.receive();` to `message = consumer.receive(5, TimeUnit.SECONDS);` * keep other `consumer.receive(x, y)` no change.
pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
Outdated
Show resolved
Hide resolved
Codecov Report
@@ Coverage Diff @@
## master #17836 +/- ##
============================================
+ Coverage 46.17% 49.47% +3.29%
+ Complexity 10359 8272 -2087
============================================
Files 703 468 -235
Lines 68845 52184 -16661
Branches 7382 5564 -1818
============================================
- Hits 31788 25817 -5971
+ Misses 33448 23509 -9939
+ Partials 3609 2858 -751
Flags with carried forward coverage won't be shown. Click here to find out more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use msgIdGenerator++
instead of msgIdGeneratorUpdater.getAndIncrement(this)
now, right?
## Motivation 1. fix flaky test apache#18466 caused by txn async send method 2. decrease run time by optimizing receive method ## Modification 1. fix flaky test * modify `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();` This also can be resolved by apache#17836 and apache#18486 later. 2. decrease run time by optimizing receive method * modify ` Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS); Assert.assertNull(message);` to ` Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message);` * modify `message = consumer.receive();` to `message = consumer.receive(5, TimeUnit.SECONDS);` * keep other `consumer.receive(x, y)` no change.
…e#17836) ### Motivation When the producer sends messages in multiple threads, the message with the smaller sequence Id can be pushed later than the message with the bigger sequence Id. The `internalSendWithTxnAsync` call `internalSendAsync` Asynchronously when `txn != null` https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L409 And the `sendAsync` acquire sequence ID is not included in the synchronized block with `serializeAndSendMessage`. https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L490 https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L555-L560 For example: We send 4 messages (msg1, msg2, msg3, msg4) to the broker and then the 4 messages may get 4 sequence Id (1, 3, 2, 4) which is not in order due to the logic to get the sequence ID and send the message is not in the same synchronous code block. And then the msg3 with sequence ID 2 will never be persistent successfully. ### Modification Add a method to update `sequenceId` and move the method in the sync code. Via apache#16196 we should update message metadata before computing the message size.
…e#17836) ### Motivation When the producer sends messages in multiple threads, the message with the smaller sequence Id can be pushed later than the message with the bigger sequence Id. The `internalSendWithTxnAsync` call `internalSendAsync` Asynchronously when `txn != null` https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L409 And the `sendAsync` acquire sequence ID is not included in the synchronized block with `serializeAndSendMessage`. https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L490 https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L555-L560 For example: We send 4 messages (msg1, msg2, msg3, msg4) to the broker and then the 4 messages may get 4 sequence Id (1, 3, 2, 4) which is not in order due to the logic to get the sequence ID and send the message is not in the same synchronous code block. And then the msg3 with sequence ID 2 will never be persistent successfully. ### Modification Add a method to update `sequenceId` and move the method in the sync code. Via apache#16196 we should update message metadata before computing the message size.
## Motivation 1. fix flaky test apache#18466 caused by txn async send method 2. decrease run time by optimizing receive method ## Modification 1. fix flaky test * modify `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();` This also can be resolved by apache#17836 and apache#18486 later. 2. decrease run time by optimizing receive method * modify ` Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS); Assert.assertNull(message);` to ` Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message);` * modify `message = consumer.receive();` to `message = consumer.receive(5, TimeUnit.SECONDS);` * keep other `consumer.receive(x, y)` no change.
…e#17836) ### Motivation When the producer sends messages in multiple threads, the message with the smaller sequence Id can be pushed later than the message with the bigger sequence Id. The `internalSendWithTxnAsync` call `internalSendAsync` Asynchronously when `txn != null` https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L409 And the `sendAsync` acquire sequence ID is not included in the synchronized block with `serializeAndSendMessage`. https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L490 https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L555-L560 For example: We send 4 messages (msg1, msg2, msg3, msg4) to the broker and then the 4 messages may get 4 sequence Id (1, 3, 2, 4) which is not in order due to the logic to get the sequence ID and send the message is not in the same synchronous code block. And then the msg3 with sequence ID 2 will never be persistent successfully. ### Modification Add a method to update `sequenceId` and move the method in the sync code. Via apache#16196 we should update message metadata before computing the message size.
When the producer sends messages in multiple threads, the message with the smaller sequence Id can be pushed later than the message with the bigger sequence Id. The `internalSendWithTxnAsync` call `internalSendAsync` Asynchronously when `txn != null` https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L409 And the `sendAsync` acquire sequence ID is not included in the synchronized block with `serializeAndSendMessage`. https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L490 https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L555-L560 For example: We send 4 messages (msg1, msg2, msg3, msg4) to the broker and then the 4 messages may get 4 sequence Id (1, 3, 2, 4) which is not in order due to the logic to get the sequence ID and send the message is not in the same synchronous code block. And then the msg3 with sequence ID 2 will never be persistent successfully. Add a method to update `sequenceId` and move the method in the sync code. Via #16196 we should update message metadata before computing the message size. (cherry picked from commit 7e258af)
Motivation
When the producer sends messages in multiple threads, the message with the smaller sequence Id can be pushed later than the message with the bigger sequence Id.
The
internalSendWithTxnAsync
callinternalSendAsync
Asynchronously whentxn != null
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Line 409 in aeb4503
And the
sendAsync
acquire sequence ID is not included in the synchronized block withserializeAndSendMessage
.pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Line 490 in aeb4503
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Lines 555 to 560 in aeb4503
For example:
We send 4 messages (msg1, msg2, msg3, msg4) to the broker and then the 4 messages may get 4 sequence Id (1, 3, 2, 4) which is not in order due to the logic to get the sequence ID and send the message is not in the same synchronous code block.
And then the msg3 with sequence ID 2 will never be persistent successfully.
Modification
Add a method to update
sequenceId
and move the method in the sync code.Via #16196 we should update message metadata before computing the message size.
reproduce
This test can reproduce this problem, but this is not guaranteed to recur.
error logs
Documentation
doc-not-needed
(Please explain why)
Matching PR in forked repository
PR in forked repository: liangyepianzhou#6