Fix: Fair message consumption from all partitions in partitioned-consumer #153
Conversation
CLA is valid!
Force-pushed from bf35ae0 to 5281a5c
@@ -142,7 +137,7 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) {
     }
 
     private void resumeReceivingFromPausedConsumersIfNeeded() {
-        if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
+        if (incomingMessages.size() < maxReceiverQueueSize && !pausedConsumers.isEmpty()) {
I think the threshold was there to prevent flickering on each message (block-unblock-block-unblock...). Would that be an issue here?
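For context, a minimal, hypothetical model of the hysteresis the original condition provides (the identifiers mirror the diff, but the surrounding class is a simplified stand-in, not the actual Pulsar client code, and the half-capacity threshold is shown for illustration only):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified model of the pause/resume hysteresis, not the actual client code.
class ResumeThresholdModel {
    private final int maxReceiverQueueSize = 1000;
    // Low-water mark: consumers paused at the high-water mark are resumed only
    // once the queue drains this far (half capacity here, for illustration).
    // Without this gap, a consumer paused on a full queue would be unpaused by
    // the very next receive() and re-paused on its next message: the
    // block-unblock-block-unblock flicker mentioned above.
    private final int sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
    private final Queue<Object> incomingMessages = new ConcurrentLinkedQueue<>();
    private final Queue<Object> pausedConsumers = new ConcurrentLinkedQueue<>();

    private void resumeReceivingFromPausedConsumersIfNeeded() {
        if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
            // resume one or more paused consumers here
        }
    }
}
```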
OK, so the issue here is: the condition `incomingMessages.size() <= sharedQueueResumeThreshold` prevents pausedConsumers from adding messages into the shared queue, while it allows an unpaused consumer to keep adding messages to the queue if the client is receiving messages at the same rate.
So, to give every partition a fair chance: if any paused consumer is present, then rather than letting an unpaused consumer keep adding messages, add it to the pausedConsumers list and let it be picked up from pausedConsumers in round-robin (see the sketch below).
Updated the PR with the change.
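A minimal sketch of the fair-chance scheme described above. It assumes a single shared queue and a FIFO pausedConsumers list; the class name, the onMessageReady method, and the element types are simplified stand-ins, not the actual PartitionedConsumerImpl internals:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of the fairness fix, not the real client code.
class FairPartitionedQueueModel {
    private final int maxReceiverQueueSize = 1000;
    private final BlockingQueue<String> incomingMessages =
            new LinkedBlockingQueue<>(maxReceiverQueueSize);
    private final ConcurrentLinkedQueue<String> pausedConsumers = new ConcurrentLinkedQueue<>();

    // Called when one partition's consumer has a message ready for the shared queue.
    void onMessageReady(String consumerName, String message) {
        // Fairness rule: if anyone is already paused, or the queue is full, this
        // consumer queues up behind them instead of jumping ahead of the paused ones.
        if (!pausedConsumers.isEmpty() || incomingMessages.remainingCapacity() == 0) {
            pausedConsumers.add(consumerName);
        } else {
            incomingMessages.offer(message);
        }
    }

    // Called after the application takes a message off the shared queue.
    void resumeReceivingFromPausedConsumersIfNeeded() {
        // New condition from the diff: resume as soon as there is any space,
        // picking paused consumers round-robin (head of the FIFO list first).
        if (incomingMessages.size() < maxReceiverQueueSize && !pausedConsumers.isEmpty()) {
            String consumerName = pausedConsumers.poll();
            // resume receiving from consumerName ...
        }
    }
}
```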
Force-pushed from 8da178d to f622cfd
👍
@rdhabalia There seems to be a conflict with master.
@merlimat: resolved the conflict.
* fix producer handler memory leak
* consumer removed from client handler when closed
* golanglint
* add tests & fix deadlock when handler close
* make handlers delete in closeOnce to prevent unnecessary calls
* goimports
**Fixes:** apache#166 apache#153 apache#99

**Issue:** KoP uses [Kafka-2.0.0](https://github.com/streamnative/kop/blob/78d9ba3487d4d7c85a5d667d45d9b38aaa7c824f/pom.xml#L46), which supports [API_VERSION's](https://kafka.apache.org/protocol.html#The_Messages_ApiVersions) **0** --> **2**. When **_Kafka-Clients-2.4.x+_** (using `API_VERSION: 3`) connect to KoP, it panics and the following error stack is observed:

`10:22:23.281 [pulsar-io-22-4] ERROR io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - error to get Response ByteBuf:
java.lang.IllegalArgumentException: Invalid version for API key API_VERSIONS: 3
at org.apache.kafka.common.protocol.ApiKeys.schemaFor(ApiKeys.java:312) ~[?:?]
at org.apache.kafka.common.protocol.ApiKeys.responseSchema(ApiKeys.java:286) ~[?:?]
at org.apache.kafka.common.requests.ApiVersionsResponse.toStruct(ApiVersionsResponse.java:129) ~[?:?]
at org.apache.kafka.common.requests.ResponseUtils.serializeResponse(ResponseUtils.java:40) ~[?:?]`

**Resolved By:** Returning an `UNSUPPORTED_VERSION` [error-code: 35](https://kafka.apache.org/protocol.html#protocol_error_codes), which notifies the **_kafka-client_** to lower its `API_VERSION`. As no list of `ApiKeys & versions` is available for the **kafka-clients** to refer to, the client safely falls back to using `API_VERSION: 0`, and KoP continues processing the kafka-messages using `API_VERSION: 0`.

**Tested producing/consuming with Kafka-Clients:**
> 2.0.0 2.2.2 2.3.1 2.4.0 2.4.1 2.5.0 2.5.1 2.6.0

**More...**
KoP could have returned the list of supported `ApiKeys & versions` while sending the `UNSUPPORTED_VERSION` error-code, which would have made the **_kafka-client_** select the **_latest_** supported `API_VERSION` and use `API_VERSION: 2` instead of falling all the way back to `API_VERSION: 0`.

Notes on how **_Kafka-Brokers_** are supposed to handle this scenario:

> 2. On receiving ApiVersionsRequest, a broker returns its full list of supported ApiKeys and versions regardless of current authentication state (e.g., before SASL authentication on an SASL listener, do note that no Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished). If this is considered to leak information about the broker version a workaround is to use SSL with client authentication which is performed at an earlier stage of the connection where the ApiVersionRequest is not available. Also, note that broker versions older than 0.10.0.0 do not support this API and will either ignore the request or close connection in response to the request.
>
> 3. If multiple versions of an API are supported by broker and client, clients are recommended to use the latest version supported by the broker and itself.

_Reference: [Kafka-Protocol Guide](https://kafka.apache.org/protocol.html#api_versions)_

We analyzed how various **_Kafka-Brokers_** respond to a similar situation where the **_kafka-client's_** `API_VERSION` is higher than what the **_Kafka-Broker_** supports.

![Kafka-Broker-Client-Wireshark-Results](https://user-images.githubusercontent.com/63665447/91243701-34d3a500-e710-11ea-9752-f9980333ce1d.png)

_Reference: Wireshark packet captures - [Kafka-Protocol-Study.zip](https://github.com/streamnative/kop/files/5127018/Kafka-Protocol-Study.zip)_

From the study we can infer that, in a similar `API_VERSION` mismatch scenario, the **_Kafka-Brokers_** don't return the list of supported `ApiKeys & versions` when notifying the **_kafka-client_** with the `UNSUPPORTED_VERSION` [error-code: 35](https://kafka.apache.org/protocol.html#protocol_error_codes), thus forcing the **_kafka-clients_** to fall back to `API_VERSION: 0`. To keep KoP in sync with the **_Kafka-Broker_** behavior, we decided not to return the list of supported `ApiKeys & versions` (see the sketch below).
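A minimal sketch of the version check described above, using the real `Errors` and `ApiKeys` enums from kafka-clients; the handler class and method are hypothetical stand-ins for KoP's actual KafkaCommandDecoder logic, not its real API:

```java
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;

// Hypothetical handler sketch: return an error code instead of throwing
// IllegalArgumentException when the client's requested version is too new.
class ApiVersionsCheckSketch {
    static short handleApiVersionsRequest(short requestedVersion) {
        if (requestedVersion > ApiKeys.API_VERSIONS.latestVersion()) {
            // Error code 35; with no supported-versions list attached, the
            // client falls back to API_VERSION 0, as seen in the captures.
            return Errors.UNSUPPORTED_VERSION.code();
        }
        return Errors.NONE.code();
    }
}
```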
Fix: Fair message consumption from all partitions in partitioned-consumer

Motivation

A partitioned consumer may not fairly consume messages from the consumers added to the pausedConsumers list until the client drains the shared queue down to sharedQueueResumeThreshold by receiving messages from it.

Modifications

Consume from pausedConsumers whenever the queue has space to add a message (see the usage example below).

Result

All partitioned consumers will have a fair chance to consume messages.
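For reference, a minimal receive loop on a partitioned topic, written against the current Pulsar Java client builder API (which postdates this PR; the broker URL, topic, and subscription names are placeholders). The faster the application drains the shared queue with receive(), the sooner paused partition consumers get resumed:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class FairConsumeExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder broker URL
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/my-partitioned-topic") // placeholder
                .subscriptionName("my-sub") // placeholder
                .receiverQueueSize(1000) // capacity of the shared incoming-message queue
                .subscribe();

        while (true) {
            // Each receive() frees a slot in the shared queue; with this fix,
            // paused partition consumers get a fair, round-robin chance to fill it.
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
        }
    }
}
```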