diff --git a/src/rdkafka.c b/src/rdkafka.c index 27730d702..56193dab5 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -961,24 +961,21 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) { rd_kafka_assignors_term(rk); if (rk->rk_type == RD_KAFKA_CONSUMER) { + rd_kafka_assignment_destroy(rk); + if (rk->rk_consumer.q) + rd_kafka_q_destroy(rk->rk_consumer.q); rd_avg_destroy( - &rk->rk_telemetry.rd_avg_current.rd_avg_poll_idle_ratio); + &rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio); rd_avg_destroy( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_poll_idle_ratio); - + &rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency); rd_avg_destroy( - &rk->rk_telemetry.rd_avg_current.rd_avg_commit_latency); + &rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency); rd_avg_destroy( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_commit_latency); - + &rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio); rd_avg_destroy( - &rk->rk_telemetry.rd_avg_current.rd_avg_rebalance_latency); + &rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency); rd_avg_destroy( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_rebalance_latency); - - rd_kafka_assignment_destroy(rk); - if (rk->rk_consumer.q) - rd_kafka_q_destroy(rk->rk_consumer.q); + &rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency); } /* Purge op-queues */ @@ -2535,27 +2532,6 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, #endif if (type == RD_KAFKA_CONSUMER) { - rd_avg_init( - &rk->rk_telemetry.rd_avg_current.rd_avg_poll_idle_ratio, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - rd_avg_init( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_poll_idle_ratio, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - - rd_avg_init( - &rk->rk_telemetry.rd_avg_current.rd_avg_commit_latency, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - rd_avg_init( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_commit_latency, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - - rd_avg_init( - &rk->rk_telemetry.rd_avg_current.rd_avg_rebalance_latency, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - rd_avg_init( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_rebalance_latency, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - rd_kafka_assignment_init(rk); if (RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0) { @@ -2570,6 +2546,29 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rk->rk_consumer.q = rd_kafka_q_keep(rk->rk_rep); } + rd_avg_init( + &rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio, + RD_AVG_GAUGE, 0, 1, 2, rk->rk_conf.enable_metrics_push); + rd_avg_init( + &rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio, + RD_AVG_GAUGE, 0, 1, 2, rk->rk_conf.enable_metrics_push); + rd_avg_init( + &rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency, + RD_AVG_GAUGE, 0, 500 * 1000, 2, + rk->rk_conf.enable_metrics_push); + rd_avg_init( + &rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency, + RD_AVG_GAUGE, 0, 900000 * 1000, 2, + rk->rk_conf.enable_metrics_push); + rd_avg_init( + &rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency, + RD_AVG_GAUGE, 0, 500 * 1000, 2, + rk->rk_conf.enable_metrics_push); + rd_avg_init( + &rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency, + RD_AVG_GAUGE, 0, 500 * 1000, 2, + rk->rk_conf.enable_metrics_push); + } else if (type == RD_KAFKA_PRODUCER) { rk->rk_eos.transactional_id = rd_kafkap_str_new(rk->rk_conf.eos.transactional_id, -1); @@ -3179,8 +3178,7 @@ static rd_kafka_op_res_t rd_kafka_consume_callback0( struct consume_ctx ctx = {.consume_cb = consume_cb, .opaque = opaque}; rd_kafka_op_res_t res; - if (timeout_ms) - rd_kafka_app_poll_start(rkq->rkq_rk, rd_true); + 
rd_kafka_app_poll_start(rkq->rkq_rk, 0, timeout_ms); res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt, RD_KAFKA_Q_CB_RETURN, rd_kafka_consume_cb, &ctx); @@ -3248,10 +3246,10 @@ static rd_kafka_message_t * rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) { rd_kafka_op_t *rko; rd_kafka_message_t *rkmessage = NULL; - rd_ts_t abs_timeout = rd_timeout_init(timeout_ms); + rd_ts_t now = rd_clock(); + rd_ts_t abs_timeout = rd_timeout_init0(now, timeout_ms); - if (timeout_ms) - rd_kafka_app_poll_start(rk, rd_true); + rd_kafka_app_poll_start(rk, now, timeout_ms); rd_kafka_yield_thread = 0; while (( @@ -3924,8 +3922,6 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, switch ((int)rko->rko_type) { case RD_KAFKA_OP_FETCH: - rk->rk_ts_last_poll_start = rd_clock(); - rk->rk_ts_last_poll_end = 0; if (!rk->rk_conf.consume_cb || cb_type == RD_KAFKA_Q_CB_RETURN || cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 8d18b8b22..638db41eb 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -1875,12 +1875,12 @@ static rd_kafka_buf_t *rd_kafka_waitresp_find(rd_kafka_broker_t *rkb, rkbuf->rkbuf_ts_sent); if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Fetch) { rd_avg_add(&rkb->rkb_telemetry.rd_avg_current - .rd_avg_fetch_latency, + .rkb_avg_fetch_latency, rkbuf->rkbuf_ts_sent); } else if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_OffsetCommit) { rd_avg_add(&rkb->rkb_rk->rk_telemetry.rd_avg_current - .rd_avg_commit_latency, + .rk_avg_commit_latency, rkbuf->rkbuf_ts_sent); } if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING && @@ -4818,9 +4818,13 @@ void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) { &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_outbuf_latency); rd_avg_destroy( &rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency); - rd_avg_destroy( - &rkb->rkb_telemetry.rd_avg_rollover.rd_avg_fetch_latency); - rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_current.rd_avg_fetch_latency); + if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) { + rd_avg_destroy( + &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_fetch_latency); + rd_avg_destroy( + &rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency); + } + mtx_lock(&rkb->rkb_logname_lock); rd_free(rkb->rkb_logname); @@ -4933,12 +4937,16 @@ rd_kafka_broker_t *rd_kafka_broker_add(rd_kafka_t *rk, rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency, RD_AVG_GAUGE, 0, 100 * 1000, 2, rk->rk_conf.enable_metrics_push); - rd_avg_init(&rkb->rkb_telemetry.rd_avg_rollover.rd_avg_fetch_latency, - RD_AVG_GAUGE, 0, 100 * 1000, 2, + if (rk->rk_type == RD_KAFKA_CONSUMER) { + rd_avg_init( + &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_fetch_latency, + RD_AVG_GAUGE, 0, 500 * 1000, 2, rk->rk_conf.enable_metrics_push); - rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rd_avg_fetch_latency, - RD_AVG_GAUGE, 0, 100 * 1000, 2, + rd_avg_init( + &rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency, + RD_AVG_GAUGE, 0, 500 * 1000, 2, rk->rk_conf.enable_metrics_push); + } rd_refcnt_init(&rkb->rkb_refcnt, 0); rd_kafka_broker_keep(rkb); /* rk_broker's refcount */ diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h index 13b8783c3..0011dd5cb 100644 --- a/src/rdkafka_broker.h +++ b/src/rdkafka_broker.h @@ -198,25 +198,27 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */ int32_t connects; /**< Connection attempts, * successful or not. 
*/ } rkb_historic_c; + struct { rd_avg_t rkb_avg_rtt; /* Current RTT avg */ rd_avg_t rkb_avg_throttle; /* Current throttle avg */ rd_avg_t - rkb_avg_outbuf_latency; /**< Current latency - * between buf_enq0 - * and writing to socket - */ - rd_avg_t rd_avg_fetch_latency; /**< Current fetch - latency avg */ + rkb_avg_outbuf_latency; /**< Current latency + * between buf_enq0 + * and writing to socket + */ + rd_avg_t rkb_avg_fetch_latency; /**< Current fetch + * latency avg */ } rd_avg_current; + struct { rd_avg_t rkb_avg_rtt; /**< Rolled over RTT avg */ rd_avg_t rkb_avg_throttle; /**< Rolled over throttle avg */ rd_avg_t rkb_avg_outbuf_latency; /**< Rolled over outbuf * latency avg */ - rd_avg_t rd_avg_fetch_latency; /**< Rolled over fetch - latency avg */ + rd_avg_t rkb_avg_fetch_latency; /**< Rolled over fetch + * latency avg */ } rd_avg_rollover; } rkb_telemetry; diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 9b482f226..547ec1eb9 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -359,16 +359,19 @@ static int rd_kafka_cgrp_set_state(rd_kafka_cgrp_t *rkcg, int state) { void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state) { if ((int)rkcg->rkcg_join_state == join_state) return; - if (join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) { - rd_avg_add(&rkcg->rkcg_curr_coord->rkb_rk->rk_telemetry - .rd_avg_current.rd_avg_rebalance_latency, - rd_clock() - rkcg->rkcg_ts_rebalance_start); - } else if (((int)rkcg->rkcg_join_state == - RD_KAFKA_CGRP_JOIN_STATE_STEADY) || - ((int)rkcg->rkcg_join_state == - RD_KAFKA_CGRP_JOIN_STATE_INIT)) { + + if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT || + rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) { + /* Start timer when leaving the INIT or STEADY state */ rkcg->rkcg_ts_rebalance_start = rd_clock(); + } else if (join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) { + /* End timer when reaching the STEADY state */ + rd_dassert(rkcg->rkcg_ts_rebalance_start); + rd_avg_add(&rkcg->rkcg_rk->rk_telemetry.rd_avg_current + .rk_avg_rebalance_latency, + rd_clock() - rkcg->rkcg_ts_rebalance_start); } + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE", "Group \"%.*s\" changed join state %s -> %s " "(state %s)", @@ -6506,7 +6509,9 @@ static rd_kafka_op_res_t rd_kafka_cgrp_op_serve(rd_kafka_t *rk, break; case RD_KAFKA_OP_SUBSCRIBE: - rd_kafka_app_polled(rk); + /* We just want to avoid reaching max poll interval, + * without anything else is done on poll. */ + rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock()); /* New atomic subscription (may be NULL) */ if (rkcg->rkcg_group_protocol == diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index f1ec1a21f..f4e671041 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -346,6 +346,7 @@ typedef struct rd_kafka_cgrp_s { * assignment */ } rkcg_c; + /* Timestamp of last rebalance start */ rd_ts_t rkcg_ts_rebalance_start; } rd_kafka_cgrp_t; diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 99b3b1e42..33281d3b2 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -380,10 +380,20 @@ struct rd_kafka_s { * (or equivalent). * Used to enforce * max.poll.interval.ms. - * Only relevant for consumer. */ - rd_ts_t rk_ts_last_poll_start; - rd_ts_t rk_ts_last_poll_end; - + * Set to INT64_MAX while polling + * to avoid reaching + * max.poll.interval.ms. during that time + * frame. Only relevant for consumer. */ + rd_ts_t rk_ts_last_poll_start; /**< Timestamp of last application + * consumer_poll() call start + * Only relevant for consumer. 
+ * Not an atomic as Kafka consumer + * isn't thread safe. */ + rd_ts_t rk_ts_last_poll_end; /**< Timestamp of last application + * consumer_poll() call end + * Only relevant for consumer. + * Not an atomic as Kafka consumer + * isn't thread safe. */ /* First fatal error. */ struct { rd_atomic32_t err; /**< rd_kafka_resp_err_t */ @@ -695,35 +705,30 @@ struct rd_kafka_s { int *matched_metrics; size_t matched_metrics_cnt; - struct { rd_ts_t ts_last; /**< Timestamp of last push */ rd_ts_t ts_start; /**< Timestamp from when collection * started */ - uint64_t - rebalance_latency_total_ms; /**< Total rebalance - time in ms */ + /** Total rebalance latency (ms) up to previous push */ + uint64_t rebalance_latency_total; } rk_historic_c; struct { - rd_avg_t rd_avg_poll_idle_ratio; /**< Current Poll Idle - Ratio avg */ + rd_avg_t rk_avg_poll_idle_ratio; + rd_avg_t rk_avg_commit_latency; /**< Current commit + * latency avg */ rd_avg_t - rd_avg_rebalance_latency; /**< Current rebalance - latency avg */ - rd_avg_t rd_avg_commit_latency; /**< Current commit - latency avg */ + rk_avg_rebalance_latency; /**< Current rebalance + * latency avg */ } rd_avg_current; struct { - rd_avg_t rd_avg_poll_idle_ratio; /**< Rollover Poll Idle - Ratio avg */ + rd_avg_t rk_avg_poll_idle_ratio; + rd_avg_t rk_avg_commit_latency; /**< Rolled over commit + * latency avg */ rd_avg_t - rd_avg_rebalance_latency; /**< Rollover rebalance - latency avg */ - - rd_avg_t rd_avg_commit_latency; /**< Rollover commit - latency avg */ + rk_avg_rebalance_latency; /**< Rolled over rebalance + * latency avg */ } rd_avg_rollover; } rk_telemetry; @@ -1135,8 +1140,8 @@ static RD_INLINE RD_UNUSED int rd_kafka_max_poll_exceeded(rd_kafka_t *rk) { } /** - * @brief Call on entry to polling function to indicate - * if the application is blocked waiting for librdkafka + * @brief Call on entry to blocking polling function to indicate + * that the application is blocked waiting for librdkafka * and that max.poll.interval.ms should not be enforced. 
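 * (Editorial worked example, not part of this patch: the poll idle ratio
 * sample recorded on entry here is idle_interval / poll_interval scaled
 * by 1e6, where idle_interval is how long the previous poll call blocked
 * inside librdkafka and poll_interval is the time between the previous
 * and the current poll start. E.g. a poll that blocked for 20 ms,
 * followed by a new poll started 100 ms after the previous one started,
 * yields a sample of 0.2 * 1e6; the telemetry encoder divides by 1e6
 * again when reporting consumer.poll.idle.ratio.avg.)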
* * Call app_polled() Upon return from the function calling @@ -1147,28 +1152,27 @@ static RD_INLINE RD_UNUSED int rd_kafka_max_poll_exceeded(rd_kafka_t *rk) { * @locality any * @locks none */ -static RD_INLINE RD_UNUSED void rd_kafka_app_poll_start(rd_kafka_t *rk, - rd_bool_t is_blocking) { +static RD_INLINE RD_UNUSED void +rd_kafka_app_poll_start(rd_kafka_t *rk, rd_ts_t now, rd_bool_t is_blocking) { if (rk->rk_type != RD_KAFKA_CONSUMER) return; + + if (!now) + now = rd_clock(); if (is_blocking) rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX); - - /* To avoid re-entrancy condition */ - rd_ts_t now = rd_clock(); - if ((rk->rk_ts_last_poll_start != 0) && - (rk->rk_ts_last_poll_end != 0)) { - rd_ts_t poll_interval = now - rk->rk_ts_last_poll_start; - rd_ts_t idle_interval = - rk->rk_ts_last_poll_end - rk->rk_ts_last_poll_start; + if (rk->rk_ts_last_poll_end) { int64_t poll_idle_ratio = 0; - if (poll_interval > 0) { - poll_idle_ratio = (idle_interval * 1e6) / poll_interval; + rd_ts_t poll_interval = now - rk->rk_ts_last_poll_start; + if (poll_interval) { + rd_ts_t idle_interval = + rk->rk_ts_last_poll_end - rk->rk_ts_last_poll_start; + poll_idle_ratio = + idle_interval * 1000000 / poll_interval; } rd_avg_add( - &rk->rk_telemetry.rd_avg_current.rd_avg_poll_idle_ratio, + &rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio, poll_idle_ratio); - } else { rk->rk_ts_last_poll_start = now; rk->rk_ts_last_poll_end = 0; } @@ -1184,7 +1188,8 @@ static RD_INLINE RD_UNUSED void rd_kafka_app_poll_start(rd_kafka_t *rk, */ static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk) { if (rk->rk_type == RD_KAFKA_CONSUMER) { - rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock()); + rd_ts_t now = rd_clock(); + rd_atomic64_set(&rk->rk_ts_last_poll, now); if (unlikely(rk->rk_cgrp && rk->rk_cgrp->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER && @@ -1194,6 +1199,10 @@ static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk) { rk->rk_cgrp, "app polled after poll interval exceeded"); } + if (!rk->rk_ts_last_poll_end) + rk->rk_ts_last_poll_end = now; + rd_dassert(rk->rk_ts_last_poll_end >= + rk->rk_ts_last_poll_start); } } diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c index 48e1b0394..b28000051 100644 --- a/src/rdkafka_mock.c +++ b/src/rdkafka_mock.c @@ -2534,6 +2534,14 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster, return RD_KAFKA_RESP_ERR_NO_ERROR; } +void rd_kafka_mock_group_initial_rebalance_delay_ms( + rd_kafka_mock_cluster_t *mcluster, + int32_t delay_ms) { + mtx_lock(&mcluster->lock); + mcluster->defaults.group_initial_rebalance_delay_ms = delay_ms; + mtx_unlock(&mcluster->lock); +} + static rd_kafka_op_res_t rd_kafka_mock_cluster_op_serve(rd_kafka_t *rk, @@ -2694,7 +2702,8 @@ rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new(rd_kafka_t *rk, TAILQ_INIT(&mcluster->topics); mcluster->defaults.partition_cnt = 4; mcluster->defaults.replication_factor = RD_MIN(3, broker_cnt); - mcluster->track_requests = rd_false; + mcluster->defaults.group_initial_rebalance_delay_ms = 3000; + mcluster->track_requests = rd_false; TAILQ_INIT(&mcluster->cgrps); diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h index e13d7d5e9..38de9b158 100644 --- a/src/rdkafka_mock.h +++ b/src/rdkafka_mock.h @@ -168,6 +168,15 @@ rd_kafka_mock_push_request_errors_array(rd_kafka_mock_cluster_t *mcluster, const rd_kafka_resp_err_t *errors); +/** + * @brief Apply broker configuration group.initial.rebalance.delay.ms + * to the whole \p mcluster. 
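 *
 * Illustrative usage (editor's sketch, not part of this patch): a
 * mock-based test can disable the delay so the first rebalance
 * completes immediately, e.g.
 *   rd_kafka_mock_group_initial_rebalance_delay_ms(mcluster, 0);
 * as the telemetry tests below do right after creating the mock cluster.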
+ */ +RD_EXPORT void rd_kafka_mock_group_initial_rebalance_delay_ms( + rd_kafka_mock_cluster_t *mcluster, + int32_t delay_ms); + + /** * @brief Push \p cnt errors and RTT tuples in the \p ... va-arg list onto * the broker's error stack for the given \p ApiKey. diff --git a/src/rdkafka_mock_cgrp.c b/src/rdkafka_mock_cgrp.c index 60b3aa156..cce43b726 100644 --- a/src/rdkafka_mock_cgrp.c +++ b/src/rdkafka_mock_cgrp.c @@ -400,9 +400,11 @@ static void rd_kafka_mock_cgrp_rebalance(rd_kafka_mock_cgrp_t *mcgrp, if (mcgrp->state == RD_KAFKA_MOCK_CGRP_STATE_JOINING) return; /* Do nothing, group is already rebalancing. */ else if (mcgrp->state == RD_KAFKA_MOCK_CGRP_STATE_EMPTY) - timeout_ms = 3000; /* First join, low timeout. - * Same as group.initial.rebalance.delay.ms - * on the broker. */ + /* First join, low timeout. + * Same as group.initial.rebalance.delay.ms + * on the broker. */ + timeout_ms = + mcgrp->cluster->defaults.group_initial_rebalance_delay_ms; else if (mcgrp->state == RD_KAFKA_MOCK_CGRP_STATE_REBALANCING && mcgrp->member_cnt == mcgrp->last_member_cnt) timeout_ms = 100; /* All members rejoined, quickly transition diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index b1560f421..4ea6df2a5 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -394,6 +394,8 @@ struct rd_kafka_mock_cluster_s { struct { int partition_cnt; /**< Auto topic create part cnt */ int replication_factor; /**< Auto topic create repl factor */ + /** Group initial rebalance delay */ + int32_t group_initial_rebalance_delay_ms; } defaults; /**< Dynamic array of IO handlers for corresponding fd in .fds */ diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c index 52af378b9..90b3c8cef 100644 --- a/src/rdkafka_queue.c +++ b/src/rdkafka_queue.c @@ -406,8 +406,8 @@ rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq, rd_timeout_init_timespec_us(&timeout_tspec, timeout_us); - if (timeout_us && can_q_contain_fetched_msgs) - rd_kafka_app_poll_start(rkq->rkq_rk, rd_true); + if (can_q_contain_fetched_msgs) + rd_kafka_app_poll_start(rkq->rkq_rk, 0, timeout_us); while (1) { rd_kafka_op_res_t res; @@ -542,8 +542,8 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq, rd_timeout_init_timespec(&timeout_tspec, timeout_ms); - if (timeout_ms && can_q_contain_fetched_msgs) - rd_kafka_app_poll_start(rk, rd_true); + if (can_q_contain_fetched_msgs) + rd_kafka_app_poll_start(rk, 0, timeout_ms); /* Wait for op */ while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) && @@ -681,11 +681,10 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq, mtx_unlock(&rkq->rkq_lock); - if (timeout_ms) - rd_kafka_app_poll_start(rk, rd_true); - rd_timeout_init_timespec(&timeout_tspec, timeout_ms); + rd_kafka_app_poll_start(rk, 0, timeout_ms); + rd_kafka_yield_thread = 0; while (cnt < rkmessages_size) { rd_kafka_op_res_t res; diff --git a/src/rdkafka_telemetry_decode.c b/src/rdkafka_telemetry_decode.c index 80b6dcb63..e67f645d2 100644 --- a/src/rdkafka_telemetry_decode.c +++ b/src/rdkafka_telemetry_decode.c @@ -45,6 +45,7 @@ struct metric_unit_test_data { char metric_description[_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE]; int64_t metric_value_int; double metric_value_double; + int64_t int64_value; uint64_t metric_time; }; @@ -57,6 +58,7 @@ static void clear_unit_test_data(void) { unit_test_data.metric_description[0] = '\0'; unit_test_data.metric_value_int = 0; unit_test_data.metric_time = 0; + unit_test_data.int64_value = 0; } static bool @@ -426,6 +428,11 @@ static void unit_test_telemetry_decoded_NumberDataPoint( unit_test_data.current_field++; } +static 
void unit_test_telemetry_decoded_int64(void *opaque, + int64_t int64_value) { + unit_test_data.int64_value = int64_value; +} + static void unit_test_telemetry_decoded_type(void *opaque, rd_kafka_telemetry_metric_type_t type) { @@ -444,30 +451,44 @@ unit_test_telemetry_decode_error(void *opaque, const char *error, ...) { rd_assert(!*"Failure while decoding telemetry data"); } -bool unit_test_telemetry(rd_kafka_type_t rk_type, - int metric_name, - const char *expected_name, - const char *expected_description, - rd_kafka_telemetry_metric_type_t expected_type, - rd_bool_t is_double) { +int unit_test_telemetry(rd_kafka_type_t rk_type, + rd_kafka_telemetry_producer_metric_name_t metric_name, + const char *expected_name, + const char *expected_description, + rd_kafka_telemetry_metric_type_t expected_type, + rd_bool_t is_double, + rd_bool_t is_per_broker, + void (*set_metric_value)(rd_kafka_t *, + rd_kafka_broker_t *)) { rd_kafka_t *rk = rd_calloc(1, sizeof(*rk)); rwlock_init(&rk->rk_lock); - rk->rk_type = rk_type; + rk->rk_type = rk_type; + rk->rk_cgrp = rd_calloc(1, sizeof(*rk->rk_cgrp)); + rk->rk_broker_cnt.val = 1; rk->rk_telemetry.matched_metrics_cnt = 1; - if (rk_type == RD_KAFKA_PRODUCER) - rk->rk_telemetry.matched_metrics = rd_malloc( - sizeof(rd_kafka_telemetry_producer_metric_name_t) * - rk->rk_telemetry.matched_metrics_cnt); - else if (rk_type == RD_KAFKA_CONSUMER) - rk->rk_telemetry.matched_metrics = rd_malloc( - sizeof(rd_kafka_telemetry_consumer_metric_name_t) * - rk->rk_telemetry.matched_metrics_cnt); + rk->rk_telemetry.matched_metrics = + rd_malloc(sizeof(rd_kafka_telemetry_producer_metric_name_t) * + rk->rk_telemetry.matched_metrics_cnt); rk->rk_telemetry.matched_metrics[0] = metric_name; rk->rk_telemetry.rk_historic_c.ts_start = (rd_uclock() - 1000 * 1000) * 1000; rk->rk_telemetry.rk_historic_c.ts_last = (rd_uclock() - 1000 * 1000) * 1000; + rd_avg_init(&rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio, + RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); + rd_avg_init(&rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency, + RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); + rd_avg_init(&rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency, + RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); + + rd_avg_init(&rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio, + RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); + rd_avg_init(&rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency, + RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); + rd_avg_init(&rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency, + RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); + rd_strlcpy(rk->rk_name, "unittest", sizeof(rk->rk_name)); clear_unit_test_data(); @@ -475,22 +496,24 @@ bool unit_test_telemetry(rd_kafka_type_t rk_type, .decoded_string = unit_test_telemetry_decoded_string, .decoded_NumberDataPoint = unit_test_telemetry_decoded_NumberDataPoint, - .decoded_type = unit_test_telemetry_decoded_type, - .decode_error = unit_test_telemetry_decode_error, - .opaque = &unit_test_data, + .decoded_int64 = unit_test_telemetry_decoded_int64, + .decoded_type = unit_test_telemetry_decoded_type, + .decode_error = unit_test_telemetry_decode_error, + .opaque = &unit_test_data, }; TAILQ_INIT(&rk->rk_brokers); - rd_kafka_broker_t *rkb = rd_calloc(1, sizeof(*rkb)); - rkb->rkb_c.connects.val = 1; + rd_kafka_broker_t *rkb = rd_calloc(1, sizeof(*rkb)); + rkb->rkb_nodeid = 1001; + rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_rtt, RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency, 
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_throttle, RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rd_avg_fetch_latency, + rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency, RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); rd_avg_init(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt, @@ -499,43 +522,12 @@ bool unit_test_telemetry(rd_kafka_type_t rk_type, RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); rd_avg_init(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_throttle, RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - rd_avg_init(&rkb->rkb_telemetry.rd_avg_rollover.rd_avg_fetch_latency, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - if (rk_type == RD_KAFKA_CONSUMER) { - /* Initialize the newly added metrics to output one */ - rd_avg_init( - &rk->rk_telemetry.rd_avg_current.rd_avg_rebalance_latency, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - rd_avg_init( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_rebalance_latency, + rd_avg_init(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_fetch_latency, RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - rd_avg_init( - &rk->rk_telemetry.rd_avg_current.rd_avg_commit_latency, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - rd_avg_init( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_commit_latency, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - rd_avg_init( - &rk->rk_telemetry.rd_avg_current.rd_avg_poll_idle_ratio, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - rd_avg_init( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_poll_idle_ratio, - RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true); - /* Rebalance Latency Metrics */ - rd_avg_add( - &rk->rk_telemetry.rd_avg_current.rd_avg_rebalance_latency, - 1000); - /* Commit Latency Metrics */ - rd_avg_add( - &rk->rk_telemetry.rd_avg_current.rd_avg_commit_latency, - 1000); - /* Poll Idle Ratio Metric */ - rd_avg_add( - &rk->rk_telemetry.rd_avg_current.rd_avg_poll_idle_ratio, - 1e6); - } + set_metric_value(rk, rkb); + TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link); rd_buf_t *rbuf = rd_kafka_telemetry_encode_metrics(rk); void *metrics_payload = rbuf->rbuf_wpos->seg_p; @@ -546,12 +538,11 @@ bool unit_test_telemetry(rd_kafka_type_t rk_type, bool decode_status = rd_kafka_telemetry_decode_metrics( &decode_interface, metrics_payload, metrics_payload_size); + RD_UT_ASSERT(decode_status == 1, "Decoding failed"); RD_UT_ASSERT(unit_test_data.type == expected_type, "Metric type mismatch"); - RD_UT_ASSERT(strcmp(unit_test_data.metric_name + - strlen(RD_KAFKA_TELEMETRY_METRIC_PREFIX), - expected_name) == 0, + RD_UT_ASSERT(strcmp(unit_test_data.metric_name, expected_name) == 0, "Metric name mismatch"); RD_UT_ASSERT(strcmp(unit_test_data.metric_description, expected_description) == 0, @@ -563,80 +554,300 @@ bool unit_test_telemetry(rd_kafka_type_t rk_type, else RD_UT_ASSERT(unit_test_data.metric_value_int == 1, "Metric value mismatch"); + if (is_per_broker) + RD_UT_ASSERT(unit_test_data.int64_value == 1001, + "Expected broker mismatch"); RD_UT_ASSERT(unit_test_data.metric_time != 0, "Metric time mismatch"); rd_free(rk->rk_telemetry.matched_metrics); rd_buf_destroy_free(rbuf); - + rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_rtt); + rd_avg_destroy( + &rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency); + rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_throttle); rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt); rd_avg_destroy( &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_outbuf_latency); 
rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_throttle); + + rd_avg_destroy( + &rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency); + rd_avg_destroy( + &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_fetch_latency); + + rd_avg_destroy(&rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio); rd_avg_destroy( - &rkb->rkb_telemetry.rd_avg_rollover.rd_avg_fetch_latency); + &rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio); rd_avg_destroy( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_rebalance_latency); - rd_avg_destroy(&rk->rk_telemetry.rd_avg_rollover.rd_avg_commit_latency); + &rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency); rd_avg_destroy( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_poll_idle_ratio); + &rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency); + + rd_avg_destroy(&rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency); + rd_avg_destroy(&rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency); rd_free(rkb); rwlock_destroy(&rk->rk_lock); + rd_free(rk->rk_cgrp); rd_free(rk); RD_UT_PASS(); + return 0; +} + +void unit_test_telemetry_set_connects(rd_kafka_t *rk, rd_kafka_broker_t *rkb) { + rkb->rkb_c.connects.val = 1; +} + +void unit_test_telemetry_set_rtt(rd_kafka_t *rk, rd_kafka_broker_t *rkb) { + rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_rtt, 1000); + rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_rtt, 1000); +} + +void unit_test_telemetry_set_throttle_time(rd_kafka_t *rk, + rd_kafka_broker_t *rkb) { + rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_throttle, 1); + rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_throttle, 1); + rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_throttle, 1); } -bool unit_test_telemetry_gauge(void) { - return unit_test_telemetry( +void unit_test_telemetry_set_queue_time(rd_kafka_t *rk, + rd_kafka_broker_t *rkb) { + rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency, + 1000); + rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency, + 1000); + rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency, + 1000); + rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency, + 1000); +} + +void unit_test_telemetry_set_coordinator_assigned_partitions( + rd_kafka_t *rk, + rd_kafka_broker_t *rkb) { + rk->rk_cgrp->rkcg_c.assignment_size = 1; +} + +void unit_test_telemetry_set_rebalance_latency(rd_kafka_t *rk, + rd_kafka_broker_t *rkb) { + rd_avg_add(&rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency, + 1000); +} + +void unit_test_telemetry_set_fetch_latency(rd_kafka_t *rk, + rd_kafka_broker_t *rkb) { + rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency, + 1000); + rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency, + 1000); +} + +void unit_test_telemetry_set_poll_idle_ratio(rd_kafka_t *rk, + rd_kafka_broker_t *rkb) { + rd_avg_add(&rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio, + 1000000); + rd_avg_add(&rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio, + 1000000); + rd_avg_add(&rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio, + 1000000); +} + +void unit_test_telemetry_set_commit_latency(rd_kafka_t *rk, + rd_kafka_broker_t *rkb) { + rd_avg_add(&rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency, + 1000); + rd_avg_add(&rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency, + 1000); +} + +int unit_test_telemetry_gauge(void) { + int fails = 0; + /* Producer metrics */ + fails += unit_test_telemetry( RD_KAFKA_PRODUCER, RD_KAFKA_TELEMETRY_METRIC_PRODUCER_CONNECTION_CREATION_RATE, 
+ RD_KAFKA_TELEMETRY_METRIC_PREFIX "producer.connection.creation.rate", "The rate of connections established per second.", - RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_true); + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_true, rd_false, + unit_test_telemetry_set_connects); + fails += unit_test_telemetry( + RD_KAFKA_PRODUCER, + RD_KAFKA_TELEMETRY_METRIC_PRODUCER_NODE_REQUEST_LATENCY_AVG, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "producer.node.request.latency.avg", + "The average request latency in ms for a node.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_true, rd_true, + unit_test_telemetry_set_rtt); + fails += unit_test_telemetry( + RD_KAFKA_PRODUCER, + RD_KAFKA_TELEMETRY_METRIC_PRODUCER_NODE_REQUEST_LATENCY_MAX, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "producer.node.request.latency.max", + "The maximum request latency in ms for a node.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_false, rd_true, + unit_test_telemetry_set_rtt); + fails += unit_test_telemetry( + RD_KAFKA_PRODUCER, + RD_KAFKA_TELEMETRY_METRIC_PRODUCER_PRODUCE_THROTTLE_TIME_AVG, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "producer.produce.throttle.time.avg", + "The average throttle time in ms for a node.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_true, rd_false, + unit_test_telemetry_set_throttle_time); + fails += unit_test_telemetry( + RD_KAFKA_PRODUCER, + RD_KAFKA_TELEMETRY_METRIC_PRODUCER_PRODUCE_THROTTLE_TIME_MAX, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "producer.produce.throttle.time.max", + "The maximum throttle time in ms for a node.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_false, rd_false, + unit_test_telemetry_set_throttle_time); + fails += unit_test_telemetry( + RD_KAFKA_PRODUCER, + RD_KAFKA_TELEMETRY_METRIC_PRODUCER_RECORD_QUEUE_TIME_AVG, + RD_KAFKA_TELEMETRY_METRIC_PREFIX "producer.record.queue.time.avg", + "The average time in ms a record spends in the producer queue.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_true, rd_false, + unit_test_telemetry_set_queue_time); + fails += unit_test_telemetry( + RD_KAFKA_PRODUCER, + RD_KAFKA_TELEMETRY_METRIC_PRODUCER_RECORD_QUEUE_TIME_MAX, + RD_KAFKA_TELEMETRY_METRIC_PREFIX "producer.record.queue.time.max", + "The maximum time in ms a record spends in the producer queue.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_false, rd_false, + unit_test_telemetry_set_queue_time); + + /* Consumer metrics */ + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_CONNECTION_CREATION_RATE, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.connection.creation.rate", + "The rate of connections established per second.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_true, rd_false, + unit_test_telemetry_set_connects); + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_NODE_REQUEST_LATENCY_AVG, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.node.request.latency.avg", + "The average request latency in ms for a node.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_true, rd_true, + unit_test_telemetry_set_rtt); + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_NODE_REQUEST_LATENCY_MAX, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.node.request.latency.max", + "The maximum request latency in ms for a node.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_false, rd_true, + unit_test_telemetry_set_rtt); + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_ASSIGNED_PARTITIONS, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.coordinator.assigned.partitions", + "The 
number of partitions currently assigned to this consumer.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_false, rd_false, + unit_test_telemetry_set_coordinator_assigned_partitions); + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_REBALANCE_LATENCY_AVG, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.coordinator.rebalance.latency.avg", + "The average rebalance latency in ms for the " + "consumer coordinator.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_true, rd_false, + unit_test_telemetry_set_rebalance_latency); + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_REBALANCE_LATENCY_MAX, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.coordinator.rebalance.latency.max", + "The maximum rebalance latency in ms for the " + "consumer coordinator.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_false, rd_false, + unit_test_telemetry_set_rebalance_latency); + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_FETCH_MANAGER_FETCH_LATENCY_AVG, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.fetch.manager.fetch.latency.avg", + "The average fetch latency in ms for the fetch manager.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_true, rd_false, + unit_test_telemetry_set_fetch_latency); + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_FETCH_MANAGER_FETCH_LATENCY_MAX, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.fetch.manager.fetch.latency.max", + "The maximum fetch latency in ms for the fetch manager.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_false, rd_false, + unit_test_telemetry_set_fetch_latency); + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_POLL_IDLE_RATIO_AVG, + RD_KAFKA_TELEMETRY_METRIC_PREFIX "consumer.poll.idle.ratio.avg", + "The average ratio of idle to poll for a consumer.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_true, rd_false, + unit_test_telemetry_set_poll_idle_ratio); + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_AVG, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.coordinator.commit.latency.avg", + "The average commit latency in ms for the consumer coordinator.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_true, rd_false, + unit_test_telemetry_set_commit_latency); + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_MAX, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.coordinator.commit.latency.max", + "The maximum commit latency in ms for the consumer coordinator.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, rd_false, rd_false, + unit_test_telemetry_set_commit_latency); + return fails; } -bool unit_test_telemetry_sum(void) { - return unit_test_telemetry( +int unit_test_telemetry_sum(void) { + int fails = 0; + /* Producer metrics */ + fails += unit_test_telemetry( RD_KAFKA_PRODUCER, RD_KAFKA_TELEMETRY_METRIC_PRODUCER_CONNECTION_CREATION_TOTAL, + RD_KAFKA_TELEMETRY_METRIC_PREFIX "producer.connection.creation.total", "The total number of connections established.", - RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM, rd_false); -} - -int unit_test_telemetry_consumer(void) { - int fails = 0; - int consumer_metrics[] = { - RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_REBALANCE_LATENCY_AVG, - RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_REBALANCE_LATENCY_MAX, + RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM, rd_false, rd_false, + unit_test_telemetry_set_connects); + + 
/* Consumer metrics */ + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, + RD_KAFKA_TELEMETRY_METRIC_CONSUMER_CONNECTION_CREATION_TOTAL, + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.connection.creation.total", + "The total number of connections established.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM, rd_false, rd_false, + unit_test_telemetry_set_connects); + fails += unit_test_telemetry( + RD_KAFKA_CONSUMER, RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_REBALANCE_LATENCY_TOTAL, - RD_KAFKA_TELEMETRY_METRIC_CONSUMER_POLL_IDLE_RATIO_AVG, - RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_AVG, - RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_MAX}; - for (int i = 0; i < 6; i++) { - fails += unit_test_telemetry( - RD_KAFKA_CONSUMER, consumer_metrics[i], - RD_KAFKA_TELEMETRY_CONSUMER_METRICS_INFO - [consumer_metrics[i]] - .name, - RD_KAFKA_TELEMETRY_CONSUMER_METRICS_INFO - [consumer_metrics[i]] - .description, - RD_KAFKA_TELEMETRY_CONSUMER_METRICS_INFO - [consumer_metrics[i]] - .type, - !RD_KAFKA_TELEMETRY_CONSUMER_METRICS_INFO - [consumer_metrics[i]] - .is_int); - } + RD_KAFKA_TELEMETRY_METRIC_PREFIX + "consumer.coordinator.rebalance.latency.total", + "The total rebalance latency in ms for the " + "consumer coordinator.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM, rd_false, rd_false, + unit_test_telemetry_set_rebalance_latency); return fails; } int unittest_telemetry_decode(void) { int fails = 0; - fails += unit_test_telemetry_sum(); fails += unit_test_telemetry_gauge(); - fails += unit_test_telemetry_consumer(); + fails += unit_test_telemetry_sum(); return fails; } diff --git a/src/rdkafka_telemetry_encode.c b/src/rdkafka_telemetry_encode.c index 81a41a96d..f430e7514 100644 --- a/src/rdkafka_telemetry_encode.c +++ b/src/rdkafka_telemetry_encode.c @@ -31,7 +31,6 @@ #include "opentelemetry/metrics.pb.h" #define THREE_ORDERS_MAGNITUDE 1000 -#define SIX_ORDERS_MAGNITUDE 1000000 typedef struct { opentelemetry_proto_metrics_v1_Metric **metrics; @@ -217,12 +216,9 @@ calculate_consumer_rebalance_latency_avg(rd_kafka_t *rk, rd_kafka_broker_t *rkb_selected, rd_ts_t now_ns) { rd_kafka_telemetry_metric_value_t avg_rebalance_time; - double avg = 0; - - avg = - rk->rk_telemetry.rd_avg_rollover.rd_avg_rebalance_latency.ra_v.avg; - - avg_rebalance_time.double_value = avg / THREE_ORDERS_MAGNITUDE; + avg_rebalance_time.double_value = + rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency.ra_v.avg / + (double)THREE_ORDERS_MAGNITUDE; return avg_rebalance_time; } @@ -231,13 +227,9 @@ calculate_consumer_rebalance_latency_max(rd_kafka_t *rk, rd_kafka_broker_t *rkb_selected, rd_ts_t now_ns) { rd_kafka_telemetry_metric_value_t max_rebalance_time; - max_rebalance_time.int_value = 0; - - max_rebalance_time.int_value = - rk->rk_telemetry.rd_avg_rollover.rd_avg_rebalance_latency.ra_v.maxv; - max_rebalance_time.int_value = RD_CEIL_INTEGER_DIVISION( - max_rebalance_time.int_value, THREE_ORDERS_MAGNITUDE); + rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency.ra_v.maxv, + THREE_ORDERS_MAGNITUDE); return max_rebalance_time; } @@ -246,17 +238,13 @@ calculate_consumer_rebalance_latency_total(rd_kafka_t *rk, rd_kafka_broker_t *rkb_selected, rd_ts_t now_ns) { rd_kafka_telemetry_metric_value_t total_rebalance_time; - total_rebalance_time.int_value = 0; - total_rebalance_time.int_value = - rk->rk_telemetry.rd_avg_rollover.rd_avg_rebalance_latency.ra_v.sum; - total_rebalance_time.int_value = RD_CEIL_INTEGER_DIVISION( - total_rebalance_time.int_value, THREE_ORDERS_MAGNITUDE); - - if 
(!rk->rk_telemetry.delta_temporality) + rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency.ra_v.sum, + THREE_ORDERS_MAGNITUDE); + if (!rk->rk_telemetry.delta_temporality) { total_rebalance_time.int_value += - (rk->rk_telemetry.rk_historic_c.rebalance_latency_total_ms); - + rk->rk_telemetry.rk_historic_c.rebalance_latency_total; + } return total_rebalance_time; } @@ -265,9 +253,23 @@ calculate_consumer_fetch_latency_avg(rd_kafka_t *rk, rd_kafka_broker_t *rkb_selected, rd_ts_t now_ns) { rd_kafka_telemetry_metric_value_t avg_fetch_time; + rd_kafka_broker_t *rkb; double avg = 0; - avg = rkb_selected->rkb_telemetry.rd_avg_rollover.rd_avg_fetch_latency - .ra_v.avg; + int count = 0; + + TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { + rd_avg_t *rkb_avg_fetch_latency_rollover = + &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_fetch_latency; + if (rkb_avg_fetch_latency_rollover->ra_v.cnt) { + avg = + (avg * count + + rkb_avg_fetch_latency_rollover->ra_v.sum) / + (double)(count + + rkb_avg_fetch_latency_rollover->ra_v.cnt); + count += rkb_avg_fetch_latency_rollover->ra_v.cnt; + } + } + avg_fetch_time.double_value = avg / THREE_ORDERS_MAGNITUDE; return avg_fetch_time; } @@ -277,8 +279,15 @@ calculate_consumer_fetch_latency_max(rd_kafka_t *rk, rd_kafka_broker_t *rkb_selected, rd_ts_t now_ns) { rd_kafka_telemetry_metric_value_t max_fetch_time; - max_fetch_time.int_value = rkb_selected->rkb_telemetry.rd_avg_rollover - .rd_avg_fetch_latency.ra_v.maxv; + rd_kafka_broker_t *rkb; + + max_fetch_time.int_value = 0; + TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { + max_fetch_time.int_value = + RD_MAX(max_fetch_time.int_value, + rkb->rkb_telemetry.rd_avg_rollover + .rkb_avg_fetch_latency.ra_v.maxv); + } max_fetch_time.int_value = RD_CEIL_INTEGER_DIVISION( max_fetch_time.int_value, THREE_ORDERS_MAGNITUDE); return max_fetch_time; @@ -289,11 +298,9 @@ calculate_consumer_poll_idle_ratio_avg(rd_kafka_t *rk, rd_kafka_broker_t *rkb_selected, rd_ts_t now_ns) { rd_kafka_telemetry_metric_value_t avg_poll_idle_avg; - double avg = 0; - avg = rk->rk_telemetry.rd_avg_rollover.rd_avg_poll_idle_ratio.ra_v.avg; - - avg_poll_idle_avg.double_value = avg / (double)SIX_ORDERS_MAGNITUDE; - + avg_poll_idle_avg.double_value = + rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio.ra_v.avg / + 1e6; return avg_poll_idle_avg; } @@ -302,12 +309,9 @@ calculate_consumer_commit_latency_avg(rd_kafka_t *rk, rd_kafka_broker_t *rkb_selected, rd_ts_t now_ns) { rd_kafka_telemetry_metric_value_t avg_commit_time; - double avg = 0; - - avg = rk->rk_telemetry.rd_avg_rollover.rd_avg_commit_latency.ra_v.avg; - - avg_commit_time.double_value = avg / THREE_ORDERS_MAGNITUDE; - + avg_commit_time.double_value = + rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency.ra_v.avg / + (float)THREE_ORDERS_MAGNITUDE; return avg_commit_time; } @@ -316,27 +320,21 @@ calculate_consumer_commit_latency_max(rd_kafka_t *rk, rd_kafka_broker_t *rkb_selected, rd_ts_t now_ns) { rd_kafka_telemetry_metric_value_t max_commit_time; - max_commit_time.int_value = 0; - - max_commit_time.int_value = - rk->rk_telemetry.rd_avg_rollover.rd_avg_commit_latency.ra_v.maxv; - max_commit_time.int_value = RD_CEIL_INTEGER_DIVISION( - max_commit_time.int_value, THREE_ORDERS_MAGNITUDE); - + rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency.ra_v.maxv, + THREE_ORDERS_MAGNITUDE); return max_commit_time; } static void reset_historical_metrics(rd_kafka_t *rk, rd_ts_t now_ns) { rd_kafka_broker_t *rkb; + rk->rk_telemetry.rk_historic_c.ts_last = now_ns; - if (rk->rk_type == 
RD_KAFKA_CONSUMER) { - rk->rk_telemetry.rk_historic_c.rebalance_latency_total_ms += - RD_CEIL_INTEGER_DIVISION( - rk->rk_telemetry.rd_avg_rollover - .rd_avg_rebalance_latency.ra_v.sum, - THREE_ORDERS_MAGNITUDE); - } + rk->rk_telemetry.rk_historic_c.rebalance_latency_total += + RD_CEIL_INTEGER_DIVISION(rk->rk_telemetry.rd_avg_rollover + .rk_avg_rebalance_latency.ra_v.sum, + THREE_ORDERS_MAGNITUDE); + TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { rkb->rkb_telemetry.rkb_historic_c.connects = rd_atomic32_get(&rkb->rkb_c.connects); @@ -771,8 +769,7 @@ rd_buf_t *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk) { } } - rk->rk_ts_last_poll_start = 0; - rk->rk_ts_last_poll_end = 0; + rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Serializing metrics"); TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt); rd_avg_rollover(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt, @@ -789,35 +786,35 @@ rd_buf_t *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk) { &rkb->rkb_telemetry.rd_avg_current.rkb_avg_throttle); if (rk->rk_type == RD_KAFKA_CONSUMER) { rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_rollover - .rd_avg_fetch_latency); + .rkb_avg_fetch_latency); rd_avg_rollover(&rkb->rkb_telemetry.rd_avg_rollover - .rd_avg_fetch_latency, + .rkb_avg_fetch_latency, &rkb->rkb_telemetry.rd_avg_current - .rd_avg_fetch_latency); + .rkb_avg_fetch_latency); } } if (rk->rk_type == RD_KAFKA_CONSUMER) { rd_avg_destroy( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_poll_idle_ratio); + &rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio); rd_avg_rollover( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_poll_idle_ratio, - &rk->rk_telemetry.rd_avg_current.rd_avg_poll_idle_ratio); + &rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio, + &rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio); rd_avg_destroy( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_rebalance_latency); + &rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency); rd_avg_rollover( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_rebalance_latency, - &rk->rk_telemetry.rd_avg_current.rd_avg_rebalance_latency); + &rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency, + &rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency); + rd_avg_destroy( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_commit_latency); + &rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency); rd_avg_rollover( - &rk->rk_telemetry.rd_avg_rollover.rd_avg_commit_latency, - &rk->rk_telemetry.rd_avg_current.rd_avg_commit_latency); + &rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency, + &rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency); } - rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Serializing metrics"); int resource_attributes_count = resource_attributes(rk, &resource_attributes_struct); rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Resource attributes count: %d", diff --git a/src/rdkafka_telemetry_encode.h b/src/rdkafka_telemetry_encode.h index 7252588f9..b4a0656ac 100644 --- a/src/rdkafka_telemetry_encode.h +++ b/src/rdkafka_telemetry_encode.h @@ -243,7 +243,7 @@ static const rd_kafka_telemetry_metric_info_t RD_KAFKA_TELEMETRY_CONSUMER_METRIC "The average fetch latency in ms for the fetch manager.", .unit = "ms", .is_int = rd_false, - .is_per_broker = rd_true, + .is_per_broker = rd_false, .type = RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE}, [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_FETCH_MANAGER_FETCH_LATENCY_MAX] = {.name = "consumer.fetch.manager.fetch.latency.max", @@ -251,7 +251,7 @@ static const rd_kafka_telemetry_metric_info_t 
RD_KAFKA_TELEMETRY_CONSUMER_METRIC "The maximum fetch latency in ms for the fetch manager.", .unit = "ms", .is_int = rd_true, - .is_per_broker = rd_true, + .is_per_broker = rd_false, .type = RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE}, [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_POLL_IDLE_RATIO_AVG] = {.name = "consumer.poll.idle.ratio.avg", diff --git a/src/rdtime.h b/src/rdtime.h index 4a7e76d75..a84b61526 100644 --- a/src/rdtime.h +++ b/src/rdtime.h @@ -164,9 +164,9 @@ static RD_INLINE int rd_timeout_ms(rd_ts_t timeout_us) { return (int)((timeout_us + 999) / 1000); } - /** * @brief Initialize an absolute timeout based on the provided \p timeout_ms + * and given clock \p now * * To be used with rd_timeout_adjust(). * @@ -175,11 +175,27 @@ static RD_INLINE int rd_timeout_ms(rd_ts_t timeout_us) { * @returns the absolute timeout which should later be passed * to rd_timeout_adjust(). */ -static RD_INLINE rd_ts_t rd_timeout_init(int timeout_ms) { +static RD_INLINE rd_ts_t rd_timeout_init0(rd_ts_t now, int timeout_ms) { if (timeout_ms == RD_POLL_INFINITE || timeout_ms == RD_POLL_NOWAIT) return timeout_ms; - return rd_clock() + ((rd_ts_t)timeout_ms * 1000); + return now + ((rd_ts_t)timeout_ms * 1000); +} + + +/** + * @brief Initialize an absolute timeout based on the provided \p timeout_ms + * and current clock. + * + * To be used with rd_timeout_adjust(). + * + * Honours RD_POLL_INFINITE, RD_POLL_NOWAIT. + * + * @returns the absolute timeout which should later be passed + * to rd_timeout_adjust(). + */ +static RD_INLINE rd_ts_t rd_timeout_init(int timeout_ms) { + return rd_timeout_init0(rd_clock(), timeout_ms); } diff --git a/tests/0150-telemetry_mock.c b/tests/0150-telemetry_mock.c index 997c2ec5f..45a7988b6 100644 --- a/tests/0150-telemetry_mock.c +++ b/tests/0150-telemetry_mock.c @@ -37,39 +37,33 @@ typedef struct { expected diff*/ int broker_id; /* Broker id of request. */ } rd_kafka_telemetry_expected_request_t; -static void test_clear_request_list(rd_kafka_mock_request_t **requests, - size_t request_cnt) { - size_t i; - for (i = 0; i < request_cnt; i++) { - rd_kafka_mock_request_destroy(requests[i]); - } - rd_free(requests); -} + static void test_telemetry_check_protocol_request_times( rd_kafka_mock_cluster_t *mcluster, rd_kafka_telemetry_expected_request_t *requests_expected, size_t expected_cnt) { - size_t actual_cnt; - rd_kafka_mock_request_t **requests_actual = - rd_kafka_mock_get_requests(mcluster, &actual_cnt); int64_t prev_timestamp = -1; int64_t curr_timestamp = -1; size_t expected_idx = 0; size_t actual_idx = 0; - const int buffer = 200 /* constant buffer time. */; + const int buffer = 500 /* constant buffer time. 
*/; + + rd_kafka_mock_request_t **requests = NULL; + size_t request_cnt; if (expected_cnt < 1) return; - TEST_ASSERT(actual_cnt >= expected_cnt, + requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); + + TEST_ASSERT(request_cnt >= expected_cnt, "Expected at least %" PRIusz " requests, have %" PRIusz, - expected_cnt, actual_cnt); + expected_cnt, request_cnt); for (expected_idx = 0, actual_idx = 0; - expected_idx < expected_cnt && actual_idx < actual_cnt; + expected_idx < expected_cnt && actual_idx < request_cnt; actual_idx++) { - rd_kafka_mock_request_t *request_actual = - requests_actual[actual_idx]; + rd_kafka_mock_request_t *request_actual = requests[actual_idx]; int16_t actual_ApiKey = rd_kafka_mock_request_api_key(request_actual); int actual_broker_id = rd_kafka_mock_request_id(request_actual); @@ -81,16 +75,21 @@ static void test_telemetry_check_protocol_request_times( continue; TEST_ASSERT(actual_ApiKey == request_expected.ApiKey, - "Expected ApiKey %s, got ApiKey %s", + "request[%" PRIusz + "]: Expected ApiKey %s, " + "got ApiKey %s", + expected_idx, rd_kafka_ApiKey2str(request_expected.ApiKey), rd_kafka_ApiKey2str(actual_ApiKey)); if (request_expected.broker_id != -1) - TEST_ASSERT( - request_expected.broker_id == actual_broker_id, - "Expected request to be sent to broker %d, " - "was sent to %d", - request_expected.broker_id, actual_broker_id); + TEST_ASSERT(request_expected.broker_id == + actual_broker_id, + "request[%" PRIusz + "]: Expected request to be " + "sent to broker %d, was sent to %d", + expected_idx, request_expected.broker_id, + actual_broker_id); prev_timestamp = curr_timestamp; curr_timestamp = @@ -108,28 +107,99 @@ static void test_telemetry_check_protocol_request_times( (100 + request_expected.jitter_percent) / 100 + buffer; - TEST_ASSERT( - diff_ms > expected_diff_low, - "Expected difference to be more than %" PRId64 - ", was " - "%" PRId64, - expected_diff_low, diff_ms); - TEST_ASSERT( - diff_ms < expected_diff_hi, - "Expected difference to be less than %" PRId64 - ", was " - "%" PRId64, - expected_diff_hi, diff_ms); + TEST_ASSERT(diff_ms > expected_diff_low, + "request[%" PRIusz + "]: Expected difference to be " + "more than %" PRId64 ", was %" PRId64, + expected_idx, expected_diff_low, diff_ms); + TEST_ASSERT(diff_ms < expected_diff_hi, + "request[%" PRIusz + "]: Expected difference to be " + "less than %" PRId64 ", was %" PRId64, + expected_idx, expected_diff_hi, diff_ms); } expected_idx++; } - test_clear_request_list(requests_actual, actual_cnt); + + rd_kafka_mock_request_destroy_array(requests, request_cnt); } -static void test_poll_timeout(rd_kafka_t *rk, int64_t duration_ms) { - int64_t start_time = test_clock(); - while ((test_clock() - start_time) / 1000 < duration_ms) - rd_kafka_poll(rk, 500); +static void test_poll_timeout(rd_kafka_t *rk, + int64_t duration_ms, + const char *topic, + const char *bootstraps) { + int64_t start_time = test_clock(), now, iteration_start_time = 0; + rd_kafka_topic_t *rkt = NULL; + rd_kafka_type_t type = rd_kafka_type(rk); + if (type == RD_KAFKA_PRODUCER) + rkt = test_create_topic_object(rk, topic, NULL); + + now = test_clock(); + while ((now - start_time) / 1000 < duration_ms) { + if (now - iteration_start_time < 500 * 1000) { + int64_t sleep_interval = + 500 * 1000 - (now - iteration_start_time); + if (sleep_interval > + start_time + duration_ms * 1000 - now) + sleep_interval = + start_time + duration_ms * 1000 - now; + rd_usleep(sleep_interval, 0); + } + iteration_start_time = test_clock(); + /* Generate some 
metrics to report */ + if (type == RD_KAFKA_CONSUMER) { + test_consumer_poll_timeout("Consume", rk, 0, -1, -1, 1, + NULL, 10000); + } else { + test_produce_msgs(rk, rkt, 0, 0, 0, 10, NULL, 64); + } + now = test_clock(); + } + if (rkt) + rd_kafka_topic_destroy(rkt); +} + +static rd_kafka_mock_cluster_t *create_mcluster(const char **bootstraps, + char **expected_metrics, + size_t expected_metrics_cnt, + int64_t push_interval, + const char *topic) { + rd_kafka_mock_cluster_t *mcluster = + test_mock_cluster_new(2, bootstraps); + if (expected_metrics_cnt) + rd_kafka_mock_telemetry_set_requested_metrics( + mcluster, expected_metrics, expected_metrics_cnt); + rd_kafka_mock_telemetry_set_push_interval(mcluster, push_interval); + rd_kafka_mock_topic_create(mcluster, topic, 1, 2); + rd_kafka_mock_group_initial_rebalance_delay_ms(mcluster, 0); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_mock_coordinator_set(mcluster, "group", topic, 1); + + /* Seed the topic so the consumer has always messages to read */ + test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 0, "bootstrap.servers", + *bootstraps, "batch.num.messages", "10", NULL); + + rd_kafka_mock_start_request_tracking(mcluster); + return mcluster; +} + +static rd_kafka_t * +create_handle(const char *bootstraps, rd_kafka_type_t type, const char *topic) { + rd_kafka_conf_t *conf; + rd_kafka_t *rk; + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "bootstrap.servers", bootstraps); + + if (type == RD_KAFKA_CONSUMER) { + test_conf_set(conf, "group.id", topic); + test_conf_set(conf, "auto.offset.reset", "earliest"); + rk = test_create_handle(RD_KAFKA_CONSUMER, conf); + test_consumer_subscribe(rk, topic); + } else { + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + } + return rk; } /** @@ -137,15 +207,14 @@ static void test_poll_timeout(rd_kafka_t *rk, int64_t duration_ms) { * successful PushTelemetry requests. * See `requests_expected` for detailed expected flow. */ -void do_test_telemetry_get_subscription_push_telemetry(void) { - rd_kafka_conf_t *conf; +static void +do_test_telemetry_get_subscription_push_telemetry(rd_kafka_type_t type) { const char *bootstraps; rd_kafka_mock_cluster_t *mcluster; - char *expected_metrics[] = {"*"}; - rd_kafka_t *producer = NULL; - rd_kafka_mock_request_t **requests = NULL; - size_t request_cnt; + char *expected_metrics[] = {"*"}; + rd_kafka_t *rk = NULL; const int64_t push_interval = 5000; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); rd_kafka_telemetry_expected_request_t requests_expected[] = { /* T= 0 : The initial GetTelemetrySubscriptions request. */ @@ -157,59 +226,50 @@ void do_test_telemetry_get_subscription_push_telemetry(void) { {.ApiKey = RD_KAFKAP_PushTelemetry, .broker_id = -1, .expected_diff_ms = push_interval, - .jitter_percent = 20}, + .jitter_percent = 30}, /* T = push_interval*2 + jitter : The second PushTelemetry request. */ {.ApiKey = RD_KAFKAP_PushTelemetry, .broker_id = -1, .expected_diff_ms = push_interval, - .jitter_percent = 0}, + .jitter_percent = 30}, }; - SUB_TEST(); + SUB_TEST("type %s", type == 0 ? 
"PRODUCER" : "CONSUMER"); - mcluster = test_mock_cluster_new(1, &bootstraps); - rd_kafka_mock_telemetry_set_requested_metrics(mcluster, - expected_metrics, 1); - rd_kafka_mock_telemetry_set_push_interval(mcluster, push_interval); - rd_kafka_mock_start_request_tracking(mcluster); + mcluster = create_mcluster(&bootstraps, expected_metrics, + RD_ARRAY_SIZE(expected_metrics), + push_interval, topic); - test_conf_init(&conf, NULL, 30); - test_conf_set(conf, "bootstrap.servers", bootstraps); - test_conf_set(conf, "debug", "telemetry"); - producer = test_create_handle(RD_KAFKA_PRODUCER, conf); + rk = create_handle(bootstraps, type, topic); /* Poll for enough time for two pushes to be triggered, and a little * extra, so 2.5 x push interval. */ - test_poll_timeout(producer, push_interval * 2.5); - - requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); + test_poll_timeout(rk, push_interval * 2.5, topic, bootstraps); test_telemetry_check_protocol_request_times( mcluster, requests_expected, RD_ARRAY_SIZE(requests_expected)); /* Clean up. */ - rd_kafka_mock_stop_request_tracking(mcluster); - rd_kafka_destroy(producer); + rd_kafka_destroy(rk); test_mock_cluster_destroy(mcluster); SUB_TEST_PASS(); } + /** * @brief When there are no subscriptions, GetTelemetrySubscriptions should be * resent after the push interval until there are subscriptions. * See `requests_expected` for detailed expected flow. */ -void do_test_telemetry_empty_subscriptions_list(void) { - rd_kafka_conf_t *conf; +static void do_test_telemetry_empty_subscriptions_list(rd_kafka_type_t type) { const char *bootstraps; rd_kafka_mock_cluster_t *mcluster; - char *expected_metrics[] = {"*"}; - rd_kafka_t *producer = NULL; - rd_kafka_mock_request_t **requests = NULL; - size_t request_cnt; + char *expected_metrics[] = {"*"}; + rd_kafka_t *rk = NULL; const int64_t push_interval = 5000; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); rd_kafka_telemetry_expected_request_t requests_expected[] = { /* T= 0 : The initial GetTelemetrySubscriptions request, returns @@ -229,24 +289,20 @@ void do_test_telemetry_empty_subscriptions_list(void) { {.ApiKey = RD_KAFKAP_PushTelemetry, .broker_id = -1, .expected_diff_ms = push_interval, - .jitter_percent = 20}, + .jitter_percent = 30}, }; - SUB_TEST(); + SUB_TEST("type %s", type == 0 ? "PRODUCER" : "CONSUMER"); - mcluster = test_mock_cluster_new(1, &bootstraps); - rd_kafka_mock_telemetry_set_requested_metrics(mcluster, NULL, 0); - rd_kafka_mock_telemetry_set_push_interval(mcluster, push_interval); - rd_kafka_mock_start_request_tracking(mcluster); + mcluster = create_mcluster(&bootstraps, NULL, 0, push_interval, topic); - test_conf_init(&conf, NULL, 30); - test_conf_set(conf, "bootstrap.servers", bootstraps); - producer = test_create_handle(RD_KAFKA_PRODUCER, conf); + + rk = create_handle(bootstraps, type, topic); /* Poll for enough time so that the first GetTelemetrySubscription * request is triggered. */ - test_poll_timeout(producer, (push_interval * 0.5)); + test_poll_timeout(rk, (push_interval * 0.5), topic, bootstraps); /* Set expected_metrics before the second GetTelemetrySubscription is * triggered. */ @@ -255,15 +311,13 @@ void do_test_telemetry_empty_subscriptions_list(void) { /* Poll for enough time so that the second GetTelemetrySubscriptions and * subsequent PushTelemetry request is triggered. 
*/ - test_poll_timeout(producer, (push_interval * 2)); + test_poll_timeout(rk, (push_interval * 2), topic, bootstraps); - requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); test_telemetry_check_protocol_request_times(mcluster, requests_expected, 3); /* Clean up. */ - rd_kafka_mock_stop_request_tracking(mcluster); - rd_kafka_destroy(producer); + rd_kafka_destroy(rk); test_mock_cluster_destroy(mcluster); SUB_TEST_PASS(); @@ -274,15 +328,14 @@ void do_test_telemetry_empty_subscriptions_list(void) { * push telemetry request should be sent immediately. * See `requests_expected` for detailed expected flow. */ -void do_test_telemetry_terminating_push(void) { - rd_kafka_conf_t *conf; +static void do_test_telemetry_terminating_push(rd_kafka_type_t type) { const char *bootstraps; rd_kafka_mock_cluster_t *mcluster; - char *expected_metrics[] = {"*"}; - rd_kafka_t *producer = NULL; - rd_kafka_mock_request_t **requests = NULL; - size_t request_cnt; + char *expected_metrics[] = {"*"}; + rd_kafka_t *rk = NULL; const int64_t wait_before_termination = 2000; + + const char *topic = test_mk_topic_name(__FUNCTION__, 1); const int64_t push_interval = 5000; /* Needs to be comfortably larger than wait_before_termination. */ @@ -298,35 +351,30 @@ void do_test_telemetry_terminating_push(void) { {.ApiKey = RD_KAFKAP_PushTelemetry, .broker_id = -1, .expected_diff_ms = wait_before_termination, - .jitter_percent = 0}, + .jitter_percent = 30}, }; - SUB_TEST(); - mcluster = test_mock_cluster_new(1, &bootstraps); - rd_kafka_mock_telemetry_set_requested_metrics(mcluster, - expected_metrics, 1); - rd_kafka_mock_telemetry_set_push_interval(mcluster, push_interval); - rd_kafka_mock_start_request_tracking(mcluster); + SUB_TEST("type %s", type == 0 ? "PRODUCER" : "CONSUMER"); - test_conf_init(&conf, NULL, 30); - test_conf_set(conf, "bootstrap.servers", bootstraps); - producer = test_create_handle(RD_KAFKA_PRODUCER, conf); + mcluster = create_mcluster(&bootstraps, expected_metrics, + RD_ARRAY_SIZE(expected_metrics), + push_interval, topic); + + rk = create_handle(bootstraps, type, topic); /* Poll for enough time so that the initial GetTelemetrySubscriptions * can be sent and handled, and keep polling till it's time to * terminate. */ - test_poll_timeout(producer, wait_before_termination); + test_poll_timeout(rk, wait_before_termination, topic, bootstraps); /* Destroy the client to trigger a terminating push request * immediately. */ - rd_kafka_destroy(producer); + rd_kafka_destroy(rk); - requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); test_telemetry_check_protocol_request_times(mcluster, requests_expected, 2); /* Clean up. */ - rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); SUB_TEST_PASS(); @@ -337,14 +385,12 @@ void do_test_telemetry_terminating_push(void) { * old preferred broker goes down. * See `requests_expected` for detailed expected flow. 
*/ -void do_test_telemetry_preferred_broker_change(void) { - rd_kafka_conf_t *conf; +void do_test_telemetry_preferred_broker_change(rd_kafka_type_t type) { const char *bootstraps; rd_kafka_mock_cluster_t *mcluster; - char *expected_metrics[] = {"*"}; - rd_kafka_t *producer = NULL; - rd_kafka_mock_request_t **requests = NULL; - size_t request_cnt; + char *expected_metrics[] = {"*"}; + rd_kafka_t *rk = NULL; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); const int64_t push_interval = 5000; rd_kafka_telemetry_expected_request_t requests_expected[] = { @@ -359,14 +405,14 @@ void do_test_telemetry_preferred_broker_change(void) { {.ApiKey = RD_KAFKAP_PushTelemetry, .broker_id = 1, .expected_diff_ms = push_interval, - .jitter_percent = 20}, + .jitter_percent = 30}, /* T = 2*push_interval + jitter : The second PushTelemetry request, * sent to the preferred broker 1. */ {.ApiKey = RD_KAFKAP_PushTelemetry, .broker_id = 1, .expected_diff_ms = push_interval, - .jitter_percent = 0}, + .jitter_percent = 30}, /* T = 3*push_interval + jitter: The old preferred broker is set * down, and this is the first PushTelemetry request to the new * preferred broker. @@ -374,7 +420,7 @@ void do_test_telemetry_preferred_broker_change(void) { {.ApiKey = RD_KAFKAP_PushTelemetry, .broker_id = 2, .expected_diff_ms = push_interval, - .jitter_percent = 0}, + .jitter_percent = 30}, /* T = 4*push_interval + jitter + arbitraryT + jitter2 : The second * PushTelemetry request to the new preferred broker. The old * broker will be up, but the preferred broker will not chnage. @@ -382,56 +428,52 @@ void do_test_telemetry_preferred_broker_change(void) { {.ApiKey = RD_KAFKAP_PushTelemetry, .broker_id = 2, .expected_diff_ms = push_interval, - .jitter_percent = 0}, + .jitter_percent = 30}, }; - SUB_TEST(); - mcluster = test_mock_cluster_new(2, &bootstraps); - rd_kafka_mock_telemetry_set_requested_metrics(mcluster, - expected_metrics, 1); - rd_kafka_mock_telemetry_set_push_interval(mcluster, push_interval); - rd_kafka_mock_start_request_tracking(mcluster); + SUB_TEST("type %s", type == 0 ? "PRODUCER" : "CONSUMER"); + mcluster = create_mcluster(&bootstraps, expected_metrics, + RD_ARRAY_SIZE(expected_metrics), + push_interval, topic); /* Set broker 2 down, to make sure broker 1 is the first preferred * broker. */ rd_kafka_mock_broker_set_down(mcluster, 2); - test_conf_init(&conf, NULL, 30); - test_conf_set(conf, "bootstrap.servers", bootstraps); - test_conf_set(conf, "debug", "telemetry"); - // rd_kafka_conf_set_error_cb(conf, test_error_is_not_fatal_cb); test_curr->is_fatal_cb = test_error_is_not_fatal_cb; - producer = test_create_handle(RD_KAFKA_PRODUCER, conf); + rk = create_handle(bootstraps, type, topic); /* Poll for enough time that the initial GetTelemetrySubscription can be * sent and the first PushTelemetry request can be scheduled. */ - test_poll_timeout(producer, 0.5 * push_interval); + test_poll_timeout(rk, 0.5 * push_interval, topic, bootstraps); /* Poll for enough time that 2 PushTelemetry requests can be sent. Set * the all brokers up during this time, but the preferred broker (1) * should remain sticky. */ rd_kafka_mock_broker_set_up(mcluster, 2); - test_poll_timeout(producer, 2 * push_interval); + test_poll_timeout(rk, 2 * push_interval, topic, bootstraps); /* Set the preferred broker (1) down. */ rd_kafka_mock_broker_set_down(mcluster, 1); + /* Change partition leader to broker 2. */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + /* Change coordinator to broker 2. 
*/ + rd_kafka_mock_coordinator_set(mcluster, "group", topic, 2); /* Poll for enough time that 1 PushTelemetry request can be sent. */ - test_poll_timeout(producer, 1.25 * push_interval); + test_poll_timeout(rk, 1.25 * push_interval, topic, bootstraps); /* Poll for enough time that 1 PushTelemetry request can be sent. Set * the all brokers up during this time, but the preferred broker (2) * should remain sticky. */ rd_kafka_mock_broker_set_up(mcluster, 1); - test_poll_timeout(producer, 1.25 * push_interval); + test_poll_timeout(rk, 1.25 * push_interval, topic, bootstraps); - requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); test_telemetry_check_protocol_request_times(mcluster, requests_expected, 5); /* Clean up. */ - rd_kafka_mock_stop_request_tracking(mcluster); - rd_kafka_destroy(producer); + rd_kafka_destroy(rk); test_mock_cluster_destroy(mcluster); SUB_TEST_PASS(); @@ -441,15 +483,13 @@ void do_test_telemetry_preferred_broker_change(void) { * @brief Subscription Id change at the broker should trigger a new * GetTelemetrySubscriptions request. */ -void do_test_subscription_id_change(void) { - rd_kafka_conf_t *conf; +void do_test_subscription_id_change(rd_kafka_type_t type) { const char *bootstraps; rd_kafka_mock_cluster_t *mcluster; - char *expected_metrics[] = {"*"}; - rd_kafka_t *producer = NULL; - rd_kafka_mock_request_t **requests = NULL; - size_t request_cnt; - const int64_t push_interval = 1000; + char *expected_metrics[] = {"*"}; + rd_kafka_t *rk = NULL; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + const int64_t push_interval = 2000; rd_kafka_telemetry_expected_request_t requests_expected[] = { /* T= 0 : The initial GetTelemetrySubscriptions request. */ @@ -463,14 +503,14 @@ void do_test_subscription_id_change(void) { {.ApiKey = RD_KAFKAP_PushTelemetry, .broker_id = -1, .expected_diff_ms = push_interval, - .jitter_percent = 20}, + .jitter_percent = 30}, /* T = 2*push_interval + jitter : The second PushTelemetry request, * which will fail with unknown subscription id. */ {.ApiKey = RD_KAFKAP_PushTelemetry, .broker_id = -1, .expected_diff_ms = push_interval, - .jitter_percent = 20}, + .jitter_percent = 30}, /* New GetTelemetrySubscriptions request will be sent immediately. */ {.ApiKey = RD_KAFKAP_GetTelemetrySubscriptions, @@ -483,58 +523,50 @@ void do_test_subscription_id_change(void) { {.ApiKey = RD_KAFKAP_PushTelemetry, .broker_id = -1, .expected_diff_ms = push_interval, - .jitter_percent = 20}, + .jitter_percent = 30}, }; - SUB_TEST(); - mcluster = test_mock_cluster_new(1, &bootstraps); + SUB_TEST("type %s", type == 0 ? 
"PRODUCER" : "CONSUMER"); - rd_kafka_mock_telemetry_set_requested_metrics(mcluster, - expected_metrics, 1); - rd_kafka_mock_telemetry_set_push_interval(mcluster, push_interval); - rd_kafka_mock_start_request_tracking(mcluster); + mcluster = create_mcluster(&bootstraps, expected_metrics, + RD_ARRAY_SIZE(expected_metrics), + push_interval, topic); - test_conf_init(&conf, NULL, 30); - test_conf_set(conf, "bootstrap.servers", bootstraps); - test_conf_set(conf, "debug", "telemetry"); - producer = test_create_handle(RD_KAFKA_PRODUCER, conf); - test_poll_timeout(producer, push_interval * 1.2); + rk = create_handle(bootstraps, type, topic); + + test_poll_timeout(rk, push_interval * 1.5, topic, bootstraps); rd_kafka_mock_push_request_errors( mcluster, RD_KAFKAP_PushTelemetry, 1, RD_KAFKA_RESP_ERR_UNKNOWN_SUBSCRIPTION_ID); - test_poll_timeout(producer, push_interval * 2.5); - - requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); + test_poll_timeout(rk, push_interval * 2.5, topic, bootstraps); test_telemetry_check_protocol_request_times( mcluster, requests_expected, RD_ARRAY_SIZE(requests_expected)); /* Clean up. */ - rd_kafka_mock_stop_request_tracking(mcluster); - rd_kafka_destroy(producer); + rd_kafka_destroy(rk); test_mock_cluster_destroy(mcluster); SUB_TEST_PASS(); } int main_0150_telemetry_mock(int argc, char **argv) { + int type; if (test_needs_auth()) { TEST_SKIP("Mock cluster does not support SSL/SASL\n"); return 0; } - do_test_telemetry_get_subscription_push_telemetry(); - - do_test_telemetry_empty_subscriptions_list(); - - do_test_telemetry_terminating_push(); - - do_test_telemetry_preferred_broker_change(); - - do_test_subscription_id_change(); + for (type = RD_KAFKA_PRODUCER; type <= RD_KAFKA_CONSUMER; type++) { + do_test_telemetry_get_subscription_push_telemetry(type); + do_test_telemetry_empty_subscriptions_list(type); + do_test_telemetry_terminating_push(type); + do_test_telemetry_preferred_broker_change(type); + do_test_subscription_id_change(type); + }; return 0; }