[KIP-714] Additional consumer metrics #4808
First round of comments
src/rdkafka.c
Outdated
@@ -3218,10 +3219,21 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
rd_kafka_app_poll_blocking(rk);

rd_kafka_yield_thread = 0;
rd_ts_t now = rd_clock();
There are many ways the user can call poll; see where rd_kafka_app_poll_blocking is called. Those call sites aren't covered at the moment. You can convert rd_kafka_app_poll_blocking to rd_kafka_app_poll_start, with a parameter that says whether the call is blocking (derived from timeout_ms), and do the calculation there.
Given we're in the hot path, let's reduce system calls: get the now value here, pass it to rd_timeout_init0, and then pass it to rd_kafka_app_poll_start too.
Can you explain this further? Since we are only refactoring, how does changing the order reduce system calls in the hot path?
The system call here is the one that reads the monotonic clock. We want to make it only once to reduce the number of these calls (it's already done in rd_timeout_init).
src/rdkafka.c
Outdated
@@ -3218,10 +3219,21 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
rd_kafka_app_poll_blocking(rk);

rd_kafka_yield_thread = 0;
rd_ts_t now = rd_clock();
if (rk->rk_telemetry.ts_fetch_last != -1) {
It can be useful for something other than telemetry, so we can call it rk->rk_ts_last_poll_start. Also, the check should be whether it is non-zero, given it's initialized to zero.
ok.
src/rdkafka_telemetry_encode.c
Outdated
rd_avg_destroy(
&rk->rk_telemetry.rk_avg_rollover.rk_avg_poll_idle_ratio);
rd_avg_rollover(
&rk->rk_telemetry.rk_avg_current.rk_avg_poll_idle_ratio,
&rk->rk_telemetry.rk_avg_rollover.rk_avg_poll_idle_ratio);
This is to be done only if rk->rk_type == RD_KAFKA_CONSUMER. Also add the same instructions for rk_avg_rebalance_latency and rk_avg_commit_latency.
ok.
src/rdkafka_telemetry_encode.c
Outdated
rk->rk_telemetry.ts_fetch_last = -1;
rk->rk_telemetry.ts_fetch_cb_last = -1;
These two (renamed) fields don't need to be reset on push; they just need to be initialized to zero by default and used afterwards.
ok.
src/rdkafka_telemetry_encode.h
Outdated
"consumer coordinator.",
.unit = "ms",
.is_int = rd_true,
.is_per_broker = rd_true,
Per broker is rd_false everywhere except for node.request.latency. It's different from where we store the values; see the KIP labels.
ok
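The distinction drawn here (the per-broker flag describes the label on the emitted metric, not where the samples are stored) can be sketched with a small designated-initializer table. The enum, struct and helper names below are hypothetical, and the table holds only three illustrative rows rather than the real metric list.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical metric ids; the real table has many more entries. */
typedef enum {
        DEMO_METRIC_NODE_REQUEST_LATENCY_AVG,
        DEMO_METRIC_COORDINATOR_REBALANCE_LATENCY_AVG,
        DEMO_METRIC_COORDINATOR_COMMIT_LATENCY_AVG,
        DEMO_METRIC__CNT
} demo_metric_id_t;

typedef struct {
        const char *name;
        const char *unit;
        bool is_per_broker; /* label on the emitted metric, not storage */
} demo_metric_info_t;

static const demo_metric_info_t demo_metric_info[DEMO_METRIC__CNT] = {
    [DEMO_METRIC_NODE_REQUEST_LATENCY_AVG] =
        {.name          = "consumer.node.request.latency.avg",
         .unit          = "ms",
         .is_per_broker = true},
    [DEMO_METRIC_COORDINATOR_REBALANCE_LATENCY_AVG] =
        {.name          = "consumer.coordinator.rebalance.latency.avg",
         .unit          = "ms",
         .is_per_broker = false},
    [DEMO_METRIC_COORDINATOR_COMMIT_LATENCY_AVG] =
        {.name          = "consumer.coordinator.commit.latency.avg",
         .unit          = "ms",
         .is_per_broker = false},
};

/* Look up one entry; the id must be a valid table index. */
static const demo_metric_info_t *demo_metric_get(demo_metric_id_t id) {
        assert((int)id >= 0 && (int)id < DEMO_METRIC__CNT);
        return &demo_metric_info[id];
}

/* Count the entries that carry a per-broker label. */
static size_t demo_metric_count_per_broker(void) {
        size_t i, n = 0;
        for (i = 0; i < (size_t)DEMO_METRIC__CNT; i++) {
                if (demo_metric_info[i].is_per_broker)
                        n++;
        }
        return n;
}
```

A counting helper like `demo_metric_count_per_broker` is one cheap way to catch a flag that was copied along with a neighbouring entry.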
src/rdkafka_telemetry_encode.h
Outdated
.is_per_broker = rd_true,
.type = RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM},
[RD_KAFKA_TELEMETRY_METRIC_CONSUMER_FETCH_MANAGER_FETCH_LATENCY_AVG] =
{.name = "consumer.fetch.manager.fetch.latency.avg ",
Remove these additional spaces in metric names within this file:
- {.name = "consumer.fetch.manager.fetch.latency.avg ",
+ {.name = "consumer.fetch.manager.fetch.latency.avg",
ok.
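A stray space in a metric name is easy to miss by eye, so a small check over the name strings may be worth having in a unit test. The sketch below is one possible shape; `metric_name_is_clean` and `metric_names_count_dirty` are hypothetical helpers, not existing librdkafka functions.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <stddef.h>

/* A name is "clean" if it is non-empty and contains no whitespace at all. */
static bool metric_name_is_clean(const char *name) {
        const char *p;
        assert(name != NULL);
        if (*name == '\0')
                return false;
        for (p = name; *p != '\0'; p++) {
                if (isspace((unsigned char)*p))
                        return false;
        }
        return true;
}

/* Count the names in a table that would fail the check above. */
static size_t metric_names_count_dirty(const char *const *names, size_t cnt) {
        size_t i, dirty = 0;
        for (i = 0; i < cnt; i++) {
                if (!metric_name_is_clean(names[i]))
                        dirty++;
        }
        return dirty;
}
```

Running the counter over every `.name` in the metric table and expecting zero would flag the trailing space shown in the suggestion above.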
Comments after first changes and about unit and mock tests
src/rdkafka.c
Outdated
@@ -3889,6 +3901,7 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,

switch ((int)rko->rko_type) {
case RD_KAFKA_OP_FETCH:
rk->rk_telemetry.ts_fetch_cb_last = rd_clock();
This line, left with the previous name, is causing a compilation error.
src/rdkafka_broker.c
Outdated

if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Fetch) {
rd_avg_add(&rkb->rkb_rk->rk_telemetry.rd_avg_current
.rk_avg_fetch_latency,
Fetch latency needs to stay per broker, as that breakdown is useful. This differs from rk_avg_commit_latency, which is only updated by a single broker at a time (the group coordinator).
But it has per_broker set to false in the KIP table itself.
I will move it from rk to rkb
src/rdkafka_cgrp.c
Outdated

if(join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY){
rd_avg_add(&rkcg->rkcg_curr_coord->rkb_telemetry.rd_avg_current
.rkb_avg_rebalance_latency,
This avg is in rk now.
src/rdkafka_cgrp.c
Outdated
.rkb_avg_rebalance_latency,
rd_clock() - rkcg->rkcg_ts_rebalance_start);
}
switch ((int)rkcg->rkcg_join_state) {
You can change the switch here and join it with the previous condition in an if ... else if, as they're on different variables: join_state in one case and rkcg->rkcg_join_state in the other.
sure
src/rdkafka.c
Outdated
@@ -939,7 +939,19 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) {
/* Synchronize state */
rd_kafka_wrlock(rk);
rd_kafka_wrunlock(rk);
if(rk->rk_type == RD_KAFKA_CONSUMER){
There's already an if condition for the consumer later; add these instructions there.
ok.
tests/0150-telemetry_mock.c
Outdated
* successful PushTelemetry requests.
* See `requests_expected` for detailed expected flow.
*/
void do_test_telemetry_get_subscription_push_telemetry_consumer(void) {
Please add static to all functions in this file except the main one.
This should be the existing do_test_telemetry_get_subscription_push_telemetry but taking rd_kafka_type_t type as a parameter, so we call the same function first with the producer and then with the consumer.
Testing with both the producer and the consumer should be done for all the tests in this file; that's the next thing you should work on.
These are not specific to the new metrics; I need to discuss these.
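The parametrization asked for in this thread could take roughly the following shape: every subtest takes the client type, and a small runner calls each one for the producer and then the consumer. This is only a structural sketch; the enum, the subtest names and the runner are hypothetical, and the subtest bodies merely count invocations instead of talking to a mock cluster.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the client type passed to each subtest. */
typedef enum { DEMO_TEST_PRODUCER, DEMO_TEST_CONSUMER } demo_test_type_t;

typedef void (*demo_telemetry_test_fn)(demo_test_type_t type);

static int demo_runs_by_type[2]; /* how many subtests ran per client type */

/* Placeholder subtests: real ones would create the handle for `type`. */
static void demo_test_push_telemetry(demo_test_type_t type) {
        demo_runs_by_type[type]++;
}

static void demo_test_subscription_change(demo_test_type_t type) {
        demo_runs_by_type[type]++;
}

/* Run every subtest once per client type; returns the number of runs. */
static int demo_run_all_telemetry_tests(void) {
        static const demo_telemetry_test_fn tests[] = {
            demo_test_push_telemetry, demo_test_subscription_change};
        static const demo_test_type_t types[] = {DEMO_TEST_PRODUCER,
                                                 DEMO_TEST_CONSUMER};
        size_t i, j;
        int total = 0;

        for (i = 0; i < sizeof(tests) / sizeof(tests[0]); i++) {
                for (j = 0; j < sizeof(types) / sizeof(types[0]); j++) {
                        assert(tests[i] != NULL);
                        tests[i](types[j]);
                        total++;
                }
        }
        return total;
}
```

Adding a new subtest then only means appending one function to the `tests` array, and it is exercised for both client types automatically.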
tests/0150-telemetry_mock.c
Outdated
mcluster = test_mock_cluster_new(1, &bootstraps);
rd_kafka_mock_telemetry_set_requested_metrics(mcluster,
expected_metrics, 1);
rd_kafka_mock_telemetry_set_push_interval(mcluster, push_interval);
rd_kafka_mock_start_request_tracking(mcluster);
This part can be refactored into a create_mcluster function and used in all the tests.
tests/0150-telemetry_mock.c
Outdated
test_conf_init(&conf, NULL, 30);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "debug", "telemetry");
consumer = test_create_handle(RD_KAFKA_CONSUMER, conf);
Creating the handle can also be refactored and used in all the tests. The consumer must subscribe to a topic that is created and pre-populated in advance, so we can see metrics in the logs of the consumer tests.
sure !
tests/0150-telemetry_mock.c
Outdated

/* Poll for enough time for two pushes to be triggered, and a little
* extra, so 2.5 x push interval. */
test_poll_timeout(consumer, push_interval * 2.5);
test_poll_timeout should call test_consumer_poll_timeout with the consumer or test_produce_msgs with the producer, so we can see metrics in the logs that are non-zero and check whether they're correct. A later refactor would be to check the metric values automatically, but for this step that is enough.
tests/0150-telemetry_mock.c
Outdated
* extra, so 2.5 x push interval. */
test_poll_timeout(consumer, push_interval * 2.5);

requests = rd_kafka_mock_get_requests(mcluster, &request_cnt);
Requests can be fetched and destroyed directly in test_telemetry_check_protocol_request_times, so we can just pass mcluster to it and remove this repeated code from all the subtests.
Since these depend on all the tests, I will come back to these in the office hours!
Librdkafka KIP 714 New Metrics -> Addition of Telemetry Metrics Formatting done via,
src/rdkafka_telemetry_decode.c
Outdated
RD_KAFKA_TELEMETRY_METRIC_CONSUMER_POLL_IDLE_RATIO_AVG,
RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_AVG,
RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_MAX};
for (int i = 0; i < 6; i++) {
Initialize before using in the loop; this is causing the CI failure.
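One plausible reading of this comment is that the loop counter is declared inside the for statement, which some CI compiler configurations reject outside C99 mode. Under that assumption, the sketch below declares the counters up front and also derives the loop bound from the array instead of the literal 6. The ids and helper names are placeholders, not the real metric enum.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))

/* Placeholder ids standing in for the six consumer metric enum values. */
static const int demo_consumer_metric_ids[] = {10, 11, 12, 13, 14, 15};

/* Count how many of the consumer metrics appear in `matched`. */
static size_t demo_count_matched(const int *matched, size_t matched_cnt) {
        size_t i, j, hits = 0; /* declared before the loops, not inside */

        assert(matched != NULL || matched_cnt == 0);
        for (i = 0; i < DEMO_ARRAY_SIZE(demo_consumer_metric_ids); i++) {
                for (j = 0; j < matched_cnt; j++) {
                        if (matched[j] == demo_consumer_metric_ids[i]) {
                                hits++;
                                break;
                        }
                }
        }
        return hits;
}
```

Deriving the bound from the array also keeps the loop correct if another metric is appended later.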
Overall looks good, with some minor changes needed. Please also fix the CI.
src/rdkafka_broker.h
Outdated
* between buf_enq0
* and writing to socket
*/
rd_avg_t rd_avg_fetch_latency; /**< Current fetch
This should be named rkb_avg_fetch_latency since this is at broker level.
src/rdkafka_broker.h
Outdated
} rd_avg_current;
struct {
rd_avg_t rkb_avg_rtt; /**< Rolled over RTT avg */
rd_avg_t
rkb_avg_throttle; /**< Rolled over throttle avg */
rd_avg_t rkb_avg_outbuf_latency; /**< Rolled over outbuf
* latency avg */
rd_avg_t rd_avg_fetch_latency; /**< Rolled over fetch
rkb_avg_fetch_latency
src/rdkafka_int.h
Outdated
@@ -692,12 +695,37 @@ struct rd_kafka_s {
int *matched_metrics;
size_t matched_metrics_cnt;

+
remove
tests/0150-telemetry_mock.c
Outdated
rd_kafka_mock_request_t **requests_actual,
size_t actual_cnt,
rd_kafka_mock_cluster_t *mcluster,
rd_kafka_telemetry_expected_request_t *requests_expected,
size_t expected_cnt) {
size_t actual_cnt;
rd_kafka_mock_request_t **requests_actual =
rd_kafka_mock_get_requests(mcluster, &actual_cnt);
Why was this changed? We should either also remove rd_kafka_mock_get_requests in the methods or keep this as it is.
Please also resolve the earlier comments if they've been fixed.
🎉 All Contributor License Agreements have been signed. Ready to merge.
6de0833 to f55f3ec
add missing consumer metrics described in the KIP:
* consumer.coordinator.rebalance.latency.avg
* consumer.coordinator.rebalance.latency.max
* consumer.coordinator.rebalance.latency.total
* consumer.fetch.manager.fetch.latency.avg
* consumer.fetch.manager.fetch.latency.max
* consumer.poll.idle.ratio.avg
* consumer.coordinator.commit.latency.avg
* consumer.coordinator.commit.latency.max

additionally:
* add unit tests for all the metrics
* add integration tests with the producer or consumer while they're active
* configurable group initial rebalance delay ms to make integration tests reusable with both producer and consumer

---------
Co-authored-by: Anchit Jain <anjain@confluent.io>
Co-authored-by: mahajanadhitya <amahajan@confluent.io>
No description provided.