From 550b40dbd231103d7ca3f3ebe53a229479bd6730 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 15 Apr 2024 09:38:33 +0200 Subject: [PATCH] Fix to metadata cache expiration on full metadata refresh (#4677) Metadata cache was cleared on full metadata refresh, leading to unnecessary refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating cache for existing or hinted entries instead of clearing them. Happening since 2.1.0 --- CHANGELOG.md | 5 ++ src/rdkafka_metadata.c | 53 ++++++++++--------- src/rdkafka_metadata.h | 7 +-- src/rdkafka_metadata_cache.c | 66 +++++++++++------------- src/rdkafka_topic.c | 2 +- tests/0146-metadata_mock.c | 98 ++++++++++++++++++++++++++++++++++++ tests/CMakeLists.txt | 1 + tests/cluster_testing.py | 11 +--- tests/test.c | 2 + win32/tests/tests.vcxproj | 1 + 10 files changed, 170 insertions(+), 76 deletions(-) create mode 100644 tests/0146-metadata_mock.c diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ea5c674fc..1b7a5f09f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ librdkafka v2.4.0 is a feature release: * [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers) Continue partial implementation by adding a metadata cache by topic id and updating the topic id corresponding to the partition name (#4676) + * Fix to metadata cache expiration on full metadata refresh (#4677). ## Upgrade considerations @@ -42,6 +43,10 @@ librdkafka v2.4.0 is a feature release: before the expiration of a timeout, it was serving with a zero timeout, leading to increased CPU usage until the timeout was reached. Happening since 1.x (#4671). + * Metadata cache was cleared on full metadata refresh, leading to unnecessary + refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating + cache for existing or hinted entries instead of clearing them. + Happening since 2.1.0 (#4677). ### Consumer fixes diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index bc8e5bc5ee..7e9c90376d 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -490,7 +490,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; int broker_changes = 0; int cache_changes = 0; - rd_ts_t ts_start = rd_clock(); + /* If client rack is present, the metadata cache (topic or full) needs * to contain the partition to rack map. */ rd_bool_t has_client_rack = rk->rk_conf.client_rack && @@ -850,23 +850,24 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_list_remove_cmp(missing_topic_ids, &mdi->topics[i].topic_id, (void *)rd_kafka_Uuid_ptr_cmp)); - if (!all_topics) { - /* Only update cache when not asking - * for all topics. */ - - rd_kafka_wrlock(rk); - rd_kafka_metadata_cache_topic_update( - rk, &md->topics[i], &mdi->topics[i], - rd_false /*propagate later*/, - /* use has_client_rack rather than - compute_racks. We need cached rack ids - only in case we need to rejoin the group - if they change and client.rack is set - (KIP-881). */ - has_client_rack, mdi->brokers, md->broker_cnt); - cache_changes++; - rd_kafka_wrunlock(rk); - } + /* Only update cache when not asking + * for all topics or cache entry + * already exists. */ + rd_kafka_wrlock(rk); + cache_changes += + rd_kafka_metadata_cache_topic_update( + rk, &md->topics[i], &mdi->topics[i], + rd_false /*propagate later*/, + /* use has_client_rack rather than + compute_racks. We need cached rack ids + only in case we need to rejoin the group + if they change and client.rack is set + (KIP-881). 
+				       */
+				    has_client_rack, mdi->brokers,
+				    md->broker_cnt,
+				    all_topics /*cache entry needs to exist
+						*if all_topics*/);
+				rd_kafka_wrunlock(rk);
 	}

 	/* Requested topics not seen in metadata? Propogate to topic code. */
@@ -979,9 +980,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
 	}

 	if (all_topics) {
-		/* Expire all cache entries that were not updated. */
-		rd_kafka_metadata_cache_evict_by_age(rkb->rkb_rk, ts_start);
-
+		/* All hints have been replaced by the corresponding entry.
+		 * Rest of hints can be removed as topics aren't present
+		 * in full metadata. */
+		rd_kafka_metadata_cache_purge_all_hints(rkb->rkb_rk);
 		if (rkb->rkb_rk->rk_full_metadata)
 			rd_kafka_metadata_destroy(
 			    &rkb->rkb_rk->rk_full_metadata->metadata);
@@ -1001,10 +1003,6 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
 			     "Caching full metadata with "
 			     "%d broker(s) and %d topic(s): %s",
 			     md->broker_cnt, md->topic_cnt, reason);
-	} else {
-		if (cache_changes)
-			rd_kafka_metadata_cache_propagate_changes(rk);
-		rd_kafka_metadata_cache_expiry_start(rk);
 	}
 	/* Remove cache hints for the originally requested topics. */
 	if (requested_topics)
@@ -1013,6 +1011,11 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
 		rd_kafka_metadata_cache_purge_hints_by_id(rk, requested_topic_ids);

+	if (cache_changes) {
+		rd_kafka_metadata_cache_propagate_changes(rk);
+		rd_kafka_metadata_cache_expiry_start(rk);
+	}
+
 	rd_kafka_wrunlock(rkb->rkb_rk);

 	if (broker_changes) {
diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h
index 495ca6436e..b0926845ef 100644
--- a/src/rdkafka_metadata.h
+++ b/src/rdkafka_metadata.h
@@ -274,15 +274,16 @@ int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic);
 int rd_kafka_metadata_cache_delete_by_topic_id(rd_kafka_t *rk,
                                                const rd_kafka_Uuid_t topic_id);
 void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk);
-int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts);
-void rd_kafka_metadata_cache_topic_update(
+int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk);
+int rd_kafka_metadata_cache_topic_update(
     rd_kafka_t *rk,
     const rd_kafka_metadata_topic_t *mdt,
     const rd_kafka_metadata_topic_internal_t *mdit,
     rd_bool_t propagate,
     rd_bool_t include_metadata,
     rd_kafka_metadata_broker_internal_t *brokers,
-    size_t broker_cnt);
+    size_t broker_cnt,
+    rd_bool_t only_existing);
 void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk);
 struct rd_kafka_metadata_cache_entry *
 rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid);
diff --git a/src/rdkafka_metadata_cache.c b/src/rdkafka_metadata_cache.c
index 75e39134f9..d4c93cd11c 100644
--- a/src/rdkafka_metadata_cache.c
+++ b/src/rdkafka_metadata_cache.c
@@ -182,45 +182,27 @@ static int rd_kafka_metadata_cache_evict(rd_kafka_t *rk) {

 /**
- * @brief Evict timed out entries from cache based on their insert/update time
- *        rather than expiry time. Any entries older than \p ts will be evicted.
+ * @brief Remove all cache hints.
+ *        This is done after the Metadata response has been parsed and has
+ *        replaced hints with existing topic information, so this only
+ *        removes topics that were not present in the full metadata.
  *
- * @returns the number of entries evicted.
+ * @returns the number of purged hints * * @locks_required rd_kafka_wrlock() */ -int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts) { +int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk) { int cnt = 0; struct rd_kafka_metadata_cache_entry *rkmce, *tmp; TAILQ_FOREACH_SAFE(rkmce, &rk->rk_metadata_cache.rkmc_expiry, rkmce_link, tmp) { - if (rkmce->rkmce_ts_insert <= ts) { + if (!RD_KAFKA_METADATA_CACHE_VALID(rkmce)) { rd_kafka_metadata_cache_delete(rk, rkmce, 1); cnt++; } } - /* Update expiry timer */ - rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry); - if (rkmce) - rd_kafka_timer_start(&rk->rk_timers, - &rk->rk_metadata_cache.rkmc_expiry_tmr, - rkmce->rkmce_ts_expires - rd_clock(), - rd_kafka_metadata_cache_evict_tmr_cb, rk); - else - rd_kafka_timer_stop(&rk->rk_timers, - &rk->rk_metadata_cache.rkmc_expiry_tmr, 1); - - rd_kafka_dbg(rk, METADATA, "METADATA", - "Expired %d entries older than %dms from metadata cache " - "(%d entries remain)", - cnt, (int)((rd_clock() - ts) / 1000), - rk->rk_metadata_cache.rkmc_cnt); - - if (cnt) - rd_kafka_metadata_cache_propagate_changes(rk); - return cnt; } @@ -474,6 +456,9 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) { * For permanent errors (authorization failures), we keep * the entry cached for metadata.max.age.ms. * + * @param only_existing Update only existing metadata cache entries, + * either valid or hinted. + * * @return 1 on metadata change, 0 when no change was applied * * @remark The cache expiry timer will not be updated/started, @@ -481,31 +466,31 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) { * * @locks rd_kafka_wrlock() */ -void rd_kafka_metadata_cache_topic_update( +int rd_kafka_metadata_cache_topic_update( rd_kafka_t *rk, const rd_kafka_metadata_topic_t *mdt, const rd_kafka_metadata_topic_internal_t *mdit, rd_bool_t propagate, rd_bool_t include_racks, rd_kafka_metadata_broker_internal_t *brokers, - size_t broker_cnt) { + size_t broker_cnt, + rd_bool_t only_existing) { struct rd_kafka_metadata_cache_entry *rkmce = NULL; rd_ts_t now = rd_clock(); rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000); int changed = 1; - if (unlikely(!mdt->topic)) { - rkmce = - rd_kafka_metadata_cache_find_by_id(rk, mdit->topic_id, 1); + if (only_existing) { + if (likely(mdt->topic != NULL)) { + rkmce = rd_kafka_metadata_cache_find(rk, mdt->topic, 0); + } else { + rkmce = rd_kafka_metadata_cache_find_by_id( + rk, mdit->topic_id, 1); + } if (!rkmce) - return; + return 0; } - if (unlikely(!mdt->topic)) { - /* Cache entry found but no topic name: - * delete it. */ - changed = rd_kafka_metadata_cache_delete_by_topic_id( - rk, mdit->topic_id); - } else { + if (likely(mdt->topic != NULL)) { /* Cache unknown topics for a short while (100ms) to allow the * cgrp logic to find negative cache hits. */ if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) @@ -520,10 +505,17 @@ void rd_kafka_metadata_cache_topic_update( else changed = rd_kafka_metadata_cache_delete_by_name( rk, mdt->topic); + } else { + /* Cache entry found but no topic name: + * delete it. 
+		 */
+		changed = rd_kafka_metadata_cache_delete_by_topic_id(
+		    rk, mdit->topic_id);
 	}

 	if (changed && propagate)
 		rd_kafka_metadata_cache_propagate_changes(rk);
+
+	return changed;
 }
diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c
index afdb3b5bf5..80e7a14780 100644
--- a/src/rdkafka_topic.c
+++ b/src/rdkafka_topic.c
@@ -2056,7 +2056,7 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt,

         rd_kafka_wrlock(rkt->rkt_rk);
         rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, &mdit, rd_true,
-                                             rd_false, NULL, 0);
+                                             rd_false, NULL, 0, rd_false);
         rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock());
         rd_kafka_wrunlock(rkt->rkt_rk);
         rd_free(partitions);
diff --git a/tests/0146-metadata_mock.c b/tests/0146-metadata_mock.c
new file mode 100644
index 0000000000..56f5b81f8c
--- /dev/null
+++ b/tests/0146-metadata_mock.c
@@ -0,0 +1,98 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2024, Confluent Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+
+/**
+ * @brief Metadata should persist in cache after
+ *        a full metadata refresh.
+ * + * @param assignor Assignor to use + */ +static void do_test_metadata_persists_in_cache(const char *assignor) { + rd_kafka_t *rk; + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + rd_kafka_conf_t *conf; + rd_kafka_topic_t *rkt; + const rd_kafka_metadata_t *md; + rd_kafka_topic_partition_list_t *subscription; + + SUB_TEST_QUICK("%s", assignor); + + mcluster = test_mock_cluster_new(3, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 1); + + test_conf_init(&conf, NULL, 10); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "partition.assignment.strategy", assignor); + test_conf_set(conf, "group.id", topic); + + rk = test_create_handle(RD_KAFKA_CONSUMER, conf); + + subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, topic, 0); + + rkt = test_create_consumer_topic(rk, topic); + + /* Metadata for topic is available */ + TEST_CALL_ERR__(rd_kafka_metadata(rk, 0, rkt, &md, 1000)); + rd_kafka_metadata_destroy(md); + md = NULL; + + /* Subscribe to same topic */ + TEST_CALL_ERR__(rd_kafka_subscribe(rk, subscription)); + + /* Request full metadata */ + TEST_CALL_ERR__(rd_kafka_metadata(rk, 1, NULL, &md, 1000)); + rd_kafka_metadata_destroy(md); + md = NULL; + + /* Subscribing shouldn't give UNKNOWN_TOPIC_OR_PART err. + * Verify no error was returned. */ + test_consumer_poll_no_msgs("no error", rk, 0, 100); + + rd_kafka_topic_partition_list_destroy(subscription); + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(rk); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + +int main_0146_metadata_mock(int argc, char **argv) { + TEST_SKIP_MOCK_CLUSTER(0); + + do_test_metadata_persists_in_cache("range"); + + do_test_metadata_persists_in_cache("cooperative-sticky"); + + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 30a1363b27..62ce0deb02 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -136,6 +136,7 @@ set( 0143-exponential_backoff_mock.c 0144-idempotence_mock.c 0145-pause_resume_mock.c + 0146-metadata_mock.c 8000-idle.cpp 8001-fetch_from_follower_mock_manual.c test.c diff --git a/tests/cluster_testing.py b/tests/cluster_testing.py index 86d3d91248..d3189f1cdb 100755 --- a/tests/cluster_testing.py +++ b/tests/cluster_testing.py @@ -9,7 +9,7 @@ from trivup.trivup import Cluster from trivup.apps.ZookeeperApp import ZookeeperApp -from trivup.apps.KafkaBrokerApp import KafkaBrokerApp as KafkaBrokerAppOrig +from trivup.apps.KafkaBrokerApp import KafkaBrokerApp from trivup.apps.KerberosKdcApp import KerberosKdcApp from trivup.apps.SslApp import SslApp from trivup.apps.OauthbearerOIDCApp import OauthbearerOIDCApp @@ -35,15 +35,6 @@ def read_scenario_conf(scenario): return parser.load(f) -# FIXME: merge in trivup -class KafkaBrokerApp(KafkaBrokerAppOrig): - def _add_simple_authorizer(self, conf_blob): - conf_blob.append( - 'authorizer.class.name=' + - 'org.apache.kafka.metadata.authorizer.StandardAuthorizer') - conf_blob.append('super.users=User:ANONYMOUS') - - class LibrdkafkaTestCluster(Cluster): def __init__(self, version, conf={}, num_brokers=3, debug=False, scenario="default", kraft=False): diff --git a/tests/test.c b/tests/test.c index 39502d5957..dc312467da 100644 --- a/tests/test.c +++ b/tests/test.c @@ -260,6 +260,7 @@ _TEST_DECL(0142_reauthentication); _TEST_DECL(0143_exponential_backoff_mock); _TEST_DECL(0144_idempotence_mock); _TEST_DECL(0145_pause_resume_mock); 
+_TEST_DECL(0146_metadata_mock);

 /* Manual tests */
 _TEST_DECL(8000_idle);
@@ -516,6 +517,7 @@ struct test tests[] = {
     _TEST(0143_exponential_backoff_mock, TEST_F_LOCAL),
     _TEST(0144_idempotence_mock, TEST_F_LOCAL, TEST_BRKVER(0, 11, 0, 0)),
     _TEST(0145_pause_resume_mock, TEST_F_LOCAL),
+    _TEST(0146_metadata_mock, TEST_F_LOCAL),


     /* Manual tests */
diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj
index f9ffa00d0a..a354f278f8 100644
--- a/win32/tests/tests.vcxproj
+++ b/win32/tests/tests.vcxproj
@@ -226,6 +226,7 @@
     <ClCompile Include="..\..\tests\0144-idempotence_mock.c" />
     <ClCompile Include="..\..\tests\0145-pause_resume_mock.c" />
+    <ClCompile Include="..\..\tests\0146-metadata_mock.c" />
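
Illustrative note, not part of the patch: the behaviour this commit fixes is observable through the public API alone. The sketch below is a minimal example of the scenario, assuming a broker on localhost:9092 and an existing topic named "test" (both placeholders) and omitting error handling. A topic-specific metadata request populates the cache entry for that topic; before this fix, a subsequent full metadata refresh cleared that entry, causing unnecessary refreshes and occasional UNKNOWN_TOPIC_OR_PART errors, whereas with the fix the entry is updated in place and only stale hints are purged.

/* Illustrative sketch only: exercises the full-metadata-refresh path that
 * rd_kafka_parse_Metadata0() handles. Broker address and topic name are
 * placeholder assumptions; error handling is omitted for brevity. */
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        const struct rd_kafka_metadata *md;
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr,
                          sizeof(errstr));

        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "test", NULL);

        /* Topic-specific metadata request: creates/updates the metadata
         * cache entry for "test". */
        rd_kafka_metadata(rk, 0 /*only rkt*/, rkt, &md, 5000);
        rd_kafka_metadata_destroy(md);

        /* Full metadata refresh: previously this cleared the cached entry
         * created above; with this patch the entry is updated in place and
         * only unmatched hints are removed. */
        rd_kafka_metadata(rk, 1 /*all topics*/, NULL, &md, 5000);
        printf("full metadata: %d topic(s) in cluster\n", md->topic_cnt);
        rd_kafka_metadata_destroy(md);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
        return 0;
}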