-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix Intermittent test failures in ProxyPublishConsumeTest.socketTest #253
Conversation
} catch (Exception e) { | ||
log.error(e.getMessage()); | ||
} | ||
newFixedThreadPool(1).submit(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The executor thread will be leaked in this case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, updated it.
cea3e06
to
1988a30
Compare
} | ||
}); | ||
// let's wait for clients to be stopped | ||
Thread.sleep(2000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of always sleeping 2 seconds, we could use the future returned by executor.submit()
and do a get()
with 2sec timeout.
Also, can you do the same change in ProxyPublishConsumeWithoutZKTest
and ProxyPublishConsumeTls
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Fixes apache#247 The original implementation of message publishing cannot guarantee that a topic partition's pending writes are completed in order. This PR is to fix the issue by refactoring the handler for Produce requests. The basic steps of handling a single partition's produce request are: Get PersistentTopic from topic manager; Convert Kafka's MemoryRecords to Pulsar's ByteBuf; Call PersistentTopic#publishMessages to write ByteBuf to BK asynchronously. This PR adds a PendingProduce class to compose pending step 1 and 2 to a CompletableFuture, then add the PendingProduce object to a queue (PendingProduceQueue) which is associated with the partition name in a map named pendingProduceQueueMap. When the step 1 and 2 are completed, the queue will try to remove all ready PendingProduce objects in the head and call PersistentTopic#publishMessages. We use synchronized keyword to make the remove operation thread safe. Therefore, the first two steps can be executed in parallel, but the third step is executed in order. In addition, this PR fixes the current message order test by using different batch size. In the original test, all messages are batched to a single batch, so the disorder never happens because there's only one batch. * Refactor handleProduceRequest to fix message disorder issue * Fix existed message order test * Synchronize PendingProduce#publishMessages * Test message order with different batch.size config
Motivation
#237 sometime
WebSocketClient.stop()
gets stuck and it cause the test-case failure. So, wait for 2-seconds for clients to be closed and then ignore it.