Skip to content

Commit

Permalink
[fix][client] moving get sequenceId into the sync code segment (#17836)
Browse files Browse the repository at this point in the history
When the producer sends messages in multiple threads, the message with the smaller sequence Id can be pushed later than the message with the bigger sequence Id.
`internalSendWithTxnAsync` calls `internalSendAsync` asynchronously when `txn != null`:
https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L409
In addition, `sendAsync` acquires the sequence ID outside the synchronized block that contains `serializeAndSendMessage`:
https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L490
https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L555-L560
For example:
We send 4 messages (msg1, msg2, msg3, msg4) to the broker, and they may be sent with the sequence IDs (1, 3, 2, 4), which are out of order because obtaining the sequence ID and sending the message do not happen in the same synchronized code block.
As a result, msg3 with sequence ID 2 will never be persisted successfully.
Add a method that updates `sequenceId`, and call that method inside the synchronized block.
Per #16196, we should still update the message metadata before computing the message size.

(cherry picked from commit 7e258af)
  • Loading branch information
liangyepianzhou committed Mar 16, 2023
1 parent cc68533 commit 17b3f2a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -373,4 +376,51 @@ public void testKeyBasedBatchingOrder() throws Exception {
consumer.close();
producer.close();
}

/**
 * Sends messages concurrently from several threads through a single producer while broker
 * deduplication is enabled, then verifies that every message sent can be received.
 *
 * <p>Each worker thread issues {@code messagesPerThread} asynchronous sends; the consumer must
 * then receive {@code threadSize * messagesPerThread} messages, each within 5 seconds.
 */
@Test
public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
    final String topic = "persistent://my-property/my-ns/testUpdateSequenceIdInSyncCodeSegment";
    final String subscriptionName = "subscription";
    final int messagesPerThread = 200;
    final int threadSize = 5;
    conf.setBrokerDeduplicationEnabled(true);

    // Build the producer first so it can be captured (as a final local) by the sender tasks.
    final Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .producerName("producer")
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();
    final ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
    Consumer<byte[]> consumer = null;
    try {
        consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionName(subscriptionName)
                .subscribe();

        final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        // Send messages from multiple threads at once.
        for (int i = 0; i < threadSize; i++) {
            executorService.submit(() -> {
                try {
                    for (int j = 0; j < messagesPerThread; j++) {
                        // Concurrent senders are what could previously produce
                        // out-of-order sequence IDs.
                        producer.newMessage().sendAsync();
                    }
                } catch (Exception e) {
                    log.error("Failed to send messages.", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        // Wait until every sender thread has issued all of its sendAsync calls.
        countDownLatch.await();

        for (int i = 0; i < threadSize * messagesPerThread; i++) {
            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
            assertNotNull(msg);
        }
    } finally {
        // Release the worker threads and client resources even when an assertion fails.
        executorService.shutdownNow();
        if (consumer != null) {
            consumer.close();
        }
        producer.close();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
// Producer id, used to identify a producer within a single connection
protected final long producerId;

// Variable is used through the atomic updater
// Variable is updated in a synchronized block
private volatile long msgIdGenerator;

private final OpSendMsgQueue pendingMessages;
Expand Down Expand Up @@ -169,10 +169,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne

private boolean errorState;

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
.newUpdater(ProducerImpl.class, "msgIdGenerator");

public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
ProducerInterceptors interceptors, Optional<String> overrideProducerName) {
Expand Down Expand Up @@ -489,7 +485,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {

// Update the message metadata before computing the payload chunk size to avoid a large message cannot be split
// into chunks.
final long sequenceId = updateMessageMetadata(msgMetadata, uncompressedSize);
updateMessageMetadata(msgMetadata, uncompressedSize);

// send in chunks
int totalChunks;
Expand Down Expand Up @@ -529,6 +525,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
try {
synchronized (this) {
int readStartIndex = 0;
final long sequenceId = updateMessageMetadataSequenceId(msgMetadata);
String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
Expand Down Expand Up @@ -570,15 +567,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
* @param uncompressedSize
* @return the sequence id
*/
private long updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
final long sequenceId;
if (!msgMetadata.hasSequenceId()) {
sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
msgMetadata.setSequenceId(sequenceId);
} else {
sequenceId = msgMetadata.getSequenceId();
}

private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
if (!msgMetadata.hasPublishTime()) {
msgMetadata.setPublishTime(client.getClientClock().millis());

Expand All @@ -592,6 +581,16 @@ private long updateMessageMetadata(final MessageMetadata msgMetadata, final int
}
msgMetadata.setUncompressedSize(uncompressedSize);
}
}

/**
 * Ensures the given metadata carries a sequence id and returns that id.
 *
 * <p>If the metadata already has a sequence id, it is returned untouched. Otherwise the current
 * value of {@code msgIdGenerator} is stored into the metadata and the generator is advanced by one.
 *
 * <p>The generator update is a plain read-then-write, so this is meant to be called while holding
 * the producer lock; the visible call site in {@code sendAsync} invokes it inside
 * {@code synchronized (this)}.
 *
 * @param msgMetadata the metadata of the message about to be sent
 * @return the sequence id now present on {@code msgMetadata}
 */
private long updateMessageMetadataSequenceId(final MessageMetadata msgMetadata) {
    if (msgMetadata.hasSequenceId()) {
        // A sequence id was already supplied; keep it and leave the generator alone.
        return msgMetadata.getSequenceId();
    }
    final long assignedId = msgIdGenerator;
    msgIdGenerator = assignedId + 1;
    msgMetadata.setSequenceId(assignedId);
    return assignedId;
}

Expand Down

0 comments on commit 17b3f2a

Please sign in to comment.