Skip to content

Commit

Permalink
Rate limit IO-based queue wakeups to linger.ms (#2509)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Sep 10, 2019
1 parent a12b909 commit c103cfd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,8 @@ void rd_kafka_q_io_event_enable (rd_kafka_q_t *rkq, int fd,
qio->fd = fd;
qio->size = size;
qio->payload = (void *)(qio+1);
qio->ts_rate = rkq->rkq_rk->rk_conf.buffering_max_us;
qio->ts_last = 0;
qio->event_cb = NULL;
qio->event_cb_opaque = NULL;
memcpy(qio->payload, payload, size);
Expand Down
9 changes: 9 additions & 0 deletions src/rdkafka_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ struct rd_kafka_q_io {
int fd;
void *payload;
size_t size;
rd_ts_t ts_rate; /**< How often the IO wakeup may be performed (us) */
rd_ts_t ts_last; /**< Last IO wakeup */
/* For callback-based signalling */
void (*event_cb) (rd_kafka_t *rk, void *opaque);
void *event_cb_opaque;
Expand Down Expand Up @@ -288,6 +290,7 @@ static RD_INLINE RD_UNUSED int rd_kafka_q_is_fwded (rd_kafka_q_t *rkq) {
*/
static RD_INLINE RD_UNUSED
void rd_kafka_q_io_event (rd_kafka_q_t *rkq) {
rd_ts_t now;

if (likely(!rkq->rkq_qio))
return;
Expand All @@ -297,6 +300,12 @@ void rd_kafka_q_io_event (rd_kafka_q_t *rkq) {
return;
}

now = rd_clock();
if (likely(rkq->rkq_qio->ts_last + rkq->rkq_qio->ts_rate > now))
return;

rkq->rkq_qio->ts_last = now;

/* Ignore errors, not much to do anyway. */
if (rd_write(rkq->rkq_qio->fd, rkq->rkq_qio->payload,
(int)rkq->rkq_qio->size) == -1)
Expand Down

0 comments on commit c103cfd

Please sign in to comment.