Address all comments plus:
- add unit tests for all the metrics (a sketch of the rd_avg lifecycle these tests exercise follows the changed-files summary below)
- add integration tests with the producer or consumer while they're active
- make the group initial rebalance delay (ms) configurable so the integration tests are reusable with both the producer and the consumer
emasab authored Oct 1, 2024
1 parent e979a1b commit 6de0833
Showing 16 changed files with 747 additions and 449 deletions.
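
Before the per-file diffs, here is a minimal sketch of the rd_avg lifecycle that the new metric unit tests exercise. It uses only the internal rdavg.h calls visible in this diff (rd_avg_init, rd_avg_add, rd_avg_destroy); the rd_avg_rollover() call and its "move current into rollover and reset" semantics are an assumption based on src/rdavg.h.

/* Sketch only: exercises one telemetry gauge the way the consumer
 * metrics in this commit do. */
#include "rdavg.h" /* internal librdkafka header */

static void metric_lifecycle_sketch(void) {
        rd_avg_t current, rollover;

        /* Same parameters as the commit-latency gauge in this diff:
         * (avg, type, min, max in microseconds, significant figures,
         * enabled). */
        rd_avg_init(&current, RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
        rd_avg_init(&rollover, RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);

        /* Record two latency samples, in microseconds. */
        rd_avg_add(&current, 1000);
        rd_avg_add(&current, 2000);

        /* At telemetry push time, move "current" into "rollover" and
         * reset "current" (assumed rd_avg_rollover() behavior). */
        rd_avg_rollover(&rollover, &current);

        rd_avg_destroy(&rollover);
        rd_avg_destroy(&current);
}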
76 changes: 36 additions & 40 deletions src/rdkafka.c
@@ -961,24 +961,21 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) {
         rd_kafka_assignors_term(rk);
 
         if (rk->rk_type == RD_KAFKA_CONSUMER) {
+                rd_kafka_assignment_destroy(rk);
+                if (rk->rk_consumer.q)
+                        rd_kafka_q_destroy(rk->rk_consumer.q);
                 rd_avg_destroy(
-                    &rk->rk_telemetry.rd_avg_current.rd_avg_poll_idle_ratio);
+                    &rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio);
                 rd_avg_destroy(
-                    &rk->rk_telemetry.rd_avg_rollover.rd_avg_poll_idle_ratio);
-
+                    &rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency);
                 rd_avg_destroy(
-                    &rk->rk_telemetry.rd_avg_current.rd_avg_commit_latency);
+                    &rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency);
                 rd_avg_destroy(
-                    &rk->rk_telemetry.rd_avg_rollover.rd_avg_commit_latency);
-
+                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio);
                 rd_avg_destroy(
-                    &rk->rk_telemetry.rd_avg_current.rd_avg_rebalance_latency);
+                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency);
                 rd_avg_destroy(
-                    &rk->rk_telemetry.rd_avg_rollover.rd_avg_rebalance_latency);
-
-                rd_kafka_assignment_destroy(rk);
-                if (rk->rk_consumer.q)
-                        rd_kafka_q_destroy(rk->rk_consumer.q);
+                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency);
         }
 
         /* Purge op-queues */
@@ -2535,27 +2532,6 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
 #endif
 
         if (type == RD_KAFKA_CONSUMER) {
-                rd_avg_init(
-                    &rk->rk_telemetry.rd_avg_current.rd_avg_poll_idle_ratio,
-                    RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
-                rd_avg_init(
-                    &rk->rk_telemetry.rd_avg_rollover.rd_avg_poll_idle_ratio,
-                    RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
-
-                rd_avg_init(
-                    &rk->rk_telemetry.rd_avg_current.rd_avg_commit_latency,
-                    RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
-                rd_avg_init(
-                    &rk->rk_telemetry.rd_avg_rollover.rd_avg_commit_latency,
-                    RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
-
-                rd_avg_init(
-                    &rk->rk_telemetry.rd_avg_current.rd_avg_rebalance_latency,
-                    RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
-                rd_avg_init(
-                    &rk->rk_telemetry.rd_avg_rollover.rd_avg_rebalance_latency,
-                    RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
-
                 rd_kafka_assignment_init(rk);
 
                 if (RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0) {
@@ -2570,6 +2546,29 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
                         rk->rk_consumer.q = rd_kafka_q_keep(rk->rk_rep);
                 }
 
+                rd_avg_init(
+                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio,
+                    RD_AVG_GAUGE, 0, 1, 2, rk->rk_conf.enable_metrics_push);
+                rd_avg_init(
+                    &rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio,
+                    RD_AVG_GAUGE, 0, 1, 2, rk->rk_conf.enable_metrics_push);
+                rd_avg_init(
+                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency,
+                    RD_AVG_GAUGE, 0, 500 * 1000, 2,
+                    rk->rk_conf.enable_metrics_push);
+                rd_avg_init(
+                    &rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency,
+                    RD_AVG_GAUGE, 0, 900000 * 1000, 2,
+                    rk->rk_conf.enable_metrics_push);
+                rd_avg_init(
+                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency,
+                    RD_AVG_GAUGE, 0, 500 * 1000, 2,
+                    rk->rk_conf.enable_metrics_push);
+                rd_avg_init(
+                    &rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency,
+                    RD_AVG_GAUGE, 0, 500 * 1000, 2,
+                    rk->rk_conf.enable_metrics_push);
+
         } else if (type == RD_KAFKA_PRODUCER) {
                 rk->rk_eos.transactional_id =
                     rd_kafkap_str_new(rk->rk_conf.eos.transactional_id, -1);
@@ -3179,8 +3178,7 @@ static rd_kafka_op_res_t rd_kafka_consume_callback0(
         struct consume_ctx ctx = {.consume_cb = consume_cb, .opaque = opaque};
         rd_kafka_op_res_t res;
 
-        if (timeout_ms)
-                rd_kafka_app_poll_start(rkq->rkq_rk, rd_true);
+        rd_kafka_app_poll_start(rkq->rkq_rk, 0, timeout_ms);
 
         res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt, RD_KAFKA_Q_CB_RETURN,
                                rd_kafka_consume_cb, &ctx);
@@ -3248,10 +3246,10 @@ static rd_kafka_message_t *
 rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
         rd_kafka_op_t *rko;
         rd_kafka_message_t *rkmessage = NULL;
-        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
+        rd_ts_t now         = rd_clock();
+        rd_ts_t abs_timeout = rd_timeout_init0(now, timeout_ms);
 
-        if (timeout_ms)
-                rd_kafka_app_poll_start(rk, rd_true);
+        rd_kafka_app_poll_start(rk, now, timeout_ms);
 
         rd_kafka_yield_thread = 0;
         while ((
@@ -3924,8 +3922,6 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
 
         switch ((int)rko->rko_type) {
         case RD_KAFKA_OP_FETCH:
-                rk->rk_ts_last_poll_start = rd_clock();
-                rk->rk_ts_last_poll_end   = 0;
                 if (!rk->rk_conf.consume_cb ||
                     cb_type == RD_KAFKA_Q_CB_RETURN ||
                     cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
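
The rdkafka.c changes above make rd_kafka_app_poll_start() unconditional and pass it the current clock and poll timeout, which feed the new rk_avg_poll_idle_ratio gauge (note its 0..1 range in the init calls above). As a hypothetical, self-contained illustration — poll_tracker_t, poll_start() and poll_end() are illustrative names, not librdkafka API — the idle ratio for one poll cycle can be derived from such timestamps like this:

#include <stdint.h>

typedef struct poll_tracker {
        int64_t ts_poll_start; /* entered poll (microseconds) */
        int64_t ts_poll_end;   /* returned from poll (microseconds) */
} poll_tracker_t;

/* Called when the application enters poll: returns the fraction of
 * the just-completed cycle (previous poll start -> now) that was
 * spent blocked inside poll. */
static double poll_start(poll_tracker_t *pt, int64_t now) {
        double ratio = 0.0;
        if (pt->ts_poll_end > pt->ts_poll_start) {
                int64_t in_poll = pt->ts_poll_end - pt->ts_poll_start;
                int64_t total   = now - pt->ts_poll_start;
                if (total > 0)
                        ratio = (double)in_poll / (double)total;
        }
        pt->ts_poll_start = now;
        pt->ts_poll_end   = 0;
        return ratio;
}

/* Called when poll returns to the application. */
static void poll_end(poll_tracker_t *pt, int64_t now) {
        pt->ts_poll_end = now;
}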
26 changes: 17 additions & 9 deletions src/rdkafka_broker.c
@@ -1875,12 +1875,12 @@ static rd_kafka_buf_t *rd_kafka_waitresp_find(rd_kafka_broker_t *rkb,
                            rkbuf->rkbuf_ts_sent);
                 if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Fetch) {
                         rd_avg_add(&rkb->rkb_telemetry.rd_avg_current
-                                        .rd_avg_fetch_latency,
+                                        .rkb_avg_fetch_latency,
                                    rkbuf->rkbuf_ts_sent);
                 } else if (rkbuf->rkbuf_reqhdr.ApiKey ==
                            RD_KAFKAP_OffsetCommit) {
                         rd_avg_add(&rkb->rkb_rk->rk_telemetry.rd_avg_current
-                                        .rd_avg_commit_latency,
+                                        .rk_avg_commit_latency,
                                    rkbuf->rkbuf_ts_sent);
                 }
                 if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING &&
@@ -4818,9 +4818,13 @@ void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
             &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_outbuf_latency);
         rd_avg_destroy(
             &rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency);
-        rd_avg_destroy(
-            &rkb->rkb_telemetry.rd_avg_rollover.rd_avg_fetch_latency);
-        rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_current.rd_avg_fetch_latency);
+        if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
+                rd_avg_destroy(
+                    &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_fetch_latency);
+                rd_avg_destroy(
+                    &rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency);
+        }
+
 
         mtx_lock(&rkb->rkb_logname_lock);
         rd_free(rkb->rkb_logname);
@@ -4933,12 +4937,16 @@ rd_kafka_broker_t *rd_kafka_broker_add(rd_kafka_t *rk,
         rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency,
                     RD_AVG_GAUGE, 0, 100 * 1000, 2,
                     rk->rk_conf.enable_metrics_push);
-        rd_avg_init(&rkb->rkb_telemetry.rd_avg_rollover.rd_avg_fetch_latency,
-                    RD_AVG_GAUGE, 0, 100 * 1000, 2,
-                    rk->rk_conf.enable_metrics_push);
-        rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rd_avg_fetch_latency,
-                    RD_AVG_GAUGE, 0, 100 * 1000, 2,
-                    rk->rk_conf.enable_metrics_push);
+        if (rk->rk_type == RD_KAFKA_CONSUMER) {
+                rd_avg_init(
+                    &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_fetch_latency,
+                    RD_AVG_GAUGE, 0, 500 * 1000, 2,
+                    rk->rk_conf.enable_metrics_push);
+                rd_avg_init(
+                    &rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency,
+                    RD_AVG_GAUGE, 0, 500 * 1000, 2,
+                    rk->rk_conf.enable_metrics_push);
+        }
 
         rd_refcnt_init(&rkb->rkb_refcnt, 0);
         rd_kafka_broker_keep(rkb); /* rk_broker's refcount */
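
For readers of the rd_avg_init() calls above, the argument meanings in the annotated sketch below are an assumption based on src/rdavg.h, shown on a local variable rather than the broker struct:

/* Assumed parameter meanings for the fetch-latency gauge above. */
static void fetch_latency_gauge_example(void) {
        rd_avg_t fetch_latency;

        rd_avg_init(&fetch_latency,
                    RD_AVG_GAUGE, /* sampled gauge, not a counter */
                    0,            /* expected minimum value (us) */
                    500 * 1000,   /* expected maximum: 500 ms in us */
                    2,            /* significant figures to retain */
                    rd_true);     /* enabled; the commit passes
                                   * rk->rk_conf.enable_metrics_push so
                                   * samples are only tracked when
                                   * KIP-714 metrics push is enabled */
        rd_avg_destroy(&fetch_latency);
}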
18 changes: 10 additions & 8 deletions src/rdkafka_broker.h
@@ -198,25 +198,27 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
                 int32_t connects; /**< Connection attempts,
                                    *   successful or not. */
         } rkb_historic_c;
 
         struct {
                 rd_avg_t rkb_avg_rtt;      /* Current RTT avg */
                 rd_avg_t rkb_avg_throttle; /* Current throttle avg */
                 rd_avg_t
-                    rkb_avg_outbuf_latency; /**< Current latency
-                                             *   between buf_enq0
-                                             *   and writing to socket
-                                             */
-                rd_avg_t rd_avg_fetch_latency; /**< Current fetch
-                                                  latency avg */
+                    rkb_avg_outbuf_latency;     /**< Current latency
+                                                 *   between buf_enq0
+                                                 *   and writing to socket
+                                                 */
+                rd_avg_t rkb_avg_fetch_latency; /**< Current fetch
+                                                 *   latency avg */
         } rd_avg_current;
 
         struct {
                 rd_avg_t rkb_avg_rtt; /**< Rolled over RTT avg */
                 rd_avg_t
                     rkb_avg_throttle; /**< Rolled over throttle avg */
                 rd_avg_t rkb_avg_outbuf_latency; /**< Rolled over outbuf
                                                   *   latency avg */
-                rd_avg_t rd_avg_fetch_latency; /**< Rolled over fetch
-                                                 latency avg */
+                rd_avg_t rkb_avg_fetch_latency;  /**< Rolled over fetch
+                                                  *   latency avg */
         } rd_avg_rollover;
         } rkb_telemetry;
23 changes: 14 additions & 9 deletions src/rdkafka_cgrp.c
@@ -359,16 +359,19 @@ static int rd_kafka_cgrp_set_state(rd_kafka_cgrp_t *rkcg, int state) {
 void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state) {
         if ((int)rkcg->rkcg_join_state == join_state)
                 return;
-        if (join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
-                rd_avg_add(&rkcg->rkcg_curr_coord->rkb_rk->rk_telemetry
-                                .rd_avg_current.rd_avg_rebalance_latency,
-                           rd_clock() - rkcg->rkcg_ts_rebalance_start);
-        } else if (((int)rkcg->rkcg_join_state ==
-                    RD_KAFKA_CGRP_JOIN_STATE_STEADY) ||
-                   ((int)rkcg->rkcg_join_state ==
-                    RD_KAFKA_CGRP_JOIN_STATE_INIT)) {
+
+        if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT ||
+            rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
+                /* Start timer when leaving the INIT or STEADY state */
                 rkcg->rkcg_ts_rebalance_start = rd_clock();
+        } else if (join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
+                /* End timer when reaching the STEADY state */
+                rd_dassert(rkcg->rkcg_ts_rebalance_start);
+                rd_avg_add(&rkcg->rkcg_rk->rk_telemetry.rd_avg_current
+                                .rk_avg_rebalance_latency,
+                           rd_clock() - rkcg->rkcg_ts_rebalance_start);
         }
 
         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE",
                      "Group \"%.*s\" changed join state %s -> %s "
                      "(state %s)",
@@ -6506,7 +6509,9 @@ static rd_kafka_op_res_t rd_kafka_cgrp_op_serve(rd_kafka_t *rk,
                 break;
 
         case RD_KAFKA_OP_SUBSCRIBE:
-                rd_kafka_app_polled(rk);
+                /* We just want to avoid reaching the max poll interval,
+                 * without doing anything else that a poll does. */
+                rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock());
 
                 /* New atomic subscription (may be NULL) */
                 if (rkcg->rkcg_group_protocol ==
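
The rd_kafka_cgrp_set_join_state() change above implements a start/stop latency timer across the group's state machine: stamp a start time when leaving a stable state, record the elapsed time when the STEADY state is reached again. A self-contained sketch of the same pattern — the types and helpers here are illustrative, not librdkafka API:

#include <stdint.h>
#include <stdio.h>
#include <time.h>

enum join_state { JS_INIT, JS_REBALANCING, JS_STEADY };

typedef struct group {
        enum join_state state;
        int64_t ts_rebalance_start; /* 0 = no rebalance in flight */
} group_t;

static int64_t now_us(void) {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return (int64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}

static void set_join_state(group_t *g, enum join_state next) {
        if (g->state == next)
                return;

        if (g->state == JS_INIT || g->state == JS_STEADY) {
                /* Leaving a stable state: a rebalance starts now. */
                g->ts_rebalance_start = now_us();
        } else if (next == JS_STEADY) {
                /* Back to steady: record the rebalance latency; the
                 * commit feeds this into rk_avg_rebalance_latency via
                 * rd_avg_add(). */
                printf("rebalance took %lld us\n",
                       (long long)(now_us() - g->ts_rebalance_start));
        }
        g->state = next;
}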
1 change: 1 addition & 0 deletions src/rdkafka_cgrp.h
@@ -346,6 +346,7 @@ typedef struct rd_kafka_cgrp_s {
                                  *   assignment */
         } rkcg_c;
 
+        /* Timestamp of last rebalance start */
         rd_ts_t rkcg_ts_rebalance_start;
 
 } rd_kafka_cgrp_t;
(Diffs for the remaining 11 changed files are not shown.)
