Skip to content

Commit

Permalink
Fix for a wrong error returned on full metadata refresh before joinin…
Browse files Browse the repository at this point in the history
…g a consumer group (#4678)

A metadata call before member joins
consumer group, could lead to
an `UNKNOWN_TOPIC_OR_PART` error.
Solved by updating the consumer group
following a metadata refresh only in safe states.
Happening since 2.1.0
  • Loading branch information
emasab authored and anchitj committed Jun 10, 2024
1 parent 550b40d commit c0434f7
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 13 deletions.
27 changes: 20 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ librdkafka v2.4.0 is a feature release:
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Fix to metadata cache expiration on full metadata refresh (#4677).
* Fix for a wrong error returned on full metadata refresh before joining
a consumer group (#4678).


## Upgrade considerations
Expand All @@ -32,29 +34,40 @@ librdkafka v2.4.0 is a feature release:

### General fixes

* In librdkafka release pipeline a static build containing libsasl2
* Issues: [confluentinc/confluent-kafka-go#981](https://github.com/confluentinc/confluent-kafka-go/issues/981).
In librdkafka release pipeline a static build containing libsasl2
could be chosen instead of the alternative one without it.
That caused the libsasl2 dependency to be required in confluent-kafka-go
v2.1.0-linux-musl-arm64 and v2.3.0-linux-musl-arm64.
Solved by correctly excluding the binary configured with that library,
when targeting a static build.
Happening since v2.0.2, with specified platforms, when using static binaries (#4666).
* When the main thread loop was awakened less than 1 ms
Happening since v2.0.2, with specified platforms,
when using static binaries (#4666).
* Issues: #4684.
When the main thread loop was awakened less than 1 ms
before the expiration of a timeout, it was serving with a zero timeout,
leading to increased CPU usage until the timeout was reached.
Happening since 1.x (#4671).
* Metadata cache was cleared on full metadata refresh, leading to unnecessary
Happening since 1.x.
* Issues: #4685.
Metadata cache was cleared on full metadata refresh, leading to unnecessary
refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
cache for existing or hinted entries instead of clearing them.
Happening since 2.1.0 (#4677).
* Issues: #4589.
A metadata call before member joins consumer group,
could lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating
the consumer group following a metadata refresh only in safe states.
Happening since 2.1.0 (#4678).

### Consumer fixes

* In case of subscription change with a consumer using the cooperative assignor
* Issues: #4686.
In case of subscription change with a consumer using the cooperative assignor
it could resume fetching from a previous position.
That could also happen if resuming a partition that wasn't paused.
Fixed by ensuring that a resume operation is completely a no-op when
the partition isn't paused (#4636).
the partition isn't paused.
Happening since 1.x (#4636).



Expand Down
23 changes: 18 additions & 5 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -5005,6 +5005,20 @@ rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
return revoking;
}

static void
rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t *rktparlist) {
rkcg->rkcg_subscription = rktparlist;
if (rkcg->rkcg_subscription) {
/* Insert all non-wildcard topics in cache immediately.
* Otherwise a manual full metadata request could
* not cache the hinted topic and return an
* UNKNOWN_TOPIC_OR_PART error to the user. See #4589. */
rd_kafka_metadata_cache_hint_rktparlist(
rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL,
0 /*dont replace*/);
}
}

/**
* @brief Handle a new subscription that is modifying an existing subscription
Expand Down Expand Up @@ -5037,7 +5051,7 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
rkcg, unsubscribing_topics);

rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
rkcg->rkcg_subscription = rktparlist;
rd_kafka_cgrp_subscription_set(rkcg, rktparlist);

if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
"modify subscription") == 1) {
Expand Down Expand Up @@ -5146,7 +5160,7 @@ static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg,

if (rkcg->rkcg_subscription) {
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
rkcg->rkcg_subscription = NULL;
rd_kafka_cgrp_subscription_set(rkcg, NULL);
}

if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_GENERIC)
Expand Down Expand Up @@ -5244,7 +5258,7 @@ rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg,
if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;

rkcg->rkcg_subscription = rktparlist;
rd_kafka_cgrp_subscription_set(rkcg, rktparlist);

rd_kafka_cgrp_join(rkcg);

Expand Down Expand Up @@ -5909,8 +5923,7 @@ rd_kafka_cgrp_consumer_subscribe(rd_kafka_cgrp_t *rkcg,
RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE |
RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION;

rkcg->rkcg_subscription = rktparlist;

rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
} else {
rd_kafka_cgrp_unsubscribe(rkcg, rd_true /*leave group*/);
Expand Down
42 changes: 41 additions & 1 deletion tests/0146-metadata_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,53 @@ static void do_test_metadata_persists_in_cache(const char *assignor) {

SUB_TEST_PASS();
}
/**
* @brief A metadata call for an existing topic, just after subscription,
* must not cause a UNKNOWN_TOPIC_OR_PART error.
* See issue #4589.
*/
static void do_test_metadata_call_before_join(void) {
rd_kafka_t *rk;
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
const char *topic = test_mk_topic_name(__FUNCTION__, 1);
rd_kafka_conf_t *conf;
const struct rd_kafka_metadata *metadata;

SUB_TEST_QUICK();

mcluster = test_mock_cluster_new(3, &bootstraps);
rd_kafka_mock_topic_create(mcluster, topic, 1, 3);

test_conf_init(&conf, NULL, 10);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "group.id", topic);

rk = test_create_handle(RD_KAFKA_CONSUMER, conf);

test_consumer_subscribe(rk, topic);

TEST_CALL_ERR__(rd_kafka_metadata(rk, 1, 0, &metadata, 5000));
rd_kafka_metadata_destroy(metadata);

test_consumer_poll_no_msgs("no errors", rk, 0, 1000);

rd_kafka_destroy(rk);
test_mock_cluster_destroy(mcluster);

SUB_TEST_PASS();
}

int main_0146_metadata_mock(int argc, char **argv) {
TEST_SKIP_MOCK_CLUSTER(0);

/* No need to test the "roundrobin" assignor case,
* as this is just for checking the two code paths:
* EAGER or COOPERATIVE one, and "range" is EAGER too. */
do_test_metadata_persists_in_cache("range");

do_test_metadata_persists_in_cache("cooperative-sticky");

do_test_metadata_call_before_join();

return 0;
}

0 comments on commit c0434f7

Please sign in to comment.