Fix to metadata cache expiration on full metadata refresh (#4677)
Metadata cache was cleared on full metadata
refresh, leading to unnecessary refreshes and
occasional `UNKNOWN_TOPIC_OR_PART` errors.
Solved by updating the cache for existing or
hinted entries instead of clearing them.
Happening since 2.1.0.
emasab authored and anchitj committed Jun 10, 2024
1 parent b718319 commit 550b40d
Showing 10 changed files with 170 additions and 76 deletions.
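
In miniature, the fix turns a full metadata refresh from "clear the cache and let it repopulate" into "update the entries that already exist (valid or hinted) and purge only the hints left unmatched". The toy sketch below illustrates that idea with invented names and structures; it is not librdkafka's real cache code.

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define MAX_TOPICS 8

/* Toy cache entry: a "hint" is a placeholder created when metadata for
 * a topic has been requested but not yet received. */
struct toy_entry {
        char topic[64];
        bool valid; /* slot in use */
        bool hint;  /* placeholder awaiting real metadata */
};

static struct toy_entry cache[MAX_TOPICS];

static struct toy_entry *toy_find(const char *topic) {
        int i;
        for (i = 0; i < MAX_TOPICS; i++)
                if (cache[i].valid && !strcmp(cache[i].topic, topic))
                        return &cache[i];
        return NULL;
}

static void toy_add(const char *topic, bool hint) {
        int i;
        for (i = 0; i < MAX_TOPICS; i++)
                if (!cache[i].valid) {
                        snprintf(cache[i].topic, sizeof(cache[i].topic),
                                 "%s", topic);
                        cache[i].valid = true;
                        cache[i].hint  = hint;
                        return;
                }
}

/* Pre-fix behaviour: a full refresh evicted everything that was not
 * re-inserted during the refresh itself, which emptied the cache. */
static void full_refresh_old(void) {
        memset(cache, 0, sizeof(cache));
}

/* Post-fix behaviour: entries that already exist (valid or hinted) are
 * updated in place; only hints left unmatched are purged. */
static void full_refresh_new(const char **seen, int seen_cnt) {
        int i;
        for (i = 0; i < seen_cnt; i++) {
                struct toy_entry *e = toy_find(seen[i]);
                if (e)
                        e->hint = false; /* refreshed in place, now valid */
        }
        for (i = 0; i < MAX_TOPICS; i++)
                if (cache[i].valid && cache[i].hint)
                        cache[i].valid = false; /* absent from full metadata */
}

static void toy_dump(const char *label) {
        int i;
        printf("%s:", label);
        for (i = 0; i < MAX_TOPICS; i++)
                if (cache[i].valid)
                        printf(" %s%s", cache[i].topic,
                               cache[i].hint ? "(hint)" : "");
        printf("\n");
}

int main(void) {
        const char *seen[] = {"subscribed"}; /* full response contents */

        toy_add("subscribed", false); /* topic already known to the client */
        toy_add("pending", true);     /* hint: requested, not yet answered */
        toy_dump("before");

        full_refresh_new(seen, 1);
        toy_dump("after new-style refresh"); /* "subscribed" survives */

        full_refresh_old();
        toy_dump("after old-style refresh"); /* cache is empty again */
        return 0;
}

Under the old behaviour, every cache lookup right after a full refresh misses, forcing further metadata requests and occasionally surfacing `UNKNOWN_TOPIC_OR_PART` while the cache repopulates; under the new behaviour the subscribed topic never leaves the cache.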
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ librdkafka v2.4.0 is a feature release:
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Fix to metadata cache expiration on full metadata refresh (#4677).


## Upgrade considerations
@@ -42,6 +43,10 @@ librdkafka v2.4.0 is a feature release:
before the expiration of a timeout, it was serving with a zero timeout,
leading to increased CPU usage until the timeout was reached.
Happening since 1.x (#4671).
* Metadata cache was cleared on full metadata refresh, leading to unnecessary
  refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
  the cache for existing or hinted entries instead of clearing them.
  Happening since 2.1.0 (#4677).

### Consumer fixes

53 changes: 28 additions & 25 deletions src/rdkafka_metadata.c
@@ -490,7 +490,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
int broker_changes = 0;
int cache_changes = 0;
rd_ts_t ts_start = rd_clock();

/* If client rack is present, the metadata cache (topic or full) needs
* to contain the partition to rack map. */
rd_bool_t has_client_rack = rk->rk_conf.client_rack &&
@@ -850,23 +850,24 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_list_remove_cmp(missing_topic_ids,
&mdi->topics[i].topic_id,
(void *)rd_kafka_Uuid_ptr_cmp));
if (!all_topics) {
/* Only update cache when not asking
* for all topics. */

rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers, md->broker_cnt);
cache_changes++;
rd_kafka_wrunlock(rk);
}
/* Only update the cache when not asking
 * for all topics, or when the cache entry
 * already exists. */
rd_kafka_wrlock(rk);
cache_changes +=
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers,
md->broker_cnt,
all_topics /*cache entry needs to exist
*if all_topics*/);
rd_kafka_wrunlock(rk);
}

/* Requested topics not seen in metadata? Propagate to topic code. */
@@ -979,9 +980,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}

if (all_topics) {
/* Expire all cache entries that were not updated. */
rd_kafka_metadata_cache_evict_by_age(rkb->rkb_rk, ts_start);

/* All matched hints have been replaced by their
 * corresponding entries. The remaining hints can be removed,
 * as their topics aren't present in the full metadata. */
rd_kafka_metadata_cache_purge_all_hints(rkb->rkb_rk);
if (rkb->rkb_rk->rk_full_metadata)
rd_kafka_metadata_destroy(
&rkb->rkb_rk->rk_full_metadata->metadata);
@@ -1001,10 +1003,6 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
"Caching full metadata with "
"%d broker(s) and %d topic(s): %s",
md->broker_cnt, md->topic_cnt, reason);
} else {
if (cache_changes)
rd_kafka_metadata_cache_propagate_changes(rk);
rd_kafka_metadata_cache_expiry_start(rk);
}
/* Remove cache hints for the originally requested topics. */
if (requested_topics)
@@ -1013,6 +1011,11 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_metadata_cache_purge_hints_by_id(rk,
requested_topic_ids);

if (cache_changes) {
rd_kafka_metadata_cache_propagate_changes(rk);
rd_kafka_metadata_cache_expiry_start(rk);
}

rd_kafka_wrunlock(rkb->rkb_rk);

if (broker_changes) {
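
Taken together, the hunks above reorder the tail of rd_kafka_parse_Metadata0. The condensed sketch below shows the resulting control flow, with print-statement stubs standing in for the real cache calls; locking, error paths, and the full-metadata rebuild are omitted.

#include <stdbool.h>
#include <stdio.h>

/* Stub stand-ins for the cache operations named in the diff above. */
static int cache_topic_update(const char *topic, bool only_existing) {
        printf("update %s (only_existing=%d)\n", topic, (int)only_existing);
        return 1; /* pretend the entry changed */
}
static void purge_all_hints(void)   { printf("purge unmatched hints\n"); }
static void propagate_changes(void) { printf("propagate changes\n"); }
static void expiry_start(void)      { printf("(re)start expiry timer\n"); }

static void parse_metadata(const char **topics, int cnt, bool all_topics) {
        int cache_changes = 0, i;

        /* The per-topic update now runs for full requests too; with
         * all_topics set, only existing (valid or hinted) entries are
         * touched rather than new ones being created. */
        for (i = 0; i < cnt; i++)
                cache_changes += cache_topic_update(topics[i], all_topics);

        /* Replaces the old evict-by-age pass: only hints that no topic
         * in the full response matched are dropped. */
        if (all_topics)
                purge_all_hints();

        /* Propagation and the expiry timer restart are now shared by
         * full and partial requests alike. */
        if (cache_changes) {
                propagate_changes();
                expiry_start();
        }
}

int main(void) {
        const char *topics[] = {"a", "b"};
        parse_metadata(topics, 2, true);
        return 0;
}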
7 changes: 4 additions & 3 deletions src/rdkafka_metadata.h
@@ -274,15 +274,16 @@ int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic);
int rd_kafka_metadata_cache_delete_by_topic_id(rd_kafka_t *rk,
const rd_kafka_Uuid_t topic_id);
void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk);
int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts);
void rd_kafka_metadata_cache_topic_update(
int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk);
int rd_kafka_metadata_cache_topic_update(
rd_kafka_t *rk,
const rd_kafka_metadata_topic_t *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
rd_bool_t propagate,
rd_bool_t include_metadata,
rd_kafka_metadata_broker_internal_t *brokers,
size_t broker_cnt);
size_t broker_cnt,
rd_bool_t only_existing);
void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid);
66 changes: 29 additions & 37 deletions src/rdkafka_metadata_cache.c
@@ -182,45 +182,27 @@ static int rd_kafka_metadata_cache_evict(rd_kafka_t *rk) {


/**
* @brief Evict timed out entries from cache based on their insert/update time
* rather than expiry time. Any entries older than \p ts will be evicted.
* @brief Remove all cache hints.
* This is done once the full Metadata response has been parsed and
* matching hints have been replaced with their corresponding entries,
* so this only removes topics absent from the full metadata.
*
* @returns the number of entries evicted.
* @returns the number of purged hints.
*
* @locks_required rd_kafka_wrlock()
*/
int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts) {
int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk) {
int cnt = 0;
struct rd_kafka_metadata_cache_entry *rkmce, *tmp;

TAILQ_FOREACH_SAFE(rkmce, &rk->rk_metadata_cache.rkmc_expiry,
rkmce_link, tmp) {
if (rkmce->rkmce_ts_insert <= ts) {
if (!RD_KAFKA_METADATA_CACHE_VALID(rkmce)) {
rd_kafka_metadata_cache_delete(rk, rkmce, 1);
cnt++;
}
}

/* Update expiry timer */
rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry);
if (rkmce)
rd_kafka_timer_start(&rk->rk_timers,
&rk->rk_metadata_cache.rkmc_expiry_tmr,
rkmce->rkmce_ts_expires - rd_clock(),
rd_kafka_metadata_cache_evict_tmr_cb, rk);
else
rd_kafka_timer_stop(&rk->rk_timers,
&rk->rk_metadata_cache.rkmc_expiry_tmr, 1);

rd_kafka_dbg(rk, METADATA, "METADATA",
"Expired %d entries older than %dms from metadata cache "
"(%d entries remain)",
cnt, (int)((rd_clock() - ts) / 1000),
rk->rk_metadata_cache.rkmc_cnt);

if (cnt)
rd_kafka_metadata_cache_propagate_changes(rk);

return cnt;
}

@@ -474,38 +456,41 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) {
* For permanent errors (authorization failures), we keep
* the entry cached for metadata.max.age.ms.
*
* @param only_existing Update only existing metadata cache entries,
* either valid or hinted.
*
* @return 1 on metadata change, 0 when no change was applied
*
* @remark The cache expiry timer will not be updated/started,
* call rd_kafka_metadata_cache_expiry_start() instead.
*
* @locks rd_kafka_wrlock()
*/
void rd_kafka_metadata_cache_topic_update(
int rd_kafka_metadata_cache_topic_update(
rd_kafka_t *rk,
const rd_kafka_metadata_topic_t *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
rd_bool_t propagate,
rd_bool_t include_racks,
rd_kafka_metadata_broker_internal_t *brokers,
size_t broker_cnt) {
size_t broker_cnt,
rd_bool_t only_existing) {
struct rd_kafka_metadata_cache_entry *rkmce = NULL;
rd_ts_t now = rd_clock();
rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
int changed = 1;
if (unlikely(!mdt->topic)) {
rkmce =
rd_kafka_metadata_cache_find_by_id(rk, mdit->topic_id, 1);
if (only_existing) {
if (likely(mdt->topic != NULL)) {
rkmce = rd_kafka_metadata_cache_find(rk, mdt->topic, 0);
} else {
rkmce = rd_kafka_metadata_cache_find_by_id(
rk, mdit->topic_id, 1);
}
if (!rkmce)
return;
return 0;
}

if (unlikely(!mdt->topic)) {
/* Cache entry found but no topic name:
* delete it. */
changed = rd_kafka_metadata_cache_delete_by_topic_id(
rk, mdit->topic_id);
} else {
if (likely(mdt->topic != NULL)) {
/* Cache unknown topics for a short while (100ms) to allow the
* cgrp logic to find negative cache hits. */
if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
@@ -520,10 +505,17 @@ void rd_kafka_metadata_cache_topic_update(
else
changed = rd_kafka_metadata_cache_delete_by_name(
rk, mdt->topic);
} else {
/* Cache entry found but no topic name:
* delete it. */
changed = rd_kafka_metadata_cache_delete_by_topic_id(
rk, mdit->topic_id);
}

if (changed && propagate)
rd_kafka_metadata_cache_propagate_changes(rk);

return changed;
}


2 changes: 1 addition & 1 deletion src/rdkafka_topic.c
@@ -2056,7 +2056,7 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt,

rd_kafka_wrlock(rkt->rkt_rk);
rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, &mdit, rd_true,
rd_false, NULL, 0);
rd_false, NULL, 0, rd_false);
rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock());
rd_kafka_wrunlock(rkt->rkt_rk);
rd_free(partitions);
98 changes: 98 additions & 0 deletions tests/0146-metadata_mock.c
@@ -0,0 +1,98 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2024, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "test.h"


/**
* @brief Metadata should persist in the cache after
* a full metadata refresh.
*
* @param assignor Assignor to use
*/
static void do_test_metadata_persists_in_cache(const char *assignor) {
rd_kafka_t *rk;
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
const char *topic = test_mk_topic_name(__FUNCTION__, 1);
rd_kafka_conf_t *conf;
rd_kafka_topic_t *rkt;
const rd_kafka_metadata_t *md;
rd_kafka_topic_partition_list_t *subscription;

SUB_TEST_QUICK("%s", assignor);

mcluster = test_mock_cluster_new(3, &bootstraps);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

test_conf_init(&conf, NULL, 10);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "partition.assignment.strategy", assignor);
test_conf_set(conf, "group.id", topic);

rk = test_create_handle(RD_KAFKA_CONSUMER, conf);

subscription = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(subscription, topic, 0);

rkt = test_create_consumer_topic(rk, topic);

/* Metadata for topic is available */
TEST_CALL_ERR__(rd_kafka_metadata(rk, 0, rkt, &md, 1000));
rd_kafka_metadata_destroy(md);
md = NULL;

/* Subscribe to same topic */
TEST_CALL_ERR__(rd_kafka_subscribe(rk, subscription));

/* Request full metadata */
TEST_CALL_ERR__(rd_kafka_metadata(rk, 1, NULL, &md, 1000));
rd_kafka_metadata_destroy(md);
md = NULL;

/* Subscribing shouldn't give UNKNOWN_TOPIC_OR_PART err.
* Verify no error was returned. */
test_consumer_poll_no_msgs("no error", rk, 0, 100);

rd_kafka_topic_partition_list_destroy(subscription);
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
test_mock_cluster_destroy(mcluster);

SUB_TEST_PASS();
}

int main_0146_metadata_mock(int argc, char **argv) {
TEST_SKIP_MOCK_CLUSTER(0);

do_test_metadata_persists_in_cache("range");

do_test_metadata_persists_in_cache("cooperative-sticky");

return 0;
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
@@ -136,6 +136,7 @@ set(
0143-exponential_backoff_mock.c
0144-idempotence_mock.c
0145-pause_resume_mock.c
0146-metadata_mock.c
8000-idle.cpp
8001-fetch_from_follower_mock_manual.c
test.c
11 changes: 1 addition & 10 deletions tests/cluster_testing.py
@@ -9,7 +9,7 @@

from trivup.trivup import Cluster
from trivup.apps.ZookeeperApp import ZookeeperApp
from trivup.apps.KafkaBrokerApp import KafkaBrokerApp as KafkaBrokerAppOrig
from trivup.apps.KafkaBrokerApp import KafkaBrokerApp
from trivup.apps.KerberosKdcApp import KerberosKdcApp
from trivup.apps.SslApp import SslApp
from trivup.apps.OauthbearerOIDCApp import OauthbearerOIDCApp
@@ -35,15 +35,6 @@ def read_scenario_conf(scenario):
return parser.load(f)


# FIXME: merge in trivup
class KafkaBrokerApp(KafkaBrokerAppOrig):
def _add_simple_authorizer(self, conf_blob):
conf_blob.append(
'authorizer.class.name=' +
'org.apache.kafka.metadata.authorizer.StandardAuthorizer')
conf_blob.append('super.users=User:ANONYMOUS')


class LibrdkafkaTestCluster(Cluster):
def __init__(self, version, conf={}, num_brokers=3, debug=False,
scenario="default", kraft=False):