-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: allow AsyncProducer to have MaxOpenRequests inflight produce requests per broker #1686
Conversation
@bai install dependencies failed, pls check and run again |
Could you please rebase your branch off of latest master? 🙏 |
This comment has been minimized.
This comment has been minimized.
0778cd0
to
d6201da
Compare
I rebased this PR off of latest master. |
@dnwe Do you think it's something we want to get merged? Asking since this PR's been open for a while. |
@bai thanks for the reminder — I reviewed the changes and they seemed sensible, essentially in-lining the existing sendAndReceive promise-based approach but modifying it to allow for buffering MaxOpenRequests. I did some quick benchmarking in the regular case (when brokers are responsive) and confirmed no noticable dip in producer throughput, so I think it is safe to merge this on the assumption that it is only likely to help with a small uptick in memory consumption if brokers are slow to response to the produce requests |
…2182) * producer: ensure that the management message (fin) is never "leaked" Since async producer now support multiple inflight messages thanks to #1686 and #2094, it now may "leak" the "fin" internal management message to Kafka (and to the client) when broker producer is reconnecting to Kafka broker and retries multiple inflight messages at the same time. * test:async-producer: test broker restart (this fixes #2150) * tests:async-producer: disable logger in TestAsyncProducerBrokerRestart * tests:async-producer: protect leader with a mutex to make race detector happy * test:async-producer: set 5mn default finish timeout * async-producer: do not clear bp.currentRetries when fin message is received just after syn * async-producer: use debug logger when fin message is handled for a healthy brokerProducer * test:async-producer:restart: make emptyValues atomic to avoid races * test:async-producer:restart: rename produceRequestTest to countRecordsWithEmptyValue * test:async-producer:restart: reduce retry backoff timeout to speed up the test * test:async-producer:restart: remove bogus 0
MOTIVATION
now, producer use method
sendAndReceive
of Broker to produceMessage, which is blocking model for next send, producer will wait too long when broker slows down, which cause later request waste too much time.CHANGE
we split produce and receive method, proudcer send proudcer produceSet one by one, and will push promise to pending channel to wait, so prodcuer will not block on produce, which decrease the cost of next send request. pending channel is consumed one by one too, because the request/response is in order, so next response always arrives later