
Random latency spikes when producing messages #2912

Closed
4 of 7 tasks
meetsitaram opened this issue May 31, 2020 · 34 comments · Fixed by #3340
Comments

@meetsitaram

Description

I have been running latency tests for my application using librdkafka and noticed random latency spikes of up to 1 second when producing messages. This happens even though queue.buffering.max.ms is set to only 10 milliseconds. I am trying to determine whether the issue is within librdkafka, something specific to my Kafka configuration, or at the server.

How to reproduce

The issue can be reproduced by running a Kafka server locally on a Mac and running examples/rdkafka_performance with two extra configuration settings:

rd_kafka_conf_set(conf, "queue.buffering.max.ms", "10", NULL, 0);
rd_kafka_conf_set(conf, "request.required.acks", "1", NULL, 0);
./examples/rdkafka_performance -t test-topic -b localhost:9092 -P -s 100 -r 1000 -l -A latency.txt

The issue reproduces more readily when producing at a rate of 1k msgs/sec. As it happens at random, the test has to run for a few minutes before any latency spikes appear.
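For reference, the same two overrides can also be set through librdkafka's public configuration API. Below is a minimal, illustrative producer-setup sketch (the broker address is a placeholder, and this is not part of rdkafka_performance itself):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Same overrides as in the repro above. */
        if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "queue.buffering.max.ms", "10",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "request.required.acks", "1",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "configuration failed: %s\n", errstr);
                return 1;
        }

        /* rd_kafka_new() takes ownership of conf on success. */
        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                      errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "producer creation failed: %s\n", errstr);
                return 1;
        }

        /* ... produce messages here, then flush and clean up ... */
        rd_kafka_flush(rk, 10 * 1000);
        rd_kafka_destroy(rk);
        return 0;
}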

Checklist

  • librdkafka version (release number or git tag): 1.4.2
  • Apache Kafka version: 2.5.0
  • librdkafka client configuration: queue.buffering.max.ms=10, request.required.acks=1
  • Operating system: macOs Mojave
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue

Below are some sample logs from running examples/rdkafka_performance. For a produce rate of 1k/sec and queue buffering of 10 ms, we should see at most about 10 messages in the queue. However, sometimes the queue holds hundreds of messages.

$ ./examples/rdkafka_performance -t test-topic -b localhost:9092 -P -s 100 -r 1000 -l -A latency.txt
% Sending messages of size 100 bytes
% 833 messages produced 828 delivered, 5 in queue
% 1681 messages produced 1676 delivered, 5 in queue
% 2511 messages produced 2508 delivered, 3 in queue
% 3311 messages produced 2788 delivered, 523 in queue
% 4132 messages produced 4127 delivered, 5 in queue
% 4963 messages produced 4961 delivered, 2 in queue
% 5815 messages produced 5810 delivered, 5 in queue
% 6654 messages produced 6649 delivered, 5 in queue
% 7462 messages produced 6964 delivered, 498 in queue
% 8273 messages produced 7864 delivered, 409 in queue
% 9107 messages produced 9104 delivered, 3 in queue
% 9959 messages produced 9954 delivered, 5 in queue
% 10795 messages produced 10789 delivered, 6 in queue
% 11621 messages produced 11618 delivered, 3 in queue
% 12404 messages produced 11892 delivered, 512 in queue
% 13230 messages produced 13221 delivered, 9 in queue
% 14048 messages produced 13424 delivered, 624 in queue
% 14888 messages produced 14882 delivered, 6 in queue
% 15730 messages produced 15726 delivered, 4 in queue
% 16559 messages produced 16442 delivered, 117 in queue
% 17390 messages produced 17383 delivered, 7 in queue
% 18217 messages produced 17751 delivered, 466 in queue
% 19048 messages produced 19041 delivered, 7 in queue
% 19875 messages produced 19867 delivered, 8 in queue
% 20706 messages produced 20698 delivered, 8 in queue
% 21534 messages produced 21527 delivered, 7 in queue
% 22349 messages produced 21612 delivered, 737 in queue
% 23179 messages produced 23171 delivered, 8 in queue
% 23551 messages produced 23551 delivered, 0 in queue

Per-message top latencies (values in microseconds) reach above 900 milliseconds:

$ cat latency.txt | sort -n -r | head -10
945274
943359
941492
940687
940258
939312
936452
935604
935409
932852

I also verified that this is not simply because the server cannot handle the load, by running at peak rate:

$ ./examples/rdkafka_performance -t test-topic -b localhost:9092 -P -s 100
% Sending messages of size 100 bytes
% 1335399 messages produced (133539900 bytes), 1309811 delivered (offset 7433195, 0 failed) in 1002ms: 1305974 msgs/s and 130.60 MB/s, 0 produce failures, 25589 in queue, no compression
% 2675854 messages produced (267585400 bytes), 2659421 delivered (offset 7769786, 0 failed) in 2003ms: 1327255 msgs/s and 132.73 MB/s, 4 produce failures, 16435 in queue, no compression

Debugging internals

I tried debugging the rdkafka code and noticed that the major lag happens at https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_transport.c#L965 in poll(). However, I do not understand what that means, or whether it is the main cause of the delay.

@meetsitaram
Author

I also ran a Java producer against the same server and topic with the config below and did see spikes of up to 150 ms, but nothing close to 900 ms.

props.put(ProducerConfig.LINGER_MS_CONFIG, 10); 
props.put(ProducerConfig.ACKS_CONFIG, "1");

@edenhill
Contributor

How many partitions does test-topic have, how many brokers in your cluster, and what is the replication-factor of the topic?

@meetsitaram
Author

Partitions: 4, Cluster Size: 12, Replication Factor: 6

I was also able to reproduce the issue on my local Mac with:
Partitions: 1, Cluster Size: 1, Replication Factor: 1

@edenhill
Contributor

See if the latency spikes can be correlated with disk flush/writes or some other broker activity by increasing the broker log level and inspecting the logs.

@meetsitaram
Author

I tried changing the broker log.flush config parameters to 600 seconds, 60 seconds, and 1 second. However, it is hard to find any clear pattern in the latency spikes.

Also, even after enabling the debug log level in kafka/log4j.properties, I do not see any logs on the server related to disk flushes:

log4j.rootLogger=DEBUG, stdout, kafkaAppender
log4j.logger.kafka=DEBUG, kafkaAppender

Below are the broker parameters I changed in kafka/server.properties:

log.flush.interval.messages=1000000
log.flush.interval.ms=1000
log.flush.offset.checkpoint.interval.ms=1000
log.flush.scheduler.interval.ms = 1000
log.flush.start.offset.checkpoint.interval.ms=1000

@meetsitaram
Author

This happens only when request.required.acks=1 is set along with queue.buffering.max.ms=10. I do not see the spikes when using the default request.required.acks.

@edenhill
Contributor

I tried debugging the rdkafka code, and noticed the major lag is happening at https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_transport.c#L965 in poll(). However, I do not understand what that means, and also if that is the main cause of delay.

Can you add a debug printf (or similar) to see what the poll timeout parameter is set to when you get the latency spikes?

@meetsitaram
Author

It is set to 10 under normal conditions, but to 999 during these spikes:

 poll timed out,  time_spent: 12291, r:0, tmout:10 
 poll timed out,  time_spent: 9370, r:0, tmout:8 
 poll timed out,  time_spent: 1100, r:0, tmout:1 
% 5611 messages produced (561100 bytes), 4846 delivered (offset 1889110, 0 failed) in 7004ms: 691 msgs/s and 0.07 MB/s, 0 produce failures, 765 in queue, no compression
 poll timed out,  time_spent: 1002570, r:0, tmout:999 
 poll timed out,  time_spent: 12067, r:0, tmout:10 
% 3193 messages produced (319300 bytes), 3191 delivered (offset 1895382, 0 failed) in 4003ms: 796 msgs/s and 0.08 MB/s, 0 produce failures, 2 in queue, no compression
 poll timed out,  time_spent: 8886, r:0, tmout:7 
% 3961 messages produced (396100 bytes), 3200 delivered (offset 1895391, 0 failed) in 5004ms: 639 msgs/s and 0.06 MB/s, 0 produce failures, 761 in queue, no compression
 poll timed out,  time_spent: 1000559, r:0, tmout:999 

This is the debug logging I added:

rd_ts_t t_start = rd_clock();
r = poll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
if (r <= 0) {
        rd_ts_t t_end = rd_clock();
        printf(" poll timed out,  time_spent: %ld, r:%d, tmout:%d \n", t_end - t_start, r, tmout);
}

@edenhill
Contributor

edenhill commented Jul 7, 2020

Interesting.
If you could reproduce this with your debug print and debug=all configured, that would be great.

@meetsitaram
Author

Here is the full log file: https://gist.github.com/meetsitaram/19fbc055b41f2d9abc2ebd9facd10296

A sample of the log at the time of a latency spike:

%7|1594163356.134|SEND|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Sent ProduceRequest (v7, 1208 bytes @ 0, CorrId 2269)
%7|1594163356.134|RECV|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Received ProduceResponse (v7, 54 bytes, CorrId 2269, rtt 0.59ms)
%7|1594163356.135|MSGSET|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0]: MessageSet with 10 message(s) (MsgId 0, BaseSeq -1) delivered
%7|1594163356.135|TOPPAR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0] 1 message(s) in xmit queue (1 added from partition queue)
%7|1594163356.144|TOPPAR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0] 8 message(s) in xmit queue (7 added from partition queue)
%7|1594163356.145|TOPPAR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0] 9 message(s) in xmit queue (1 added from partition queue)
%7|1594163356.145|PRODUCE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0]: Produce MessageSet with 9 message(s) (1042 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
%7|1594163356.145|SEND|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Sent ProduceRequest (v7, 1099 bytes @ 0, CorrId 2270)
%7|1594163356.146|RECV|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Received ProduceResponse (v7, 54 bytes, CorrId 2270, rtt 0.59ms)
%7|1594163356.146|MSGSET|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0]: MessageSet with 9 message(s) (MsgId 0, BaseSeq -1) delivered
%7|1594163357.148|TOPPAR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0] 763 message(s) in xmit queue (763 added from partition queue)
 poll timed out,  time_spent: 12485, r:0, tmout:10 
 poll timed out,  time_spent: 11038, r:0, tmout:10 
 poll timed out,  time_spent: 11677, r:0, tmout:10 
 poll timed out,  time_spent: 10111, r:0, tmout:10 
 poll timed out,  time_spent: 11289, r:0, tmout:10 
% 22197 messages produced (2219700 bytes), 22195 delivered (offset 91180, 0 failed) in 28016ms: 792 msgs/s and 0.08 MB/s, 0 produce failures, 2 in queue, no compression
 poll timed out,  time_spent: 11004, r:0, tmout:10 
 poll timed out,  time_spent: 10825, r:0, tmout:10 
 poll timed out,  time_spent: 8924, r:0, tmout:8 
 poll timed out,  time_spent: 1330, r:0, tmout:1 
% 22962 messages produced (2296200 bytes), 22224 delivered (offset 91209, 0 failed) in 29017ms: 765 msgs/s and 0.08 MB/s, 0 produce failures, 738 in queue, no compression
 poll timed out,  time_spent: 1001766, r:0, tmout:998 
lag before create producer request, 1001707
latency 1001731 1001731 0
latency 1000728 1000731 3
...
%7|1594163357.148|PRODUCE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0]: Produce MessageSet with 763 message(s) (84642 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
%7|1594163357.148|SEND|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Sent ProduceRequest (v7, 84699 bytes @ 0, CorrId 2271)
%7|1594163357.148|TOPPAR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0] 1 message(s) in xmit queue (1 added from partition queue)
%7|1594163357.149|RECV|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Received ProduceResponse (v7, 54 bytes, CorrId 2271, rtt 0.91ms)
%7|1594163357.149|MSGSET|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0]: MessageSet with 763 message(s) (MsgId 0, BaseSeq -1) delivered
%7|1594163357.149|TOPPAR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0] 2 message(s) in xmit queue (1 added from partition queue)
%7|1594163357.160|TOPPAR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0] 11 message(s) in xmit queue (9 added from partition queue)
%7|1594163357.161|PRODUCE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/0: test-topic [0]: Produce MessageSet with 11 message(s) (1260 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)

@meetsitaram
Author

From this log, it appears the delay occurs even before the produce call.

@meetsitaram
Author

Any suggestions on how to troubleshoot further?

@meetsitaram
Author

It would be helpful if anyone could give more clues on what to debug. I would like to troubleshoot this issue but am not sure how to proceed. Thanks.

@meetsitaram
Author

@edenhill I think I found the bug.

Here is what I have understood so far. The broker thread loops constantly with a default timeout of rd_kafka_max_block_ms = 1000. Along the way, the timeout is shortened based on various conditions. One of those conditions is to honour the queue.buffering.max.ms contract.

wait_max = rd_kafka_msg_enq_time(rkm) + rkb->rkb_rk->rk_conf.buffering_max_us;

if (wait_max > now) {
        /* Wait for more messages or queue.buffering.max.ms
         * to expire. */
        if (wait_max < *next_wakeup)
                *next_wakeup = wait_max;
        return 0;
}

However, before that logic is reached, there are multiple conditions in rd_kafka_toppar_producer_serve() that return without honouring that check. One of them: if the message count is 0, the function returns without updating the next_wakeup time.

r = rktp->rktp_xmit_msgq.rkmq_msg_cnt;
if (r == 0)
    return 0;

This results in rd_kafka_transport_poll() being invoked with an almost 1-second timeout. If poll() does not return before the timeout, messages keep getting queued up during this 1-second interval by a separate thread. The queue.buffering.max.ms contract is broken for all of these queued-up messages, which can result in a P99 latency of close to a second for the messages in that batch.
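To make the interaction above concrete, here is a simplified, illustrative sketch of how the broker loop derives its poll timeout from next_wakeup; the helper names are hypothetical and this is not librdkafka's actual control flow:

/* Illustrative sketch (not librdkafka code) of the timing issue described
 * above; all helper names here are hypothetical. */
#include <stdint.h>
#include <poll.h>
#include <time.h>

static int64_t now_us(void) {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return (int64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}

/* Stand-in for rd_kafka_toppar_producer_serve(): when the xmit queue is
 * empty it returns early and, crucially, leaves *next_wakeup untouched. */
static void serve_partitions(int msgcnt, int64_t linger_us, int64_t *next_wakeup) {
        if (msgcnt == 0)
                return;                      /* the early return in question */
        int64_t wait_max = now_us() + linger_us;
        if (wait_max < *next_wakeup)
                *next_wakeup = wait_max;     /* honour queue.buffering.max.ms */
}

static void broker_loop_iteration(struct pollfd *pfd, int msgcnt, int64_t linger_us) {
        int64_t now = now_us();
        int64_t next_wakeup = now + 1000000; /* rd_kafka_max_block_ms default: 1 s */

        serve_partitions(msgcnt, linger_us, &next_wakeup);

        /* With an empty xmit queue, next_wakeup stays at ~1 s, so poll() can
         * block for up to a full second while the application thread keeps
         * enqueuing messages, blowing past queue.buffering.max.ms for them
         * (unless an IO wakeup arrives). */
        poll(pfd, 1, (int)((next_wakeup - now) / 1000));
}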

I was able to replicate the issue and resolve it by making the change below, although I have not done any thorough testing.

if (r == 0) {
    *next_wakeup = now +  rkb->rkb_rk->rk_conf.buffering_max_us;
    return 0;
}

Please let me know if this makes sense so I can create a PR for it.

@edenhill
Contributor

edenhill commented Sep 2, 2020

Great investigation @meetsitaram !

The rktp_xmit_msgq is the broker's local queue (without locks) that any new messages on the (locked) rktp_msgq are moved to in toppar_serve.
If xmit_msgq is empty it means there were no messages on rktp_msgq.

When a message is appended to rktp_msgq and the queue was previously empty, an IO-based wakeup is triggered on the broker thread, waking it up from its blocking IO poll.
This is what should be happening here, but the implementation is more complex than that, so it might be that we're either hitting the IO-wakeup rate-limiter (at most one wakeup per linger.ms) or that the forwarded-to queue (it is complicated) is non-empty.
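For context, the general pattern being described, a queue event turned into an IO event that wakes up poll() via something like a self-pipe, looks roughly like the sketch below. This is illustrative only and not librdkafka's actual implementation:

/* Generic self-pipe wakeup sketch: the enqueuing thread writes one byte to
 * a pipe so the broker thread's poll() returns immediately. Illustrative
 * only; librdkafka's real queues are more involved. */
#include <pthread.h>
#include <poll.h>
#include <unistd.h>

struct wakeup_queue {
        pthread_mutex_t lock;
        int             msgcnt;
        int             pipefd[2];   /* [0] = poll side, [1] = write side */
};

static void q_enqueue(struct wakeup_queue *q) {
        pthread_mutex_lock(&q->lock);
        int was_empty = (q->msgcnt++ == 0);
        pthread_mutex_unlock(&q->lock);

        /* Only the first message on an empty queue triggers an IO wakeup;
         * this is also the spot where the rate-limiter applies. */
        if (was_empty)
                (void)write(q->pipefd[1], "x", 1);
}

static void broker_poll(struct wakeup_queue *q, int timeout_ms) {
        struct pollfd pfd = { .fd = q->pipefd[0], .events = POLLIN };
        if (poll(&pfd, 1, timeout_ms) > 0 && (pfd.revents & POLLIN)) {
                char buf[8];
                (void)read(q->pipefd[0], buf, sizeof(buf)); /* drain the wakeup */
        }
}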

Setting next_wakeup to buffering_max_us (linger.ms) is not a good solution, because with a low linger.ms it would cause a lot of unwarranted wakeups.

@meetsitaram
Author

@edenhill Hitting the IO-wakeup rate-limiter when a message is appended to a previously empty rktp_msgq is also a problem. The rate limit is only linger.ms, but once the first message is enqueued, no wakeup is ever triggered again, even after subsequent messages are enqueued, so the broker thread stays blocked until the poll times out after 1 second.

It looks like this is what is happening. rd_kafka_toppar_enq_msg() triggers a wakeup only after the first message is enqueued, but with rate_limit set to true:

        if (unlikely(queue_len == 1 &&
                     (wakeup_q = rktp->rktp_msgq_wakeup_q)))

        if (wakeup_q) {
                rd_kafka_q_yield(wakeup_q, rd_true/*rate-limit*/);

rd_kafka_q_io_event(rkq, rate_limit) then checks rate_limit and does not wake up the queue because there is still time left in the window:

        if (rate_limit) {
                if (likely(rkq->rkq_qio->ts_last + rkq->rkq_qio->ts_rate > now)) 
                        return;

No further wakeup calls are made when subsequent messages are enqueued. I am not familiar with the role the forwarded-to queue plays here.

Can we at least invoke rd_kafka_q_yield() without rate-limiting from rd_kafka_toppar_enq_msg(), i.e. rd_kafka_q_yield(wakeup_q, rd_false/*no rate-limiting*/);, since this happens only when queue_len is 1? I tested and verified that this approach also resolves the latency spikes.
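To make the failure mode concrete, here is a toy model of the rate-limited wakeup check quoted above; it is illustrative only, with simplified field names, and not the real rd_kafka_q_io_event() code:

#include <stdint.h>
#include <stdbool.h>

struct io_wakeup {
        int64_t ts_last;   /* time the last wakeup was sent (us) */
        int64_t ts_rate;   /* minimum interval between wakeups = linger.ms (us) */
};

/* Returns true if a wakeup would actually be sent. */
static bool maybe_wakeup(struct io_wakeup *w, int64_t now, bool rate_limit) {
        if (rate_limit && w->ts_last + w->ts_rate > now)
                return false;          /* suppressed: still inside the window */
        w->ts_last = now;
        return true;
}

/*
 * Timeline with linger.ms = 10 ms (ts_rate = 10000 us):
 *   t = 0 us     msg #1 lands on the empty queue: wakeup sent, ts_last = 0
 *   t = 500 us   broker drains the queue, sends a ProduceRequest, then
 *                blocks in poll() with the ~1 s default timeout
 *   t = 2000 us  msg #2 lands on the (again empty) queue:
 *                ts_last + ts_rate = 10000 > 2000, so the wakeup is suppressed
 *   t > 2000 us  later messages find queue_len > 1, so no wakeup is even
 *                attempted; the broker sleeps until the 1 s poll timeout.
 */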

@edenhill
Contributor

Rate-limiting was introduced to limit CPU wakeups and thus CPU usage:
#2509

So we need to be careful not to reintroduce that problem.

@edenhill
Contributor

edenhill commented Sep 11, 2020

@shanson7 Would it be possible for you to try out the suggested fix by toggling the rd_true rate-limit to rd_false here (see link) and then run your workload tests, monitoring throughput and CPU usage as in #2509?
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_partition.c#L689

@shanson7
Contributor

I spoke with @meetsitaram and told him the steps I used to test before; hopefully he can reproduce them. I basically just used time ./examples/rdkafka_performance ..., where rdkafka_performance was built with and without particular changes (or with different versions), and compared the output from time, which includes wall time and CPU time.

@meetsitaram
Author

As suspected, removing the rate limit does impact performance severely. Below are the performance results for varying linger.ms with the existing code, the fix that removes the rate limit, and the fix that updates next-wakeup to linger.ms. Though performance is severely hit by removing the rate limit, updating next-wakeup looks promising. This approach also makes the wakeup logic, rate-limited or not, unnecessary.

# produce 10M messages, each of size 20 bytes, with request.required.acks=1
$ time ./examples/rdkafka_performance -P -t test-topic -b kafka:9092 -s 20 -X queue.buffering.max.ms=1 -X request.required.acks=1 -c 10000000

| config | version | real | user | sys | rate (msgs/s) |
| --- | --- | --- | --- | --- | --- |
| | latest | 0m10.487s | 0m16.164s | 0m0.718s | 954K |
| queue.buffering.max.ms=1000 | latest | 0m15.662s | 0m18.275s | 0m0.624s | 639K |
| queue.buffering.max.ms=1000 | update next-wakeup | 0m15.096s | 0m19.076s | 0m0.651s | 663K |
| queue.buffering.max.ms=1000 | no rate-limit | 1m7.589s | 0m29.113s | 0m41.555s | 148K |
| queue.buffering.max.ms=1 | latest | 0m13.094s | 0m18.854s | 0m1.455s | 765K |
| queue.buffering.max.ms=1 | update next-wakeup | 0m12.958s | 0m17.014s | 0m1.040s | 772K |
| queue.buffering.max.ms=1 | no rate-limit | 1m47.763s | 0m29.180s | 0m16.338s | 92K |
| queue.buffering.max.ms=10 | latest | 0m9.907s | 0m15.080s | 0m0.569s | 1077K |
| queue.buffering.max.ms=10 | update next-wakeup | 0m9.514s | 0m15.336s | 0m0.764s | 1052K |
| queue.buffering.max.ms=10 | no rate-limit | 1m22.794s | 0m28.835s | 0m55.055s | 120K |

Your concerns about too many unnecessary wakeups when updating next-wakeup with linger.ms are highlighted below. When linger.ms is set to 0 and the message rate is minimal, the fix results in excessive CPU usage; we need to somehow eliminate this scenario. However, when linger.ms is set to 1 ms, the increase in CPU usage is minimal. If users set a low linger.ms, isn't it reasonable for them to expect a small increase in CPU usage in exchange for the improved latencies?

| config | version | real | user | sys | %cpu | rate (msgs/s) |
| --- | --- | --- | --- | --- | --- | --- |
| queue.buffering.max.ms=0 | latest | 7m17.474s | 0m39.214s | 0m13.971s | 60/6 | 22K |
| queue.buffering.max.ms=0 | update next-wakeup | 6m57.738s | 4m5.322s | 1m26.738s | 100/30/8 | 25K |
| queue.buffering.max.ms=0 -r 1 -c 60 | latest | 1m0.018s | 0m0.138s | 0m0.020s | 0.3 | |
| queue.buffering.max.ms=0 -r 1 -c 60 | update next-wakeup | 1m0.024s | 2m15.261s | 0m37.344s | 290 | |
| queue.buffering.max.ms=1 -r 1 -c 60 | latest | 1m0.024s | 0m0.134s | 0m0.028s | 0.3 | |
| queue.buffering.max.ms=1 -r 1 -c 60 | update next-wakeup | 1m0.015s | 0m0.998s | 0m1.249s | 4 | |

I also tested and verified that the latency spikes are resolved:

| config | version | Median latency (ms) | P99 latency (ms) | P999 latency (ms) |
| --- | --- | --- | --- | --- |
| queue.buffering.max.ms=10 -r 500 -c 30000 -l -A latency.txt | latest | 14 | 769 | 934 |
| queue.buffering.max.ms=10 -r 1000 -c 60000 -l -A latency.txt | latest | 13 | 780 | 954 |
| queue.buffering.max.ms=10 -r 500000 -c 10000000 -l -A latency.txt | latest | 12 | 53 | 200 |
| queue.buffering.max.ms=10 -r 500 -c 30000 -l -A latency.txt | update next-wakeup | 13 | 20 | 43 |
| queue.buffering.max.ms=10 -r 1000 -c 60000 -l -A latency.txt | update next-wakeup | 12 | 22 | 46 |
| queue.buffering.max.ms=10 -r 500000 -c 10000000 -l -A latency.txt | update next-wakeup | 12 | 60 | 150 |
  • All tests were run from a producer on a Linux machine, sending messages to a topic with 4 partitions on a remote cluster.

@meetsitaram
Author

@edenhill any comments?

@syepes

syepes commented Dec 18, 2020

@edenhill | @meetsitaram
Is this rate_limit configurable without rebuilding the lib? If yes, what is the flag?

@sanjay24
Contributor

sanjay24 commented Mar 7, 2021

@edenhill hope this gets picked up soon!

@edenhill
Contributor

edenhill commented Apr 13, 2021

Thanks for a great analysis and testing of the different approaches, @meetsitaram, and sorry for not picking this up sooner.

If users set a low linger.ms, isn't it reasonable for them to expect a little increase in cpu usage for the improved latencies?

I don't think that is an acceptable assumption; e.g., a low-rate but low-latency producer should not have high CPU usage.

A problem summary:
The underlying problem that the queue wakeups aim to solve is having the same thread (the broker thread) wait for both IO and queue events/condvars, which is done by turning queue events into IO events that wake up poll().

So what we have is:

  • msgq wakeup rate-limiting: this was put in place for the case where the produce() interval < linger.ms, to limit the number of IO wakeups.
  • an io-poll default timeout of 1 s rather than linger.ms, relying on the wakeups, to keep CPU usage down during idle periods.

But their combined logic does not cover the case where the produce interval is < linger.ms but we still want to adhere to linger.ms. For that we would need the rate-limiter on message enqueue to say "don't wake it up now, but in linger.ms", but there is no way to accomplish that with today's IO-evented queues.
We could add a timer for this purpose, but I think it would be quite costly performance-wise, with an extra timer lock, scheduling of a timer, etc.

We arrive in this state when:

  • A message is produced; a wakeup is triggered (ts_last updated).
  • The broker thread constructs a ProduceRequest and sends it.
  • The next producer_serve() has no messages to send (no messages, or linger.ms > now), so next_wakeup is set to INFINITE.
  • ops_io_serve() is called with max_blocking_ms (1 s) because of INFINITE.
  • A message is produced, but the rate-limit interval ts_rate has not elapsed, so no wakeup is performed.
  • ops_io_serve() eventually times out after 1 s.
  • The broker thread constructs a ProduceRequest and sends it.

Optimally we want to allow at most one wakeup event per wakeup; anything more than that is a waste of resources.
So what if we reset ts_last on each read from the queue (e.g., in ops_io_serve())? The next message enqueue would then be allowed to wake up the queue.

Edit: An even simpler implementation would be to just have a bool that tracks whether a wakeup has been performed or not. That saves us from calling rd_clock().

What do you all think?

P.S. The long-term proper solution is to split IO out into its own thread and only use condvar-triggered queues internally, but that's a big undertaking and not something we have scheduled.

edenhill added a commit that referenced this issue Apr 14, 2021
…of based on rate (#2912)

The IO-based wakeup (rkq_qio) now has a `sent` boolean that tracks whether
a wakeup has been sent, this boolean is reset by the queue reader each time
it polls the queue - effectively allowing only one wakeup-event per non-polling
period and voiding the need for rate-limiting the wakeups.
@edenhill
Contributor

edenhill commented Apr 14, 2021

Please try the qwakeupfix branch.

@edenhill
Contributor

Merged to master, so try master instead.

@edenhill
Contributor

Were you able to verify the fixes on master?

@meetsitaram
Author

Ran some tests using examples/rdkafka_performance and it looks like the issue has been fixed without adding any extra overhead in terms of CPU or memory usage. Here are some comparisons between the old code (at b31363f, before the rate-limit fix 9832f1d) and the latest master with the fix:

latencies

| config | version | Median latency (ms) | P99 latency (ms) | P999 latency (ms) |
| --- | --- | --- | --- | --- |
| -X queue.buffering.max.ms=10 -r 500 -c 30000 | old | 7 | 764 | 976 |
| -X queue.buffering.max.ms=10 -r 1000 -c 60000 | old | 6.7 | 758 | 978 |
| -X queue.buffering.max.ms=10 -r 500000 -c 10000000 | old | 7.2 | 826 | 976 |
| -X queue.buffering.max.ms=10 -r 500 -c 30000 | latest master | 6.4 | 11.8 | 12.1 |
| -X queue.buffering.max.ms=10 -r 1000 -c 60000 | latest master | 5.8 | 11.7 | 11.9 |
| -X queue.buffering.max.ms=10 -r 500000 -c 10000000 | latest master | 6.1 | 11.2 | 12.6 |

peak performance

| config | version | real | user | sys | rate (msgs/s) |
| --- | --- | --- | --- | --- | --- |
| -X queue.buffering.max.ms=1 | old | 0m6.416s | 0m7.470s | 0m0.486s | 1.64M |
| -X queue.buffering.max.ms=10 | old | 0m5.295s | 0m6.795s | 0m0.419s | 1.98M |
| -X queue.buffering.max.ms=1000 | old | 0m20.210s | 0m7.774s | 0m0.861s | 470K |
| -X queue.buffering.max.ms=1 | latest master | 0m6.749s | 0m7.950s | 0m3.370s | 1.47M |
| -X queue.buffering.max.ms=10 | latest master | 0m6.804s | 0m8.127s | 0m3.231s | 1.44M |
| -X queue.buffering.max.ms=1000 | latest master | 0m6.780s | 0m7.578s | 0m3.452s | 1.46M |

resource usage at low rates

| config | version | %cpu | memory usage |
| --- | --- | --- | --- |
| -X queue.buffering.max.ms=0 -r 1 -c 60 | old | 0 | 1.5MB |
| -X queue.buffering.max.ms=10 -r 1 -c 60 | old | 0 | 1.5MB |
| -X queue.buffering.max.ms=0 -r 1 -c 60 | latest master | 0 | 1.5MB |
| -X queue.buffering.max.ms=10 -r 1 -c 60 | latest master | 0 | 1.5MB |

resource usage at peak rates

| config | version | %cpu | memory usage |
| --- | --- | --- | --- |
| -X queue.buffering.max.ms=0 -c 10000000 | old | 165 | 24MB |
| -X queue.buffering.max.ms=10 -c 10000000 | old | 135 | 9MB |
| -X queue.buffering.max.ms=0 -c 10000000 | latest master | 165 | 25MB |
| -X queue.buffering.max.ms=10 -c 10000000 | latest master | 165 | 7MB |

Will integrate these changes with our app and update this issue if the spikes still happen.

@meetsitaram
Author

Thanks for the fix. Works like magic.

@meetsitaram
Author

Can you take a look at the increase in system time at peak rates? It is now around 0m3.370s with the fix, instead of 0m0.486s earlier. Is that expected?

@edenhill
Contributor

Thank you, again, for verifying this fix!

As for the increased system times, isn't that expected, since without the rate-limiter we now do a lot more wakeups?
E.g., the P99 latency dropped from roughly 800 ms to 11 ms with the fix, implying at least ~72x more wakeups (and thus syscalls).

@edenhill
Contributor

edenhill commented Apr 5, 2022

The wakeup rate-limiting discussed here and in #3538, introduced in v1.7.0, proved to be problematic for certain produce patterns, so I redesigned the wakeup signalling and made it more formal (https://github.com/edenhill/librdkafka/blob/qlatency/src/rdkafka_msg.c#L1671-L1813).

From my testing we're now back at v1.6 levels of throughput, or even higher, with lower CPU usage, and the new wakeup signalling should also fix the corner cases that v1.7.0 addressed as well as the ones that v1.7.0 introduced.

It would be great if you could try out the qlatency branch with your workloads and see how it behaves.

cc @meetsitaram @shanson7

@edenhill
Contributor

edenhill commented Apr 8, 2022

This is now merged to master.
It would still be great for you to try it out.
I'll close this issue to mark it as resolved for now, though.

@edenhill edenhill closed this as completed Apr 8, 2022
@meetsitaram
Author

Thanks for the fix. Overall, I do not see any issues with our use case. Below are some test results for various scenarios.

latencies

  • Latencies look good, except for a small increase in the P999 values (which might be specific to my local setup); the 1-second latency spikes are gone.

| config | version | Median latency (ms) | P99 latency (ms) | P999 latency (ms) |
| --- | --- | --- | --- | --- |
| -X queue.buffering.max.ms=10 -r 500 -c 30000 | latest master | 6.7 | 13.4 | 16.7 |
| -X queue.buffering.max.ms=10 -r 1000 -c 60000 | latest master | 5.9 | 11.9 | 23.2 |
| -X queue.buffering.max.ms=10 -r 500000 -c 10000000 | latest master | 12 | 18.3 | 42.8 |

peak performance

  • Peak performance looks good (relatively better than before) and system times are back to the normal values seen prior to the earlier fix.

| config | version | real | user | sys | rate (msgs/s) |
| --- | --- | --- | --- | --- | --- |
| -X queue.buffering.max.ms=1 | latest master | 0m4.777s | 0m6.218s | 0m0.335s | 2.1M |
| -X queue.buffering.max.ms=10 | latest master | 0m5.244s | 0m6.678s | 0m0.377s | 1.92M |
| -X queue.buffering.max.ms=1000 | latest master | 0m5.311s | 0m6.778s | 0m0.358s | 1.89M |

resource usage at low rates

  • I do not see any significant increase in resource usage at low rates.

| config | version | %cpu | memory usage |
| --- | --- | --- | --- |
| -X queue.buffering.max.ms=0 -r 1 -c 60 | latest master | 0 | 1.9MB |
| -X queue.buffering.max.ms=10 -r 1 -c 60 | latest master | 0 | 1.9MB |

resource usage at peak rates

  • Resource utilization looks good and is the same as before at peak rates.

| config | version | %cpu | memory usage |
| --- | --- | --- | --- |
| -X queue.buffering.max.ms=0 -c 10000000 | latest master | 165 | 20MB |
| -X queue.buffering.max.ms=10 -c 10000000 | latest master | 135 | 9MB |
