Fixed ordering issue in KeyShared dispatcher when adding consumer #7106
@merlimat If I understand correctly, no consumer can receive new messages while a new consumer is joining, right? The messages are added to the redelivery queue, so the next read-entries operation replays the redelivery queue and new messages cannot make progress. This may increase consumption latency for all consumers. However, I think most of the existing consumers could continue to consume new messages, because their hash ranges do not overlap with the newly added consumer. So in #6977 I added a buffer for the messages that need to wait until the mark-delete position has been processed. This can minimize the consumption latency jitter caused by the newly added consumer, especially when consumers are hungry, and is more useful for e2e latency-sensitive scenarios. I would like to hear your thoughts; maybe I am worrying too much. Other than that, I have no objections to this PR. Apart from the added buffer, I think #6977 is the same: #6977 added a field in the consumer named …

@merlimat I thought about it carefully. The buffer could still break the dispatch order of messages with the same key. It is better not to add the buffer at present.

Yes, that was my thought. To guarantee ordering, these consumers need to wait for those messages to be acked before they start consuming. On top of this I also have one more change with a setting for …

That sounds pretty good.

@merlimat Could you please document this approach at http://pulsar.apache.org/docs/en/concepts-messaging/#key_shared? It would be very helpful for users trying to understand Key_Shared. I will merge this PR first.

Looking forward to your new PR.
…ey_Shared consumer stuck on delivery (#7553)

### Motivation

In some cases, message ordering was broken for Key_Shared consumers. Here is how to reproduce it (I think this is one of several cases that trigger the issue).

1. Connect Consumer1 to the Key_Shared subscription `sub` and do not start receiving
   - receiverQueueSize: 500
2. Connect a Producer and publish 500 messages with key `(i % 10)`
3. Connect Consumer2 to the same subscription and start receiving
   - receiverQueueSize: 1
   - since #7106, Consumer2 can't receive (expected)
4. The Producer publishes 500 more messages with the same key generation algorithm
5. After that, Consumer1 starts receiving
6. Check Consumer2's message ordering
   - sometimes message ordering was broken within the same key

Consumer1:

```
Connected: Tue Jul 14 09:36:39 JST 2020
[pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [persistent://public/default/key-shared-test] [sub0] [820f0] Prefetched messages: 499 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
Received: my-message-0 PublishTime: 1594687006203 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-1 PublishTime: 1594687006243 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-2 PublishTime: 1594687006247 Date: Tue Jul 14 09:37:46 JST 2020
...
Received: my-message-498 PublishTime: 1594687008727 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-499 PublishTime: 1594687008731 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-500 PublishTime: 1594687038742 Date: Tue Jul 14 09:37:46 JST 2020
...
Received: my-message-990 PublishTime: 1594687040094 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-994 PublishTime: 1594687040103 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-995 PublishTime: 1594687040105 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-997 PublishTime: 1594687040113 Date: Tue Jul 14 09:37:46 JST 2020
```

Consumer2:

```
Connected: Tue Jul 14 09:37:03 JST 2020
[pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
Received: my-message-501 MessageId: 4:1501:-1 PublishTime: 1594687038753 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-502 MessageId: 4:1502:-1 PublishTime: 1594687038755 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-503 MessageId: 4:1503:-1 PublishTime: 1594687038759 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-506 MessageId: 4:1506:-1 PublishTime: 1594687038785 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-508 MessageId: 4:1508:-1 PublishTime: 1594687038812 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-901 MessageId: 4:1901:-1 PublishTime: 1594687039871 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-509 MessageId: 4:1509:-1 PublishTime: 1594687038815 Date: Tue Jul 14 09:37:46 JST 2020
ordering was broken, key: 1 oldNum: 901 newNum: 511
Received: my-message-511 MessageId: 4:1511:-1 PublishTime: 1594687038826 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-512 MessageId: 4:1512:-1 PublishTime: 1594687038830 Date: Tue Jul 14 09:37:46 JST 2020
...
```

I think this issue is caused by #7105. Here is an example.

1. dispatch messages
2. Consumer2 was stuck and `totalMessagesSent=0`
   - Consumer2 availablePermits was 0
3. skip redelivering messages temporarily
   - Consumer2 availablePermits was back to 1
4. dispatch new messages
   - a new message was dispatched to Consumer2
5. go back to redelivering messages
6. dispatch messages
   - ordering was broken

### Modifications

Stop dispatching when redelivery is skipped temporarily because a Key_Shared consumer is stuck on delivery.
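The `ordering was broken, key: 1 oldNum: 901 newNum: 511` line in the Consumer2 log suggests a per-key monotonicity check on the consumer side. The following is a minimal sketch of such a check, not the tool used for the report above. It assumes each message carries its sequence number `i` (as in `my-message-<i>`) and that the key is `i % 10`, as in the reproduction steps; the function name `find_ordering_violations` is hypothetical.

```python
from typing import Dict, Iterable, List, Tuple


def find_ordering_violations(
    received_numbers: Iterable[int], num_keys: int = 10
) -> List[Tuple[int, int, int]]:
    """Return (key, old_num, new_num) for every message that arrives
    after a higher-numbered message with the same key.

    Assumption: the key of message i is i % num_keys, mirroring the
    `(i % 10)` key scheme from the reproduction steps.
    """
    last_seen: Dict[int, int] = {}  # key -> number of the last message seen
    violations: List[Tuple[int, int, int]] = []
    for num in received_numbers:
        key = num % num_keys
        old = last_seen.get(key)
        if old is not None and num < old:
            violations.append((key, old, num))
        last_seen[key] = num
    return violations


# Sequence numbers in the order Consumer2 received them in the log above.
consumer2_order = [501, 502, 503, 506, 508, 901, 509, 511, 512]
for key, old_num, new_num in find_ordering_violations(consumer2_order):
    print(f"ordering was broken, key: {key} oldNum: {old_num} newNum: {new_num}")
```

Only key 1 is flagged here: `my-message-901` (key 1) was received before `my-message-511` (key 1), which matches the single violation reported in the log excerpt.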
Note: this is based on top of #6791, #7104 and #7105. Once these are merged, I'll rebase here. For the sake of this review, check commit ad2e39b

### Motivation

Fixes: #6554

Ordering is broken in the KeyShared dispatcher if a new consumer `c2` comes in and an existing consumer `c1` goes out. This is because messages with keys previously assigned to `c1` may now route to `c2`.

The solution here is to have new consumers join in a "paused" state.

For example, consider this sequence:

1. The subscription has `c1` and `c2` consumers.
2. `c3` joins. Some of the keys are now supposed to go to `c3`.
3. Instead of starting to deliver to `c3`, we mark the current readPosition (let's call it `rp0_c3`) of the cursor for `c3`.
4. Any message that now hashes to `c3` and that has `messageId >= rp0_c3` will be deferred for later re-delivery.
5. Any message that might get re-delivered to `c3` (e.g. originally sent to `c1`, but `c1` has failed) and that has `messageId < rp0_c3` will be sent to `c3`.
6. When the markDelete position of the cursor moves past `rp0_c3`, the restriction on `c3` is lifted.

Essentially, `c3` joins but can only receive old messages, until everything that was read before joining gets acked.
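The sequence above can be sketched as a small decision rule. This is a toy model under stated assumptions, not the dispatcher code from this PR: positions are modeled as plain integers, `mark_delete_position` is taken to be the highest id such that every id up to and including it has been acked, and the names `ConsumerState`, `can_deliver` and `on_mark_delete_advanced` are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConsumerState:
    name: str
    # Read position recorded when the consumer joined (rp0 in the text).
    # None means the consumer is unrestricted.
    joined_at: Optional[int] = None


def can_deliver(consumer: ConsumerState, message_id: int) -> bool:
    """Steps 4 and 5: a restricted consumer only receives messages that
    were read before it joined (message_id < rp0); newer ones are deferred."""
    if consumer.joined_at is None:
        return True
    return message_id < consumer.joined_at


def on_mark_delete_advanced(consumer: ConsumerState, mark_delete_position: int) -> None:
    """Step 6: lift the restriction once every message read before the
    join has been acked (assumed here to mean mark_delete + 1 >= rp0)."""
    if consumer.joined_at is not None and mark_delete_position + 1 >= consumer.joined_at:
        consumer.joined_at = None


# c3 joins while the cursor's read position is 100.
c3 = ConsumerState("c3", joined_at=100)
print(can_deliver(c3, 99))    # old message, e.g. a redelivery from a failed c1
print(can_deliver(c3, 100))   # new message, deferred for later re-delivery
on_mark_delete_advanced(c3, 99)
print(can_deliver(c3, 100))   # restriction lifted after all earlier acks
```

The design choice this illustrates is that the new consumer is gated by a single recorded position rather than by per-key bookkeeping, which keeps the rule cheap to evaluate on every dispatch.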