[KIP-714] Additional consumer metrics (#4808)
add missing consumer metrics described in the KIP:

* consumer.coordinator.rebalance.latency.avg
* consumer.coordinator.rebalance.latency.max
* consumer.coordinator.rebalance.latency.total
* consumer.fetch.manager.fetch.latency.avg
* consumer.fetch.manager.fetch.latency.max
* consumer.poll.idle.ratio.avg
* consumer.coordinator.commit.latency.avg
* consumer.coordinator.commit.latency.max

additionally:

* add unit tests for all the metrics
* add integration tests that exercise the metrics while the producer or consumer is active
* make the mock cluster's group initial rebalance delay (ms) configurable, so the integration tests are reusable with both producer and consumer (a usage sketch follows this list)
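
For context, a minimal sketch of how an application opts in to these
metrics, assuming "enable.metrics.push" (on by default) is the public
configuration property behind the rk_conf.enable_metrics_push flag used
throughout this diff:

    /* Sketch: create a consumer with KIP-714 metrics push enabled so
     * the new consumer metrics are collected and sent to the broker. */
    #include <librdkafka/rdkafka.h>

    rd_kafka_t *create_metrics_consumer(void) {
            char errstr[512];
            rd_kafka_conf_t *conf = rd_kafka_conf_new();

            rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                              errstr, sizeof(errstr));
            rd_kafka_conf_set(conf, "group.id", "example-group", errstr,
                              sizeof(errstr));
            /* Explicit for illustration; this is the default. */
            rd_kafka_conf_set(conf, "enable.metrics.push", "true", errstr,
                              sizeof(errstr));

            return rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,
                                sizeof(errstr));
    }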
---------
Co-authored-by: mahajanadhitya <amahajan@confluent.io>
Co-authored-by: Anchit Jain <anjain@confluent.io>
Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>
4 people authored Oct 3, 2024
1 parent f55f3ec commit d5e1f9f
Showing 17 changed files with 1,029 additions and 270 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,25 @@
# librdkafka v2.6.0

librdkafka v2.6.0 is a feature release:

* [KIP-714] Complete consumer metrics support (#4808).
* Fix for permanent fetch errors when using a newer Fetch RPC version with an older
inter broker protocol (#4806).


## Fixes

### Consumer fixes

* Issues: #4806
Fix for permanent fetch errors when brokers support a Fetch RPC version greater than 12
but the cluster is configured to use an inter broker protocol lower than 2.8.
In this case the returned topic IDs are zero-valued and Fetch has to fall back
to version 12, using topic names.
Happening since v2.5.0 (#4806)
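
Conceptually, the fallback looks like the sketch below (hypothetical
types and helpers, not librdkafka internals): when the topic IDs
returned by the broker are all zero-valued, the Fetch version is capped
at 12 so topic names are used instead.

    #include <stddef.h>
    #include <stdint.h>

    typedef struct { uint64_t hi, lo; } example_uuid_t; /* hypothetical */

    static int uuid_is_zero(const example_uuid_t *u) {
            return u->hi == 0 && u->lo == 0;
    }

    static int16_t choose_fetch_version(int16_t broker_max_version,
                                        const example_uuid_t *topic_ids,
                                        size_t cnt) {
            size_t i;
            for (i = 0; i < cnt; i++)
                    /* Zero-valued ID: the cluster's inter broker protocol
                     * predates topic IDs (< 2.8), so fall back to names. */
                    if (uuid_is_zero(&topic_ids[i]))
                            return broker_max_version > 12
                                       ? 12
                                       : broker_max_version;
            return broker_max_version; /* IDs usable: keep newest version */
    }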



# librdkafka v2.5.3

librdkafka v2.5.3 is a feature release.
48 changes: 41 additions & 7 deletions src/rdkafka.c
@@ -964,6 +964,18 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) {
rd_kafka_assignment_destroy(rk);
if (rk->rk_consumer.q)
rd_kafka_q_destroy(rk->rk_consumer.q);
rd_avg_destroy(
&rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio);
rd_avg_destroy(
&rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency);
rd_avg_destroy(
&rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency);
rd_avg_destroy(
&rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio);
rd_avg_destroy(
&rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency);
rd_avg_destroy(
&rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency);
}

/* Purge op-queues */
@@ -2534,6 +2546,29 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
rk->rk_consumer.q = rd_kafka_q_keep(rk->rk_rep);
}

rd_avg_init(
&rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio,
RD_AVG_GAUGE, 0, 1, 2, rk->rk_conf.enable_metrics_push);
rd_avg_init(
&rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio,
RD_AVG_GAUGE, 0, 1, 2, rk->rk_conf.enable_metrics_push);
rd_avg_init(
&rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2,
rk->rk_conf.enable_metrics_push);
rd_avg_init(
&rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency,
RD_AVG_GAUGE, 0, 900000 * 1000, 2,
rk->rk_conf.enable_metrics_push);
rd_avg_init(
&rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2,
rk->rk_conf.enable_metrics_push);
rd_avg_init(
&rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2,
rk->rk_conf.enable_metrics_push);

} else if (type == RD_KAFKA_PRODUCER) {
rk->rk_eos.transactional_id =
rd_kafkap_str_new(rk->rk_conf.eos.transactional_id, -1);
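
The rd_avg_init() calls above follow librdkafka's two-slot histogram
pattern: samples accumulate in rd_avg_current and are swapped into
rd_avg_rollover when a telemetry push is serialized. A condensed sketch
of the lifecycle, using only calls that appear in this diff (argument
roles of type, min, max, significant figures and enabled are inferred
from their usage here):

    rd_avg_t current, rollover;

    /* Gauge in microseconds, 0..500 ms range, as for commit latency. */
    rd_avg_init(&current, RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
    rd_avg_init(&rollover, RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);

    rd_avg_add(&current, 12 * 1000); /* one 12 ms latency sample */

    /* At push time the current window is assumed to be rolled over
     * into the rollover slot and read from there; both slots are
     * destroyed with the client instance: */
    rd_avg_destroy(&current);
    rd_avg_destroy(&rollover);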
@@ -3143,8 +3178,7 @@ static rd_kafka_op_res_t rd_kafka_consume_callback0(
struct consume_ctx ctx = {.consume_cb = consume_cb, .opaque = opaque};
rd_kafka_op_res_t res;

if (timeout_ms)
rd_kafka_app_poll_blocking(rkq->rkq_rk);
rd_kafka_app_poll_start(rkq->rkq_rk, 0, timeout_ms);

res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt, RD_KAFKA_Q_CB_RETURN,
rd_kafka_consume_cb, &ctx);
@@ -3212,16 +3246,15 @@ static rd_kafka_message_t *
rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
rd_kafka_op_t *rko;
rd_kafka_message_t *rkmessage = NULL;
rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
rd_ts_t now = rd_clock();
rd_ts_t abs_timeout = rd_timeout_init0(now, timeout_ms);

if (timeout_ms)
rd_kafka_app_poll_blocking(rk);
rd_kafka_app_poll_start(rk, now, timeout_ms);

rd_kafka_yield_thread = 0;
while ((
rko = rd_kafka_q_pop(rkq, rd_timeout_remains_us(abs_timeout), 0))) {
rd_kafka_op_res_t res;

res =
rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);

@@ -3894,7 +3927,8 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
else {
rk->rk_ts_last_poll_end = rd_clock();
struct consume_ctx ctx = {.consume_cb =
rk->rk_conf.consume_cb,
.opaque = rk->rk_conf.opaque};

28 changes: 27 additions & 1 deletion src/rdkafka_broker.c
@@ -1873,7 +1873,16 @@ static rd_kafka_buf_t *rd_kafka_waitresp_find(rd_kafka_broker_t *rkb,
rd_avg_add(&rkb->rkb_avg_rtt, rkbuf->rkbuf_ts_sent);
rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_rtt,
rkbuf->rkbuf_ts_sent);

if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Fetch) {
rd_avg_add(&rkb->rkb_telemetry.rd_avg_current
.rkb_avg_fetch_latency,
rkbuf->rkbuf_ts_sent);
} else if (rkbuf->rkbuf_reqhdr.ApiKey ==
RD_KAFKAP_OffsetCommit) {
rd_avg_add(&rkb->rkb_rk->rk_telemetry.rd_avg_current
.rk_avg_commit_latency,
rkbuf->rkbuf_ts_sent);
}
if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING &&
rd_atomic32_sub(&rkb->rkb_blocking_request_cnt, 1) == 1)
rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
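
Note that the value fed to rd_avg_add() here is an elapsed time, not a
timestamp: earlier in rd_kafka_waitresp_find() the buffer's
rkbuf_ts_sent field is rewritten to hold "now - ts_sent" just before
the pre-existing rkb_avg_rtt update, so the new Fetch and OffsetCommit
branches record the request round-trip latency in microseconds. The
pattern, sketched with hypothetical names:

    rd_ts_t ts_sent = rd_clock();            /* stamped at send time     */
    /* ... response arrives ... */
    rd_ts_t latency = rd_clock() - ts_sent;  /* elapsed microseconds     */
    rd_avg_add(&avg_fetch_latency, latency); /* -> fetch.latency.avg/max */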
@@ -4809,6 +4818,13 @@ void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_outbuf_latency);
rd_avg_destroy(
&rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency);
if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
rd_avg_destroy(
&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_fetch_latency);
rd_avg_destroy(
&rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency);
}


mtx_lock(&rkb->rkb_logname_lock);
rd_free(rkb->rkb_logname);
@@ -4921,6 +4937,16 @@ rd_kafka_broker_t *rd_kafka_broker_add(rd_kafka_t *rk,
rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency,
RD_AVG_GAUGE, 0, 100 * 1000, 2,
rk->rk_conf.enable_metrics_push);
if (rk->rk_type == RD_KAFKA_CONSUMER) {
rd_avg_init(
&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_fetch_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2,
rk->rk_conf.enable_metrics_push);
rd_avg_init(
&rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2,
rk->rk_conf.enable_metrics_push);
}

rd_refcnt_init(&rkb->rkb_refcnt, 0);
rd_kafka_broker_keep(rkb); /* rk_broker's refcount */
14 changes: 10 additions & 4 deletions src/rdkafka_broker.h
@@ -198,21 +198,27 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
int32_t connects; /**< Connection attempts,
* successful or not. */
} rkb_historic_c;

struct {
rd_avg_t rkb_avg_rtt; /* Current RTT avg */
rd_avg_t rkb_avg_throttle; /* Current throttle avg */
rd_avg_t rkb_avg_outbuf_latency; /**< Current latency
                                  * between buf_enq0
                                  * and writing to socket
                                  */
rd_avg_t rkb_avg_fetch_latency; /**< Current fetch
* latency avg */
} rd_avg_current;

struct {
rd_avg_t rkb_avg_rtt; /**< Rolled over RTT avg */
rd_avg_t
rkb_avg_throttle; /**< Rolled over throttle avg */
rd_avg_t rkb_avg_outbuf_latency; /**< Rolled over outbuf
* latency avg */
rd_avg_t rkb_avg_fetch_latency; /**< Rolled over fetch
* latency avg */
} rd_avg_rollover;
} rkb_telemetry;

16 changes: 15 additions & 1 deletion src/rdkafka_cgrp.c
@@ -360,6 +360,18 @@ void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state) {
if ((int)rkcg->rkcg_join_state == join_state)
return;

if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT ||
rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
/* Start timer when leaving the INIT or STEADY state */
rkcg->rkcg_ts_rebalance_start = rd_clock();
} else if (join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
/* End timer when reaching the STEADY state */
rd_dassert(rkcg->rkcg_ts_rebalance_start);
rd_avg_add(&rkcg->rkcg_rk->rk_telemetry.rd_avg_current
.rk_avg_rebalance_latency,
rd_clock() - rkcg->rkcg_ts_rebalance_start);
}
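
In other words, the measurement window spans from the moment the group
leaves the INIT or STEADY state until it reaches STEADY again; a
rebalance that completes 120 ms after it started adds 120,000 us to the
average. The generic timing pattern, sketched with hypothetical names:

    /* Start the clock when leaving a settled state; stop it when the
     * group is steady again. The difference is one rebalance sample. */
    if (old_is_init_or_steady)
            ts_rebalance_start = rd_clock();          /* rebalance begins */
    else if (new_is_steady)
            rd_avg_add(&avg_rebalance_latency,        /* rebalance done   */
                       rd_clock() - ts_rebalance_start);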

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE",
"Group \"%.*s\" changed join state %s -> %s "
"(state %s)",
@@ -6497,7 +6509,9 @@ static rd_kafka_op_res_t rd_kafka_cgrp_op_serve(rd_kafka_t *rk,
break;

case RD_KAFKA_OP_SUBSCRIBE:
rd_kafka_app_polled(rk);
/* We just want to avoid reaching the max poll interval,
* when nothing else is done on poll. */
rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock());

/* New atomic subscription (may be NULL) */
if (rkcg->rkcg_group_protocol ==
3 changes: 3 additions & 0 deletions src/rdkafka_cgrp.h
@@ -346,6 +346,9 @@ typedef struct rd_kafka_cgrp_s {
* assignment */
} rkcg_c;

/* Timestamp of last rebalance start */
rd_ts_t rkcg_ts_rebalance_start;

} rd_kafka_cgrp_t;


69 changes: 64 additions & 5 deletions src/rdkafka_int.h
@@ -380,7 +380,20 @@ struct rd_kafka_s {
* (or equivalent).
* Used to enforce
* max.poll.interval.ms.
* Set to INT64_MAX while polling
* to avoid reaching
* max.poll.interval.ms during that
* time frame.
* Only relevant for consumer. */
rd_ts_t rk_ts_last_poll_start; /**< Timestamp of last application
* consumer_poll() call start.
* Only relevant for consumer.
* Not atomic, as the Kafka consumer
* isn't thread-safe. */
rd_ts_t rk_ts_last_poll_end; /**< Timestamp of last application
* consumer_poll() call end.
* Only relevant for consumer.
* Not atomic, as the Kafka consumer
* isn't thread-safe. */
/* First fatal error. */
struct {
rd_atomic32_t err; /**< rd_kafka_resp_err_t */
@@ -696,8 +709,28 @@ struct rd_kafka_s {
rd_ts_t ts_last; /**< Timestamp of last push */
rd_ts_t ts_start; /**< Timestamp from when collection
* started */
/** Total rebalance latency (ms) up to previous push */
uint64_t rebalance_latency_total;
} rk_historic_c;

struct {
rd_avg_t rk_avg_poll_idle_ratio;
rd_avg_t rk_avg_commit_latency; /**< Current commit
* latency avg */
rd_avg_t
rk_avg_rebalance_latency; /**< Current rebalance
* latency avg */
} rd_avg_current;

struct {
rd_avg_t rk_avg_poll_idle_ratio;
rd_avg_t rk_avg_commit_latency; /**< Rolled over commit
* latency avg */
rd_avg_t
rk_avg_rebalance_latency; /**< Rolled over rebalance
* latency avg */
} rd_avg_rollover;

} rk_telemetry;
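
One detail worth noting: rebalance.latency.total is cumulative across
pushes, so the historic counter above holds everything reported so far
and is presumably combined with the freshly rolled-over window when the
next push is encoded. A sketch of that assumed arithmetic:

    /* Assumed encode-time computation for a cumulative "total" metric:
     * the previous total plus the sum of the window being pushed now. */
    uint64_t total_ms =
        rebalance_latency_total + rollover_window_sum_us / 1000;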

/* Test mocks */
@@ -1093,7 +1126,7 @@ static RD_INLINE RD_UNUSED int rd_kafka_max_poll_exceeded(rd_kafka_t *rk) {
last_poll = rd_atomic64_get(&rk->rk_ts_last_poll);

/* Application is blocked in librdkafka function, see
* rd_kafka_app_poll_start(). */
if (last_poll == INT64_MAX)
return 0;

@@ -1119,9 +1152,30 @@ static RD_INLINE RD_UNUSED int rd_kafka_max_poll_exceeded(rd_kafka_t *rk) {
* @locality any
* @locks none
*/
static RD_INLINE RD_UNUSED void rd_kafka_app_poll_blocking(rd_kafka_t *rk) {
if (rk->rk_type == RD_KAFKA_CONSUMER)
static RD_INLINE RD_UNUSED void
rd_kafka_app_poll_start(rd_kafka_t *rk, rd_ts_t now, rd_bool_t is_blocking) {
if (rk->rk_type != RD_KAFKA_CONSUMER)
return;

if (!now)
now = rd_clock();
if (is_blocking)
rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX);
if (rk->rk_ts_last_poll_end) {
int64_t poll_idle_ratio = 0;
rd_ts_t poll_interval = now - rk->rk_ts_last_poll_start;
if (poll_interval) {
rd_ts_t idle_interval =
rk->rk_ts_last_poll_end - rk->rk_ts_last_poll_start;
poll_idle_ratio =
idle_interval * 1000000 / poll_interval;
}
rd_avg_add(
&rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio,
poll_idle_ratio);
rk->rk_ts_last_poll_start = now;
rk->rk_ts_last_poll_end = 0;
}
}
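
A worked example of the ratio computed above, with all values in
microseconds: a poll that blocks for 40 ms, followed by 60 ms of
application processing before the next poll, yields an idle ratio of
0.4, stored scaled by 1,000,000:

    rd_ts_t poll_start = 0;          /* rk_ts_last_poll_start            */
    rd_ts_t poll_end   = 40 * 1000;  /* rk_ts_last_poll_end              */
    rd_ts_t next_start = 100 * 1000; /* now, at the start of next poll   */

    int64_t idle     = poll_end - poll_start;     /* 40,000 us           */
    int64_t interval = next_start - poll_start;   /* 100,000 us          */
    int64_t ratio    = idle * 1000000 / interval; /* 400,000, i.e. 0.4e6 */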

/**
@@ -1134,7 +1188,8 @@ static RD_INLINE RD_UNUSED void rd_kafka_app_poll_blocking(rd_kafka_t *rk) {
*/
static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk) {
if (rk->rk_type == RD_KAFKA_CONSUMER) {
rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock());
rd_ts_t now = rd_clock();
rd_atomic64_set(&rk->rk_ts_last_poll, now);
if (unlikely(rk->rk_cgrp &&
rk->rk_cgrp->rkcg_group_protocol ==
RD_KAFKA_GROUP_PROTOCOL_CONSUMER &&
Expand All @@ -1144,6 +1199,10 @@ static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk) {
rk->rk_cgrp,
"app polled after poll interval exceeded");
}
if (!rk->rk_ts_last_poll_end)
rk->rk_ts_last_poll_end = now;
rd_dassert(rk->rk_ts_last_poll_end >=
rk->rk_ts_last_poll_start);
}
}

11 changes: 10 additions & 1 deletion src/rdkafka_mock.c
@@ -2534,6 +2534,14 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
return RD_KAFKA_RESP_ERR_NO_ERROR;
}

void rd_kafka_mock_group_initial_rebalance_delay_ms(
rd_kafka_mock_cluster_t *mcluster,
int32_t delay_ms) {
mtx_lock(&mcluster->lock);
mcluster->defaults.group_initial_rebalance_delay_ms = delay_ms;
mtx_unlock(&mcluster->lock);
}
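
In a test, the new setter might be used as in this sketch, zeroing the
delay so a consumer group forms immediately instead of waiting out the
3000 ms default set in rd_kafka_mock_cluster_new() below:

    /* Sketch: make group formation immediate in an integration test. */
    rd_kafka_mock_cluster_t *mcluster =
        rd_kafka_mock_cluster_new(rk, 3 /* broker_cnt */);

    rd_kafka_mock_group_initial_rebalance_delay_ms(mcluster, 0);

    /* ... point the consumer at rd_kafka_mock_cluster_bootstraps(mcluster)
     * and run the test ... */

    rd_kafka_mock_cluster_destroy(mcluster);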


static rd_kafka_op_res_t
rd_kafka_mock_cluster_op_serve(rd_kafka_t *rk,
@@ -2694,7 +2702,8 @@ rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new(rd_kafka_t *rk,
TAILQ_INIT(&mcluster->topics);
mcluster->defaults.partition_cnt = 4;
mcluster->defaults.replication_factor = RD_MIN(3, broker_cnt);
mcluster->defaults.group_initial_rebalance_delay_ms = 3000;
mcluster->track_requests = rd_false;

TAILQ_INIT(&mcluster->cgrps);

Expand Down
9 changes: 9 additions & 0 deletions src/rdkafka_mock.h
@@ -168,6 +168,15 @@ rd_kafka_mock_push_request_errors_array(rd_kafka_mock_cluster_t *mcluster,
const rd_kafka_resp_err_t *errors);


/**
* @brief Apply broker configuration group.initial.rebalance.delay.ms
* to the whole \p mcluster.
*/
RD_EXPORT void rd_kafka_mock_group_initial_rebalance_delay_ms(
rd_kafka_mock_cluster_t *mcluster,
int32_t delay_ms);


/**
* @brief Push \p cnt errors and RTT tuples in the \p ... va-arg list onto
* the broker's error stack for the given \p ApiKey.
Expand Down
