Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for a wrong error returned on full metadata refresh before joining a consumer group #4678

Merged
merged 4 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ librdkafka v2.4.0 is a feature release:
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Fix to metadata cache expiration on full metadata refresh (#4677).
* Fix for a wrong error returned on full metadata refresh before joining
a consumer group (#4678).


## Upgrade considerations
Expand All @@ -32,29 +34,40 @@ librdkafka v2.4.0 is a feature release:

### General fixes

* In librdkafka release pipeline a static build containing libsasl2
* Issues: [confluentinc/confluent-kafka-go#981](https://github.com/confluentinc/confluent-kafka-go/issues/981).
In librdkafka release pipeline a static build containing libsasl2
could be chosen instead of the alternative one without it.
That caused the libsasl2 dependency to be required in confluent-kafka-go
v2.1.0-linux-musl-arm64 and v2.3.0-linux-musl-arm64.
Solved by correctly excluding the binary configured with that library,
when targeting a static build.
Happening since v2.0.2, with specified platforms, when using static binaries (#4666).
* When the main thread loop was awakened less than 1 ms
Happening since v2.0.2, with specified platforms,
when using static binaries (#4666).
* Issues: #4684.
When the main thread loop was awakened less than 1 ms
before the expiration of a timeout, it was serving with a zero timeout,
leading to increased CPU usage until the timeout was reached.
Happening since 1.x (#4671).
* Metadata cache was cleared on full metadata refresh, leading to unnecessary
Happening since 1.x.
* Issues: #4685.
Metadata cache was cleared on full metadata refresh, leading to unnecessary
refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
cache for existing or hinted entries instead of clearing them.
Happening since 2.1.0 (#4677).
* Issues: #4589.
A metadata call before member joins consumer group,
could lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating
the consumer group following a metadata refresh only in safe states.
Happening since 2.1.0 (#4678).

### Consumer fixes

* In case of subscription change with a consumer using the cooperative assignor
* Issues: #4686.
In case of subscription change with a consumer using the cooperative assignor
it could resume fetching from a previous position.
That could also happen if resuming a partition that wasn't paused.
Fixed by ensuring that a resume operation is completely a no-op when
the partition isn't paused (#4636).
the partition isn't paused.
Happening since 1.x (#4636).



Expand Down
23 changes: 18 additions & 5 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -5005,6 +5005,20 @@ rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
return revoking;
}

static void
rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t *rktparlist) {
rkcg->rkcg_subscription = rktparlist;
if (rkcg->rkcg_subscription) {
/* Insert all non-wildcard topics in cache immediately.
* Otherwise a manual full metadata request could
* not cache the hinted topic and return an
* UNKNOWN_TOPIC_OR_PART error to the user. See #4589. */
rd_kafka_metadata_cache_hint_rktparlist(
rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL,
0 /*dont replace*/);
}
}

/**
* @brief Handle a new subscription that is modifying an existing subscription
Expand Down Expand Up @@ -5037,7 +5051,7 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
rkcg, unsubscribing_topics);

rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
rkcg->rkcg_subscription = rktparlist;
rd_kafka_cgrp_subscription_set(rkcg, rktparlist);

if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
"modify subscription") == 1) {
Expand Down Expand Up @@ -5146,7 +5160,7 @@ static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg,

if (rkcg->rkcg_subscription) {
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
rkcg->rkcg_subscription = NULL;
rd_kafka_cgrp_subscription_set(rkcg, NULL);
}

if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_GENERIC)
Expand Down Expand Up @@ -5244,7 +5258,7 @@ rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg,
if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;

rkcg->rkcg_subscription = rktparlist;
rd_kafka_cgrp_subscription_set(rkcg, rktparlist);

rd_kafka_cgrp_join(rkcg);

Expand Down Expand Up @@ -5909,8 +5923,7 @@ rd_kafka_cgrp_consumer_subscribe(rd_kafka_cgrp_t *rkcg,
RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE |
RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION;

rkcg->rkcg_subscription = rktparlist;

rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
} else {
rd_kafka_cgrp_unsubscribe(rkcg, rd_true /*leave group*/);
Expand Down
42 changes: 41 additions & 1 deletion tests/0146-metadata_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,53 @@ static void do_test_metadata_persists_in_cache(const char *assignor) {

SUB_TEST_PASS();
}
/**
* @brief A metadata call for an existing topic, just after subscription,
* must not cause a UNKNOWN_TOPIC_OR_PART error.
* See issue #4589.
*/
static void do_test_metadata_call_before_join(void) {
rd_kafka_t *rk;
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
const char *topic = test_mk_topic_name(__FUNCTION__, 1);
rd_kafka_conf_t *conf;
const struct rd_kafka_metadata *metadata;

SUB_TEST_QUICK();

mcluster = test_mock_cluster_new(3, &bootstraps);
rd_kafka_mock_topic_create(mcluster, topic, 1, 3);

test_conf_init(&conf, NULL, 10);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "group.id", topic);

rk = test_create_handle(RD_KAFKA_CONSUMER, conf);

test_consumer_subscribe(rk, topic);

TEST_CALL_ERR__(rd_kafka_metadata(rk, 1, 0, &metadata, 5000));
rd_kafka_metadata_destroy(metadata);

test_consumer_poll_no_msgs("no errors", rk, 0, 1000);

rd_kafka_destroy(rk);
test_mock_cluster_destroy(mcluster);

SUB_TEST_PASS();
}

int main_0146_metadata_mock(int argc, char **argv) {
TEST_SKIP_MOCK_CLUSTER(0);

/* No need to test the "roundrobin" assignor case,
* as this is just for checking the two code paths:
* EAGER or COOPERATIVE one, and "range" is EAGER too. */
do_test_metadata_persists_in_cache("range");

do_test_metadata_persists_in_cache("cooperative-sticky");

do_test_metadata_call_before_join();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add extra line before this.


return 0;
}