Metadata cache by topic id and fixes #4660
Conversation
Force-pushed from b36ead0 to 268cb01
Closes #4577
Force-pushed from d04ed7e to f05bc17
CHANGELOG.md (Outdated)
### General fixes

* Metadata cache was cleared on full metadata refresh, leading to unnecessary
refreshes and accasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
Suggested change:
- refreshes and accasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
+ refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
Thanks, fixed it
CHANGELOG.md (Outdated)
cache for existing or hinted entries instead of clearing them.
Happening since 2.1.0 (#4660).
* Metadata refreshes without partition leader change could lead to a loop of
intervaled metadata calls. Solved by stopping metadata refresh when
FWIW, "intervaled" isn't a word; I don't have enough context to know what this means.
"intervaled" means at fixed intervals; it's rarely used, I think. I can put the phrase "at fixed intervals" instead.
That is to say, they're not in a tight loop.
CHANGELOG.md (Outdated)
was undergoing a validation and being retried because of an error.
Solved by doing a partition migration only with a non-stale leader epoch.
Happening since 2.1.0 (#4660).
* In librdkafka main thread loop, when it was awaken less that 1 ms
Suggested change:
- * In librdkafka main thread loop, when it was awaken less that 1 ms
+ * When the main thread loop was awakened less than 1 ms
CHANGELOG.md (Outdated)
* In librdkafka main thread loop, when it was awaken less that 1 ms
before the expiration of a timeout, it was serving with a zero timeout,
leading to increased CPU usage until the timeout was reached.
Happening since 1.x (#4660).
FWIW, it's mildly confusing that all these bullets point to #4660 (this PR), instead of to the actual bug reports about the bugs being described. Do such bug reports exist?
We usually put the id of the PR containing the changes there; the PR then needs to link to the corresponding issues it closes. I'll update this PR.
src/rdkafka.c (Outdated)
int timeout_ms = (int)(sleeptime / 1000);
if (sleeptime % 1000 > 0)
        timeout_ms++;
This is the same as `int timeout_ms = (int)(sleeptime / 1000) + (sleeptime % 1000 > 0);` or perhaps even better, `int timeout_ms = ((sleeptime + 999) / 1000);` (The cast to `int` probably isn't needed, unless it's suppressing a compiler warning.)
Also, this diff seems totally unrelated to the "metadata cache" stuff in this PR. This should be a separate commit with a separate PR.
Thanks, I think I'll use the second one
Putting it in a separate commit too
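For readers following along, here is a minimal standalone sketch (not librdkafka code; the variable `sleeptime_us` and the `main` harness are illustrative) showing that the original two-step rounding and the suggested ceiling division compute the same rounded-up millisecond timeout for a non-negative input:

```c
#include <stdio.h>

int main(void) {
        long long sleeptime_us = 1500; /* hypothetical sleep time in microseconds */

        /* Original form: truncating division, then round up on a remainder. */
        int timeout_ms_a = (int)(sleeptime_us / 1000);
        if (sleeptime_us % 1000 > 0)
                timeout_ms_a++;

        /* Suggested form: classic ceiling division in a single expression. */
        int timeout_ms_b = (int)((sleeptime_us + 999) / 1000);

        printf("%d %d\n", timeout_ms_a, timeout_ms_b); /* prints "2 2" */
        return 0;
}
```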
 */
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find_by_id(rd_kafka_t *rk,
                                   const rd_kafka_Uuid_t topic_id,
`const rd_kafka_Uuid_t` passes "by const value," which is never correct. Surely the compiler warned about this?
Everywhere else in librdkafka that takes `rd_kafka_Uuid_t` either takes it by pointer to avoid a copy (`const rd_kafka_Uuid_t *`), which is probably what you want here since you never use the parameter's own value and it's large, or takes it by "non-const value" in two places (`rd_kafka_topic_partition_new_with_topic_id` and `rd_kafka_Uuid_cmp`: the latter is inline and the former looks like a flaw to me but it's part of the public API now).
Internally we pass the Uuid by struct copy, as a primitive value, to avoid more heap allocations. Here the `const` isn't strictly necessary, but it doesn't hurt, as that parameter shouldn't be modified inside the function.
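As an aside, a small standalone sketch of the two calling conventions being discussed; the `my_uuid_t` type and function names are hypothetical stand-ins, not librdkafka's actual definitions:

```c
#include <stdint.h>

/* Hypothetical stand-in for a 128-bit topic id such as rd_kafka_Uuid_t. */
typedef struct my_uuid_s {
        int64_t most_significant_bits;
        int64_t least_significant_bits;
} my_uuid_t;

/* Pass by const value: the whole struct is copied at the call site; the
 * `const` only promises that the local copy isn't modified. */
static int uuid_is_zero_by_value(const my_uuid_t id) {
        return id.most_significant_bits == 0 &&
               id.least_significant_bits == 0;
}

/* Pass by pointer to const: no copy, and the callee cannot modify the
 * caller's struct through the pointer. */
static int uuid_is_zero_by_ptr(const my_uuid_t *id) {
        return id->most_significant_bits == 0 &&
               id->least_significant_bits == 0;
}
```

The trade-off is a 16-byte copy versus an extra indirection; which one is preferable here is exactly the point of disagreement above.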
src/rdkafka_mock.c (Outdated)
}

/**
 * @brief First forced partition leader response in passed \p mpart .
FWIW I don't understand this grammar. Does it mean "\p mpart holds the first forced response from the partition leader", or, I guess more likely, "Fetch the first partition leader response into \p mpart"?
It's: "Return the first mocked partition leader response in \p mpart, if available".
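A possible wording along those lines, as a sketch only (the actual parameters and return convention of the mock helper are not reproduced here):

```c
/**
 * @brief Returns the first mocked (forced) partition leader response for the
 *        passed \p mpart, if one is available, otherwise NULL.
 */
```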
Force-pushed from bc427e2 to 117d317
Reviewed a few files which I understood; reviewing more. Please check the comments in the original PR as well.
src/rdkafka_cgrp.c (Outdated)
/* When consumer group isn't in join state WAIT_METADATA
 * or STEADY, it's possible that some topics aren't present
 * in cache and not hinted.
 * It uses metadata cache only if it's not a wildcard
 * subscription, so skip only those cases. See issue #4589. */
Rephrasing is required for this comment. I had to dig deeper into the code to understand it fully. Maybe we can explain the conditions in order.
src/rdkafka_cgrp.c (Outdated)
 * subscription, so skip only those cases. See issue #4589. */
rd_kafka_dbg(
    rkcg->rkcg_rk,
    CGRP | RD_KAFKA_DBG_METADATA | RD_KAFKA_DBG_CONSUMER,
I don't think RD_KAFKA_DBG_METADATA is required here as this is triggered for cgrp check. This is valid only for cgrp and consumer.
You can keep it as well to be consistent with other places in the function though.
src/rdkafka_cgrp.c (Outdated)
rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA &&
rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
Can you confirm if this is retried if the state is different from these? This function is being called from multiple places with multiple scenarios. Please check the correctness in all those scenarios.
For sure we know that in states `RD_KAFKA_CGRP_JOIN_STATE_WAIT_INIT` and `RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN` it can cause issue #4589. I was also considering putting `rkcg_join_state >= RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA`, but doing these updates with a stable group seems more error-proof.
In `RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA` it's required because there are cases where it's waiting exactly for this update.
For a regex subscription it's required in state INIT too, because it has to find the topics to include in the JoinGroup request, but in that case it doesn't have the problem with the metadata cache.
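A standalone sketch of the guard being discussed, under the assumptions in the reply above (the enum and function here are simplified stand-ins, not the real `rd_kafka_cgrp_t` code):

```c
/* Simplified join states, mirroring the RD_KAFKA_CGRP_JOIN_STATE_* values
 * mentioned above. */
typedef enum {
        JOIN_STATE_INIT,
        JOIN_STATE_WAIT_JOIN,
        JOIN_STATE_WAIT_METADATA,
        JOIN_STATE_STEADY
} join_state_t;

/* Only let a metadata refresh rewrite the consumer group's topic list when
 * the group is waiting for exactly this metadata or is already stable;
 * wildcard (regex) subscriptions don't use the metadata cache, so they are
 * not restricted. */
static int metadata_update_allowed(join_state_t join_state,
                                   int is_wildcard_subscription) {
        if (is_wildcard_subscription)
                return 1;
        return join_state == JOIN_STATE_WAIT_METADATA ||
               join_state == JOIN_STATE_STEADY;
}
```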
const rd_list_t *requested_topics = request_topics;
const rd_list_t *requested_topic_ids = NULL;
This implementation works fine if only topic names or topic ids are provided, not both. Let's check this condition in the metadata request and update the docs accordingly.
In this way we can generalize the handling of these.
Updated the documentation for the three versions of MetadataRequests
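A minimal sketch of that constraint (hypothetical types and names; librdkafka uses `rd_list_t` lists internally):

```c
#include <stddef.h>

/* Hypothetical request arguments: at most one of the two lists is set. */
typedef struct metadata_request_args_s {
        const void *topics;    /* list of topic names, or NULL */
        const void *topic_ids; /* list of topic ids, or NULL   */
} metadata_request_args_t;

/* Returns 1 when the arguments respect the "names or ids, not both" rule. */
static int metadata_request_args_valid(const metadata_request_args_t *args) {
        return !(args->topics && args->topic_ids);
}
```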
/* Only update cache when not asking
 * for all topics or cache entry
 * already exists. */
rd_kafka_wrlock(rk);
In the all_topics case, we will unnecessarily lock and unlock rk even if the topic is not present in the cache. Let's check if the cache has the topic or not. We should lock only if the topic is present in the cache.
To call `rd_kafka_metadata_cache_find` and `rd_kafka_metadata_cache_find_by_id` the lock is required because it's reading the AVL tree. It could use a read lock, but that cannot be upgraded to a write lock, and if releasing it, then after acquiring the write lock it would have to check again whether the entry was deleted in the meantime.
It can be an optimization to increase concurrent reads.
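A standalone illustration of the trade-off described above, using a plain POSIX rwlock in place of librdkafka's `rd_kafka_wrlock()`; the cache type and lookup here are placeholders:

```c
#include <pthread.h>

typedef struct toy_cache_s {
        pthread_rwlock_t lock;
        /* ... AVL trees keyed by topic name and by topic id ... */
} toy_cache_t;

/* Taking the write lock up front keeps the find-then-update sequence atomic.
 * A read lock would allow concurrent lookups, but POSIX rwlocks cannot be
 * upgraded in place: the reader would have to unlock, relock for writing,
 * and then repeat the lookup because the entry may have been deleted in
 * between. */
static void toy_cache_update(toy_cache_t *c, const char *topic) {
        pthread_rwlock_wrlock(&c->lock);
        /* look up `topic` in the tree and update the entry in place ... */
        (void)topic;
        pthread_rwlock_unlock(&c->lock);
}
```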
 * Mark the topic as non-existent */
rd_kafka_topic_wrlock(rkt);
rd_kafka_topic_set_notexists(
    rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
This should be set to UNKNOWN_TOPIC_ID.
A concern regarding this new error code: are we handling it properly in the fetch flow? I don't remember that we added this in the fetch flow. Is it part of the 951 prework? With this new error code being added in the rkt, we should handle it properly as well.
This is the local error `_UNKNOWN_TOPIC`, not the broker one `UNKNOWN_TOPIC_OR_PART` or `UNKNOWN_TOPIC_ID`, so it's correct to use the same code to signal that the topic is unknown, independently of which parameter was used to query for it.
src/rdkafka_metadata.c (Outdated)
/* Remove cache hints for the originally requested topics. */
if (requested_topics)
        rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
if (requested_topic_ids)
        rd_kafka_metadata_cache_purge_hints(rk, requested_topic_ids);
rd_kafka_metadata_cache_purge_hints_by_id
if (cache_changes) {
        rd_kafka_metadata_cache_propagate_changes(rk);
        rd_kafka_metadata_cache_expiry_start(rk);
- Do you mean to include expiry start call inside the if condition? Earlier it was outside.
- What happens to the cache entries which were not updated? Do we restart the timer for them as well?
Here it's restarting the timer if some entries have been updated; the timer is calculated from the expiry time of the first element, the one that expires first. Earlier it was outside just in this code; in trunk it's done only if `!all_topics`, but now existing entries can be updated in that case too.
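A small sketch of the expiry computation described in that reply (hypothetical list layout; librdkafka keeps its own expiry ordering):

```c
#include <stdint.h>

typedef struct toy_cache_entry_s {
        int64_t ts_expires_us;          /* absolute expiry time */
        struct toy_cache_entry_s *next; /* kept sorted, earliest first */
} toy_cache_entry_t;

/* The timer is armed from the entry that expires first; returns a relative
 * timeout in microseconds, or -1 when the cache is empty and no timer is
 * needed. */
static int64_t toy_cache_next_timeout(const toy_cache_entry_t *earliest,
                                      int64_t now_us) {
        if (!earliest)
                return -1;
        return earliest->ts_expires_us > now_us
                   ? earliest->ts_expires_us - now_us
                   : 0;
}
```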
Force-pushed from f405243 to 4dd41b7
Some review comments for the cache.
 * @brief Remove all cache hints,.
 * This is done when the Metadata response has been parsed and
 * replaced hints with existing topic information, thus this will
 * only remove unmatched topics from the cache.
Can we make this more clear? This is difficult to understand.
@@ -247,7 +254,7 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert(
    rd_bool_t include_racks,
    rd_kafka_metadata_broker_internal_t *brokers_internal,
    size_t broker_cnt) {
-       struct rd_kafka_metadata_cache_entry *rkmce, *old;
+       struct rd_kafka_metadata_cache_entry *rkmce, *old, *old_by_id = NULL;
I think `old` and `old_by_id` would never both be non-NULL and different. In the NULL `old_by_id` case, do we need different handling?
@@ -350,8 +357,28 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert(
        /* Insert (and replace existing) entry. */
        old = RD_AVL_INSERT(&rk->rk_metadata_cache.rkmc_avl, rkmce,
In the deleted topic case, topic name might not be present if the request is for the topic id. Are we handling this case and how?
        rd_kafka_metadata_cache_delete(rk, old, 0);
}
if (old_by_id && old_by_id != old) {
`old` gets freed in the earlier step by `rd_kafka_metadata_cache_delete`. This statement will cause a SegFault.
        RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl_by_id, old);
}
if (old) {
        /* Delete and free old cache entry */
        rd_kafka_metadata_cache_delete(rk, old, 0);
This function frees the cache entry. We should carefully check that it is being used correctly after the new changes, as we are inserting the same cache entry into two AVLs. Deletion should be done from both AVLs, otherwise there will be a SegFault in the AVL still referring to it.
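To make that invariant explicit, a standalone sketch (the `toy_avl_*` helpers and entry layout are hypothetical, not librdkafka's `RD_AVL_*` API): an entry indexed by two trees must be unlinked from both before it is freed.

```c
#include <stdlib.h>

struct toy_avl;                                    /* opaque tree, hypothetical */
void toy_avl_remove(struct toy_avl *t, void *elm); /* hypothetical helper */

typedef struct toy_cache_entry_s {
        char *topic_name;           /* key of the by-name tree */
        unsigned char topic_id[16]; /* key of the by-id tree   */
        /* ... cached metadata ... */
} toy_cache_entry_t;

/* Unlink from BOTH indexes first; freeing while either tree still points at
 * the entry leaves a dangling pointer and a later crash. */
static void toy_cache_entry_destroy(struct toy_avl *by_name,
                                    struct toy_avl *by_id,
                                    toy_cache_entry_t *entry) {
        toy_avl_remove(by_name, entry);
        toy_avl_remove(by_id, entry);
        free(entry->topic_name);
        free(entry);
}
```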
- metadata doesn't persist after full metadata refresh
- fast metadata refresh doesn't stop when no leader change happened
- stale metadata doesn't migrate back the partition to corresponding broker while validating an offset
- a metadata call for an existing topic, just after subscription, must not cause a UNKNOWN_TOPIC_OR_PART error

fixes for failing metadata tests:
- cache is updated on full metadata refresh and not cleared
- Unknown topic happening before joining the consumer group
- fast metadata refresh stops without leader change when there are no stale leader epochs
- handling broker isn't updated on stale leader epoch

- changed fix for do_test_metadata_call_before_join
- removed borrowing of cached topic name pointer
Force-pushed from d2444a7 to d25149b
Metadata cache by topic id is added, with an AVL tree having a Uuid as key and the same struct as value.
### General fixes

* Metadata cache was cleared on full metadata refresh, leading to unnecessary refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating the cache for existing or hinted entries instead of clearing them. Happening since 2.1.0.
* A metadata call for an existing topic, just after subscription, could lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating the consumer group following a metadata refresh only in safe states. Happening since 2.1.0.
* Metadata refreshes without partition leader change could lead to a loop of metadata calls at fixed intervals. Solved by stopping metadata refresh when all existing metadata is non-stale. Happening since 2.3.0.
* A partition migration could happen, using stale metadata, when the partition was undergoing a validation and being retried because of an error. Solved by doing a partition migration only with a non-stale leader epoch. Happening since 2.1.0.
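The PR description above mentions an AVL tree keyed by Uuid with the same entry struct as value; as a rough sketch of what such a by-id key comparison can look like (hypothetical `topic_id_t`, mirroring the two 64-bit halves of `rd_kafka_Uuid_t`):

```c
#include <stdint.h>

typedef struct topic_id_s {
        int64_t most_significant_bits;
        int64_t least_significant_bits;
} topic_id_t;

/* Total order over 128-bit topic ids, suitable as the comparator of a
 * by-id tree; an all-zero id would mean "topic id not known yet". */
static int topic_id_cmp(const topic_id_t *a, const topic_id_t *b) {
        if (a->most_significant_bits != b->most_significant_bits)
                return a->most_significant_bits < b->most_significant_bits ? -1 : 1;
        if (a->least_significant_bits != b->least_significant_bits)
                return a->least_significant_bits < b->least_significant_bits ? -1 : 1;
        return 0;
}
```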