From 01d3b0134e5d29c525852a17e5cfb118b9a7ae16 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Thu, 25 May 2023 16:59:37 +0530 Subject: [PATCH 01/14] Add KIP-235 initial implementation --- src/rdaddr.h | 2 +- src/rdkafka_broker.c | 66 ++++++++++++++++++++++++++++++++++---------- src/rdkafka_conf.c | 6 ++++ src/rdkafka_conf.h | 1 + 4 files changed, 60 insertions(+), 15 deletions(-) diff --git a/src/rdaddr.h b/src/rdaddr.h index c8574d0194..4f03630c33 100644 --- a/src/rdaddr.h +++ b/src/rdaddr.h @@ -139,7 +139,7 @@ rd_sockaddr_list_next(rd_sockaddr_list_t *rsal) { #define RD_SOCKADDR_LIST_FOREACH(sinx, rsal) \ for ((sinx) = &(rsal)->rsal_addr[0]; \ - (sinx) < &(rsal)->rsal_addr[(rsal)->rsal_len]; (sinx)++) + (sinx) < &(rsal)->rsal_addr[(rsal)->rsal_cnt]; (sinx)++) /** * Wrapper for getaddrinfo(3) that performs these additional tasks: diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index cb319aec11..7ee33c133c 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -49,6 +49,7 @@ #include #include "rd.h" +#include "rdaddr.h" #include "rdkafka_int.h" #include "rdkafka_msg.h" #include "rdkafka_msgset.h" @@ -5256,6 +5257,28 @@ static int rd_kafka_broker_name_parse(rd_kafka_t *rk, return 0; } +/** + * @brief Add a broker from a string of type "[proto://]host[:port]" to the list of brokers. + * *cnt is increased by one if a broker was added, else not. + */ +void rd_kafka_find_or_add_broker(rd_kafka_t *rk, rd_kafka_secproto_t proto, const char *host, uint16_t port, int *cnt) { + rd_kafka_broker_t *rkb; + + if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) && + rkb->rkb_source == RD_KAFKA_CONFIGURED) { + (*cnt)++; + } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto, + host, port, + RD_KAFKA_NODEID_UA) != NULL) + (*cnt)++; + + /* If rd_kafka_broker_find returned a broker its + * reference needs to be released + * See issue #193 */ + if (rkb) + rd_kafka_broker_destroy(rkb); +} + /** * @brief Adds a (csv list of) broker(s). * Returns the number of brokers succesfully added. @@ -5267,13 +5290,15 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { char *s_copy = rd_strdup(brokerlist); char *s = s_copy; int cnt = 0; - rd_kafka_broker_t *rkb; int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt); + rd_sockaddr_inx_t *sinx; + rd_sockaddr_list_t *sockaddrList; /* Parse comma-separated list of brokers. */ while (*s) { uint16_t port; const char *host; + const char *errstr; rd_kafka_secproto_t proto; if (*s == ',' || *s == ' ') { @@ -5286,20 +5311,33 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { break; rd_kafka_wrlock(rk); + if (rk->rk_conf.resolve_canonical_bootstrap_servers_only) { + sockaddrList = rd_getaddrinfo( + host, RD_KAFKA_PORT_STR, AI_ADDRCONFIG, + rk->rk_conf.broker_addr_family, SOCK_STREAM, + IPPROTO_TCP, rk->rk_conf.resolve_cb, + rk->rk_conf.opaque, &errstr); + + if (!sockaddrList) { + rd_kafka_log(rk, LOG_WARNING, "BROKER", + "Failed to resolve '%s': %s", host, + errstr); + rd_kafka_wrunlock(rk); + continue; + } - if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) && - rkb->rkb_source == RD_KAFKA_CONFIGURED) { - cnt++; - } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto, - host, port, - RD_KAFKA_NODEID_UA) != NULL) - cnt++; - - /* If rd_kafka_broker_find returned a broker its - * reference needs to be released - * See issue #193 */ - if (rkb) - rd_kafka_broker_destroy(rkb); + RD_SOCKADDR_LIST_FOREACH(sinx, sockaddrList) { + const char *resolvedFQDN = rd_sockaddr2str( + sinx, RD_SOCKADDR2STR_F_RESOLVE); + rd_kafka_find_or_add_broker( + rk, proto, resolvedFQDN, port, &cnt); + }; + + rd_sockaddr_list_destroy(sockaddrList); + } else { + rd_kafka_find_or_add_broker(rk, proto, host, port, + &cnt); + } rd_kafka_wrunlock(rk); } diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index e5c1415fce..c584e10918 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1620,6 +1620,12 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "Maximum number of messages to dispatch in " "one `rd_kafka_consume_callback*()` call (0 = unlimited)", 0, 1000000, 0}, + {_RK_GLOBAL, "resolve.canonical.bootstrap.servers.only", _RK_C_BOOL, + _RK(resolve_canonical_bootstrap_servers_only), + "Resolve each bootstrap address into a list of canonical names." + "Default: false.", + 0, 1, 0}, + } {0, /* End */}}; diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h index 2d625ce05f..01df21f3ec 100644 --- a/src/rdkafka_conf.h +++ b/src/rdkafka_conf.h @@ -224,6 +224,7 @@ struct rd_kafka_conf_s { int api_version_fallback_ms; char *broker_version_fallback; rd_kafka_secproto_t security_protocol; + int resolve_canonical_bootstrap_servers_only; struct { #if WITH_SSL From e74dca5a9b5cef53caad8aa6161e674587919c76 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Fri, 26 May 2023 14:11:14 +0530 Subject: [PATCH 02/14] Fix configuration bug --- CONFIGURATION.md | 7 +++++++ src/rdkafka_conf.c | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 9a0e7ab4c7..90a6ceb8b0 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -181,4 +181,11 @@ offset.store.sync.interval.ms | C | -1 .. 86400000 | -1 offset.store.method | C | file, broker | broker | low | **DEPRECATED** Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires "group.id" to be configured and Apache Kafka 0.8.2 or later on the broker.).
*Type: enum value* consume.callback.max.messages | C | 0 .. 1000000 | 0 | low | Maximum number of messages to dispatch in one `rd_kafka_consume_callback*()` call (0 = unlimited)
*Type: integer* + +## Global configuration properties + +Property | C/P | Range | Default | Importance | Description +-----------------------------------------|-----|-----------------|--------------:|------------| -------------------------- +resolve.canonical.bootstrap.servers.only | * | true, false | false | low | Resolve each bootstrap address into a list of canonical names.Default: false.
*Type: boolean* + ### C/P legend: C = Consumer, P = Producer, * = both diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index c584e10918..3a8694704b 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1620,12 +1620,12 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "Maximum number of messages to dispatch in " "one `rd_kafka_consume_callback*()` call (0 = unlimited)", 0, 1000000, 0}, + {_RK_GLOBAL, "resolve.canonical.bootstrap.servers.only", _RK_C_BOOL, _RK(resolve_canonical_bootstrap_servers_only), "Resolve each bootstrap address into a list of canonical names." "Default: false.", 0, 1, 0}, - } {0, /* End */}}; From 2dc13640688a9cc9c60fba98159e4c27275c5c62 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Mon, 29 May 2023 16:01:58 +0530 Subject: [PATCH 03/14] Add basic test --- tests/0141-resolve_cname.c | 58 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 tests/0141-resolve_cname.c diff --git a/tests/0141-resolve_cname.c b/tests/0141-resolve_cname.c new file mode 100644 index 0000000000..2cd9c7b377 --- /dev/null +++ b/tests/0141-resolve_cname.c @@ -0,0 +1,58 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2023, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + +/** + * @brief Tests that the resolve.canonical.bootstrap.servers.only + * configuration property works as expected. + */ +void do_test_config_enabled() { + rd_kafka_conf_t *conf; + rd_kafka_t *rk; + + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "resolve.canonical.bootstrap.servers.only", "true"); + + + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + if (rk == NULL) { + TEST_FAIL("Failed to create producer"); + return; + } + + if (rk) + rd_kafka_destroy(rk); + else + rd_kafka_conf_destroy(conf); + +} + +int main_0141_resolve_cname_bootstrap_servers(int argc, char **argv) { + do_test_config_enabled(); +} \ No newline at end of file From e068f41a3a0d338306faa960adf05f3f57e2f31a Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Mon, 29 May 2023 16:08:32 +0530 Subject: [PATCH 04/14] Fix bugs --- CONFIGURATION.md | 8 +------- src/rdkafka_conf.c | 11 +++++------ ...cname.c => 0141-resolve_cname_bootstrap_servers.c} | 2 +- 3 files changed, 7 insertions(+), 14 deletions(-) rename tests/{0141-resolve_cname.c => 0141-resolve_cname_bootstrap_servers.c} (99%) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 90a6ceb8b0..4d02fde720 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -152,6 +152,7 @@ delivery.report.only.error | P | true, false | false dr_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_cb())
*Type: see dedicated API* dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())
*Type: see dedicated API* sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages.
*Type: integer* +resolve.canonical.bootstrap.servers.only | * | true, false | false | low | Resolve each bootstrap address into a list of canonical names.Default: false.
*Type: boolean* ## Topic configuration properties @@ -181,11 +182,4 @@ offset.store.sync.interval.ms | C | -1 .. 86400000 | -1 offset.store.method | C | file, broker | broker | low | **DEPRECATED** Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires "group.id" to be configured and Apache Kafka 0.8.2 or later on the broker.).
*Type: enum value* consume.callback.max.messages | C | 0 .. 1000000 | 0 | low | Maximum number of messages to dispatch in one `rd_kafka_consume_callback*()` call (0 = unlimited)
*Type: integer* - -## Global configuration properties - -Property | C/P | Range | Default | Importance | Description ------------------------------------------|-----|-----------------|--------------:|------------| -------------------------- -resolve.canonical.bootstrap.servers.only | * | true, false | false | low | Resolve each bootstrap address into a list of canonical names.Default: false.
*Type: boolean* - ### C/P legend: C = Consumer, P = Producer, * = both diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index 3a8694704b..90eec70920 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1437,6 +1437,11 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "A higher value allows for more effective batching of these " "messages.", 0, 900000, 10}, + {_RK_GLOBAL, "resolve.canonical.bootstrap.servers.only", _RK_C_BOOL, + _RK(resolve_canonical_bootstrap_servers_only), + "Resolve each bootstrap address into a list of canonical names." + "Default: false.", + 0, 1, 0}, /* @@ -1621,12 +1626,6 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "one `rd_kafka_consume_callback*()` call (0 = unlimited)", 0, 1000000, 0}, - {_RK_GLOBAL, "resolve.canonical.bootstrap.servers.only", _RK_C_BOOL, - _RK(resolve_canonical_bootstrap_servers_only), - "Resolve each bootstrap address into a list of canonical names." - "Default: false.", - 0, 1, 0}, - {0, /* End */}}; /** diff --git a/tests/0141-resolve_cname.c b/tests/0141-resolve_cname_bootstrap_servers.c similarity index 99% rename from tests/0141-resolve_cname.c rename to tests/0141-resolve_cname_bootstrap_servers.c index 2cd9c7b377..337ccf62e7 100644 --- a/tests/0141-resolve_cname.c +++ b/tests/0141-resolve_cname_bootstrap_servers.c @@ -55,4 +55,4 @@ void do_test_config_enabled() { int main_0141_resolve_cname_bootstrap_servers(int argc, char **argv) { do_test_config_enabled(); -} \ No newline at end of file +} From 8f79b1b63f97319409ea12f4841bac331bdb950d Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Wed, 7 Jun 2023 23:58:34 +0530 Subject: [PATCH 05/14] PR Feedback --- CONFIGURATION.md | 2 +- src/rdkafka_broker.c | 27 +++++---- src/rdkafka_conf.c | 7 ++- src/rdkafka_conf.h | 2 +- tests/0004-conf.c | 2 + tests/0141-resolve_cname_bootstrap_servers.c | 58 -------------------- 6 files changed, 25 insertions(+), 73 deletions(-) delete mode 100644 tests/0141-resolve_cname_bootstrap_servers.c diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 4d02fde720..ca50fabe5e 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -152,7 +152,7 @@ delivery.report.only.error | P | true, false | false dr_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_cb())
*Type: see dedicated API* dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())
*Type: see dedicated API* sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages.
*Type: integer* -resolve.canonical.bootstrap.servers.only | * | true, false | false | low | Resolve each bootstrap address into a list of canonical names.Default: false.
*Type: boolean* +enable.bootstrap.servers.canonical.resolve | * | true, false | false | low | Resolve each bootstrap address into a list of canonical names. By default client will not attempt to reverse lookup to find the FQDN.Default: false.
*Type: boolean* ## Topic configuration properties diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 7ee33c133c..2d90c98011 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -5258,18 +5258,21 @@ static int rd_kafka_broker_name_parse(rd_kafka_t *rk, } /** - * @brief Add a broker from a string of type "[proto://]host[:port]" to the list of brokers. - * *cnt is increased by one if a broker was added, else not. + * @brief Add a broker from a string of type "[proto://]host[:port]" to the list + * of brokers. *cnt is increased by one if a broker was added, else not. */ -void rd_kafka_find_or_add_broker(rd_kafka_t *rk, rd_kafka_secproto_t proto, const char *host, uint16_t port, int *cnt) { - rd_kafka_broker_t *rkb; +static void rd_kafka_find_or_add_broker(rd_kafka_t *rk, + rd_kafka_secproto_t proto, + const char *host, + uint16_t port, + int *cnt) { + rd_kafka_broker_t *rkb = NULL; if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) && rkb->rkb_source == RD_KAFKA_CONFIGURED) { (*cnt)++; - } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto, - host, port, - RD_KAFKA_NODEID_UA) != NULL) + } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto, host, + port, RD_KAFKA_NODEID_UA) != NULL) (*cnt)++; /* If rd_kafka_broker_find returned a broker its @@ -5290,7 +5293,7 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { char *s_copy = rd_strdup(brokerlist); char *s = s_copy; int cnt = 0; - int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt); + int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt); rd_sockaddr_inx_t *sinx; rd_sockaddr_list_t *sockaddrList; @@ -5299,6 +5302,7 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { uint16_t port; const char *host; const char *errstr; + const char *resolvedFQDN; rd_kafka_secproto_t proto; if (*s == ',' || *s == ' ') { @@ -5311,7 +5315,10 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { break; rd_kafka_wrlock(rk); - if (rk->rk_conf.resolve_canonical_bootstrap_servers_only) { + if (rk->rk_conf.enable_bootstrap_servers_canonical_resolve) { + rd_kafka_dbg(rk, ALL, "INIT", + "Canonicalizing bootstrap broker %s:%d", + host, port); sockaddrList = rd_getaddrinfo( host, RD_KAFKA_PORT_STR, AI_ADDRCONFIG, rk->rk_conf.broker_addr_family, SOCK_STREAM, @@ -5327,7 +5334,7 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { } RD_SOCKADDR_LIST_FOREACH(sinx, sockaddrList) { - const char *resolvedFQDN = rd_sockaddr2str( + resolvedFQDN = rd_sockaddr2str( sinx, RD_SOCKADDR2STR_F_RESOLVE); rd_kafka_find_or_add_broker( rk, proto, resolvedFQDN, port, &cnt); diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index 90eec70920..6c6fb70496 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1437,9 +1437,10 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "A higher value allows for more effective batching of these " "messages.", 0, 900000, 10}, - {_RK_GLOBAL, "resolve.canonical.bootstrap.servers.only", _RK_C_BOOL, - _RK(resolve_canonical_bootstrap_servers_only), - "Resolve each bootstrap address into a list of canonical names." + {_RK_GLOBAL, "enable.bootstrap.servers.canonical.resolve", _RK_C_BOOL, + _RK(enable_bootstrap_servers_canonical_resolve), + "Resolve each bootstrap address into a list of canonical names. By " + "default client will not attempt to reverse lookup to find the FQDN." "Default: false.", 0, 1, 0}, diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h index 01df21f3ec..f394a7557e 100644 --- a/src/rdkafka_conf.h +++ b/src/rdkafka_conf.h @@ -224,7 +224,7 @@ struct rd_kafka_conf_s { int api_version_fallback_ms; char *broker_version_fallback; rd_kafka_secproto_t security_protocol; - int resolve_canonical_bootstrap_servers_only; + int enable_bootstrap_servers_canonical_resolve; struct { #if WITH_SSL diff --git a/tests/0004-conf.c b/tests/0004-conf.c index 51401e17d3..5938e520b7 100644 --- a/tests/0004-conf.c +++ b/tests/0004-conf.c @@ -529,6 +529,8 @@ int main_0004_conf(int argc, char **argv) { "ssl.ca.certificate.stores", "Intermediate ,, Root ,", #endif + "enable.bootstrap.servers.canonical.resolve", + "true", NULL }; static const char *tconfs[] = {"request.required.acks", diff --git a/tests/0141-resolve_cname_bootstrap_servers.c b/tests/0141-resolve_cname_bootstrap_servers.c deleted file mode 100644 index 337ccf62e7..0000000000 --- a/tests/0141-resolve_cname_bootstrap_servers.c +++ /dev/null @@ -1,58 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2023, Confluent Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "test.h" - -/** - * @brief Tests that the resolve.canonical.bootstrap.servers.only - * configuration property works as expected. - */ -void do_test_config_enabled() { - rd_kafka_conf_t *conf; - rd_kafka_t *rk; - - test_conf_init(&conf, NULL, 30); - test_conf_set(conf, "resolve.canonical.bootstrap.servers.only", "true"); - - - rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - if (rk == NULL) { - TEST_FAIL("Failed to create producer"); - return; - } - - if (rk) - rd_kafka_destroy(rk); - else - rd_kafka_conf_destroy(conf); - -} - -int main_0141_resolve_cname_bootstrap_servers(int argc, char **argv) { - do_test_config_enabled(); -} From 2565f6a7a39db9bc27fb50296953ff48ebfa4bbf Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Thu, 8 Jun 2023 23:19:03 +0530 Subject: [PATCH 06/14] PR Feedback --- CONFIGURATION.md | 2 +- src/rdkafka_broker.c | 30 ++++++++++++++++++------------ src/rdkafka_conf.c | 18 ++++++++++++------ src/rdkafka_conf.h | 7 ++++++- tests/0004-conf.c | 4 ++-- 5 files changed, 39 insertions(+), 22 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index ca50fabe5e..d98314d9c2 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -152,7 +152,7 @@ delivery.report.only.error | P | true, false | false dr_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_cb())
*Type: see dedicated API* dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())
*Type: see dedicated API* sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages.
*Type: integer* -enable.bootstrap.servers.canonical.resolve | * | true, false | false | low | Resolve each bootstrap address into a list of canonical names. By default client will not attempt to reverse lookup to find the FQDN.Default: false.
*Type: boolean* +client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default when the lookupreturns multiple IP addresses for a hostname they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only` each entry will be resolved and expanded into a list of canonical names
*Type: enum value* ## Topic configuration properties diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 2d90c98011..786b382613 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -5295,14 +5295,14 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { int cnt = 0; int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt); rd_sockaddr_inx_t *sinx; - rd_sockaddr_list_t *sockaddrList; + rd_sockaddr_list_t *sockaddr_list; /* Parse comma-separated list of brokers. */ while (*s) { uint16_t port; const char *host; - const char *errstr; - const char *resolvedFQDN; + const char *err_str; + const char *resolved_FQDN; rd_kafka_secproto_t proto; if (*s == ',' || *s == ' ') { @@ -5315,32 +5315,38 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { break; rd_kafka_wrlock(rk); - if (rk->rk_conf.enable_bootstrap_servers_canonical_resolve) { + if (rk->rk_conf.client_dns_lookup == + RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { rd_kafka_dbg(rk, ALL, "INIT", "Canonicalizing bootstrap broker %s:%d", host, port); - sockaddrList = rd_getaddrinfo( + sockaddr_list = rd_getaddrinfo( host, RD_KAFKA_PORT_STR, AI_ADDRCONFIG, rk->rk_conf.broker_addr_family, SOCK_STREAM, IPPROTO_TCP, rk->rk_conf.resolve_cb, - rk->rk_conf.opaque, &errstr); + rk->rk_conf.opaque, &err_str); - if (!sockaddrList) { + if (!sockaddr_list) { rd_kafka_log(rk, LOG_WARNING, "BROKER", "Failed to resolve '%s': %s", host, - errstr); + err_str); rd_kafka_wrunlock(rk); continue; } - RD_SOCKADDR_LIST_FOREACH(sinx, sockaddrList) { - resolvedFQDN = rd_sockaddr2str( + RD_SOCKADDR_LIST_FOREACH(sinx, sockaddr_list) { + resolved_FQDN = rd_sockaddr2str( sinx, RD_SOCKADDR2STR_F_RESOLVE); + rd_kafka_dbg( + rk, ALL, "INIT", + "Adding broker with resolved hostname %s", + resolved_FQDN); + rd_kafka_find_or_add_broker( - rk, proto, resolvedFQDN, port, &cnt); + rk, proto, resolved_FQDN, port, &cnt); }; - rd_sockaddr_list_destroy(sockaddrList); + rd_sockaddr_list_destroy(sockaddr_list); } else { rd_kafka_find_or_add_broker(rk, proto, host, port, &cnt); diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index 6c6fb70496..8b2d07e004 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1437,12 +1437,18 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "A higher value allows for more effective batching of these " "messages.", 0, 900000, 10}, - {_RK_GLOBAL, "enable.bootstrap.servers.canonical.resolve", _RK_C_BOOL, - _RK(enable_bootstrap_servers_canonical_resolve), - "Resolve each bootstrap address into a list of canonical names. By " - "default client will not attempt to reverse lookup to find the FQDN." - "Default: false.", - 0, 1, 0}, + {_RK_GLOBAL, "client.dns.lookup", _RK_C_S2I, _RK(client_dns_lookup), + "Controls how the client uses DNS lookups. By default when the lookup " + "returns multiple IP addresses for a hostname they will all be attempted " + "to connect to before failing the connection. Applies to both bootstrap " + "and " + " advertised servers. If the value is set to " + "`resolve_canonical_bootstrap_servers_only` each entry will be " + "resolved and expanded into a list of canonical names", + .vdef = RD_KAFKA_USE_ALL_DNS_IPS, + .s2i = {{RD_KAFKA_USE_ALL_DNS_IPS, "use_all_dns_ips"}, + {RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, + "resolve_canonical_bootstrap_servers_only"}}}, /* diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h index f394a7557e..12c04211e5 100644 --- a/src/rdkafka_conf.h +++ b/src/rdkafka_conf.h @@ -158,6 +158,11 @@ typedef enum { RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, /**< RFC2818 */ } rd_kafka_ssl_endpoint_id_t; +typedef enum { + RD_KAFKA_USE_ALL_DNS_IPS, + RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, +} rd_kafka_client_dns_lookup_t; + /* Increase in steps of 64 as needed. * This must be larger than sizeof(rd_kafka_[topic_]conf_t) */ #define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 33) @@ -224,7 +229,7 @@ struct rd_kafka_conf_s { int api_version_fallback_ms; char *broker_version_fallback; rd_kafka_secproto_t security_protocol; - int enable_bootstrap_servers_canonical_resolve; + rd_kafka_client_dns_lookup_t client_dns_lookup; struct { #if WITH_SSL diff --git a/tests/0004-conf.c b/tests/0004-conf.c index 5938e520b7..c78c75f558 100644 --- a/tests/0004-conf.c +++ b/tests/0004-conf.c @@ -529,8 +529,8 @@ int main_0004_conf(int argc, char **argv) { "ssl.ca.certificate.stores", "Intermediate ,, Root ,", #endif - "enable.bootstrap.servers.canonical.resolve", - "true", + "client.dns.lookup", + "resolve_canonical_bootstrap_servers_only", NULL }; static const char *tconfs[] = {"request.required.acks", From dc886cd8e50af9e11d64e0422b96a99655246c53 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Thu, 8 Jun 2023 23:35:06 +0530 Subject: [PATCH 07/14] Fix issues --- CONFIGURATION.md | 2 +- src/rdkafka_conf.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index d98314d9c2..85357680d3 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -152,7 +152,7 @@ delivery.report.only.error | P | true, false | false dr_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_cb())
*Type: see dedicated API* dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())
*Type: see dedicated API* sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages.
*Type: integer* -client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default when the lookupreturns multiple IP addresses for a hostname they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only` each entry will be resolved and expanded into a list of canonical names
*Type: enum value* +client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default when the lookup returns multiple IP addresses for a hostname they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only` each entry will be resolved and expanded into a list of canonical names
*Type: enum value* ## Topic configuration properties diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index 8b2d07e004..0546f5f016 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1625,7 +1625,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "Apache Kafka 0.8.2 or later on the broker.).", .vdef = RD_KAFKA_OFFSET_METHOD_BROKER, .s2i = {{RD_KAFKA_OFFSET_METHOD_FILE, "file"}, - {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}}, + {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}}, {_RK_TOPIC | _RK_CONSUMER, "consume.callback.max.messages", _RK_C_INT, _RKT(consume_callback_max_msgs), From 159e8006a628ea6ba110e1d356f426276030f94c Mon Sep 17 00:00:00 2001 From: Anchit Jain <112778471+anchitj@users.noreply.github.com> Date: Thu, 8 Jun 2023 18:32:43 +0000 Subject: [PATCH 08/14] Style fix --- src/rdkafka_conf.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index 0546f5f016..a913adb4ee 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1447,8 +1447,8 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "resolved and expanded into a list of canonical names", .vdef = RD_KAFKA_USE_ALL_DNS_IPS, .s2i = {{RD_KAFKA_USE_ALL_DNS_IPS, "use_all_dns_ips"}, - {RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, - "resolve_canonical_bootstrap_servers_only"}}}, + {RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, + "resolve_canonical_bootstrap_servers_only"}}}, /* @@ -1625,7 +1625,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "Apache Kafka 0.8.2 or later on the broker.).", .vdef = RD_KAFKA_OFFSET_METHOD_BROKER, .s2i = {{RD_KAFKA_OFFSET_METHOD_FILE, "file"}, - {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}}, + {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}}, {_RK_TOPIC | _RK_CONSUMER, "consume.callback.max.messages", _RK_C_INT, _RKT(consume_callback_max_msgs), From 48164fb255a31ca23e302cb86d94c796757eb2a1 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Fri, 9 Jun 2023 13:11:30 +0530 Subject: [PATCH 09/14] Add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index acc40f9ffc..b5124545ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ librdkafka v2.2.0 is a feature release: closes as normal ones (#4294). * Added `fetch.queue.backoff.ms` to the consumer to control how long the consumer backs off next fetch attempt. (@bitemyapp, @edenhill, #2879) + * [KIP-235](https://cwiki.apache.org/confluence/display/KAFKA/KIP-235%3A+Add+DNS+alias+support+for+secured+connection): + Add DNS alias support for secured connection (#4292). ## Enhancements From 3bbb666c2fc3bc0fadd8bfff4efbfdea296554e7 Mon Sep 17 00:00:00 2001 From: Anchit Jain <112778471+anchitj@users.noreply.github.com> Date: Mon, 12 Jun 2023 07:57:02 +0000 Subject: [PATCH 10/14] Add note about difference from Java default --- CONFIGURATION.md | 2 +- src/rdkafka_conf.c | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 85357680d3..127fe4c88f 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -152,7 +152,7 @@ delivery.report.only.error | P | true, false | false dr_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_cb())
*Type: see dedicated API* dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())
*Type: see dedicated API* sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages.
*Type: integer* -client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default when the lookup returns multiple IP addresses for a hostname they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only` each entry will be resolved and expanded into a list of canonical names
*Type: enum value* +client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default, when the lookup returns multiple IP addresses for a hostname, they will all be attempted for connection before the connection is considered failed. This applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only`, each entry will be resolved and expanded into a list of canonical names. NOTE: Default here is different from the Java client's default behavior, which connects only to the first IP address returned for a hostname.
*Type: enum value* ## Topic configuration properties diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index a913adb4ee..f568940da6 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1438,13 +1438,14 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "messages.", 0, 900000, 10}, {_RK_GLOBAL, "client.dns.lookup", _RK_C_S2I, _RK(client_dns_lookup), - "Controls how the client uses DNS lookups. By default when the lookup " - "returns multiple IP addresses for a hostname they will all be attempted " - "to connect to before failing the connection. Applies to both bootstrap " - "and " - " advertised servers. If the value is set to " - "`resolve_canonical_bootstrap_servers_only` each entry will be " - "resolved and expanded into a list of canonical names", + "Controls how the client uses DNS lookups. By default, when the lookup " + "returns multiple IP addresses for a hostname, they will all be attempted " + "for connection before the connection is considered failed. This applies " + "to both bootstrap and advertised servers. If the value is set to " + "`resolve_canonical_bootstrap_servers_only`, each entry will be resolved " + "and expanded into a list of canonical names. NOTE: Default here is " + "different from the Java client's default behavior, which connects only " + "to the first IP address returned for a hostname. ", .vdef = RD_KAFKA_USE_ALL_DNS_IPS, .s2i = {{RD_KAFKA_USE_ALL_DNS_IPS, "use_all_dns_ips"}, {RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, From 24015103368c61c0a06ea398d37398a0076f95c3 Mon Sep 17 00:00:00 2001 From: Anchit Jain <112778471+anchitj@users.noreply.github.com> Date: Mon, 12 Jun 2023 10:05:55 +0000 Subject: [PATCH 11/14] Add adding_bootsrap boolean variable --- src/rdkafka.c | 3 ++- src/rdkafka_broker.c | 11 +++++++---- src/rdkafka_broker.h | 4 +++- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index 2a5e040b68..05631e7969 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2523,7 +2523,8 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, /* Add initial list of brokers from configuration */ if (rk->rk_conf.brokerlist) { - if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0) + if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist, + rd_true) == 0) rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN, "No brokers configured"); } diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 786b382613..ba3a27d5f1 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -5289,7 +5289,9 @@ static void rd_kafka_find_or_add_broker(rd_kafka_t *rk, * @locality any thread * @locks none */ -int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { +int rd_kafka_brokers_add0(rd_kafka_t *rk, + const char *brokerlist, + rd_bool_t adding_bootstrap) { char *s_copy = rd_strdup(brokerlist); char *s = s_copy; int cnt = 0; @@ -5315,8 +5317,9 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { break; rd_kafka_wrlock(rk); - if (rk->rk_conf.client_dns_lookup == - RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { + if (adding_bootstrap && + rk->rk_conf.client_dns_lookup == + RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { rd_kafka_dbg(rk, ALL, "INIT", "Canonicalizing bootstrap broker %s:%d", host, port); @@ -5372,7 +5375,7 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist) { - return rd_kafka_brokers_add0(rk, brokerlist); + return rd_kafka_brokers_add0(rk, brokerlist, rd_false); } diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h index be7ce0536b..c60f24df6f 100644 --- a/src/rdkafka_broker.h +++ b/src/rdkafka_broker.h @@ -468,7 +468,9 @@ rd_kafka_broker_t *rd_kafka_broker_controller_async(rd_kafka_t *rk, int state, rd_kafka_enq_once_t *eonce); -int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist); +int rd_kafka_brokers_add0(rd_kafka_t *rk, + const char *brokerlist, + rd_bool_t adding_bootstrap); void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state); void rd_kafka_broker_fail(rd_kafka_broker_t *rkb, From c89b89ebb2f52bc38170935df31b89011e48a977 Mon Sep 17 00:00:00 2001 From: Anchit Jain <112778471+anchitj@users.noreply.github.com> Date: Tue, 13 Jun 2023 10:42:32 +0000 Subject: [PATCH 12/14] Address comments --- src/rdkafka_broker.c | 8 ++++---- src/rdkafka_broker.h | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index ba3a27d5f1..8e98233c9c 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -5268,8 +5268,8 @@ static void rd_kafka_find_or_add_broker(rd_kafka_t *rk, int *cnt) { rd_kafka_broker_t *rkb = NULL; - if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) && - rkb->rkb_source == RD_KAFKA_CONFIGURED) { + if (rkb = rd_kafka_broker_find(rk, proto, host, port) && + rkb->rkb_source == RD_KAFKA_CONFIGURED) { (*cnt)++; } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto, host, port, RD_KAFKA_NODEID_UA) != NULL) @@ -5291,7 +5291,7 @@ static void rd_kafka_find_or_add_broker(rd_kafka_t *rk, */ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist, - rd_bool_t adding_bootstrap) { + rd_bool_t is_bootstrap_server_list) { char *s_copy = rd_strdup(brokerlist); char *s = s_copy; int cnt = 0; @@ -5317,7 +5317,7 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, break; rd_kafka_wrlock(rk); - if (adding_bootstrap && + if (is_bootstrap_server_list && rk->rk_conf.client_dns_lookup == RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { rd_kafka_dbg(rk, ALL, "INIT", diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h index c60f24df6f..83560cde06 100644 --- a/src/rdkafka_broker.h +++ b/src/rdkafka_broker.h @@ -470,7 +470,7 @@ rd_kafka_broker_t *rd_kafka_broker_controller_async(rd_kafka_t *rk, int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist, - rd_bool_t adding_bootstrap); + rd_bool_t is_bootstrap_server_list); void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state); void rd_kafka_broker_fail(rd_kafka_broker_t *rkb, From 573baef017b6ca26a5fb7215d8c4b0128fa2384b Mon Sep 17 00:00:00 2001 From: Anchit Jain <112778471+anchitj@users.noreply.github.com> Date: Tue, 13 Jun 2023 10:46:52 +0000 Subject: [PATCH 13/14] Add parentheses back --- src/rdkafka_broker.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 8e98233c9c..359bdd54f2 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -5268,8 +5268,8 @@ static void rd_kafka_find_or_add_broker(rd_kafka_t *rk, int *cnt) { rd_kafka_broker_t *rkb = NULL; - if (rkb = rd_kafka_broker_find(rk, proto, host, port) && - rkb->rkb_source == RD_KAFKA_CONFIGURED) { + if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) && + rkb->rkb_source == RD_KAFKA_CONFIGURED) { (*cnt)++; } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto, host, port, RD_KAFKA_NODEID_UA) != NULL) From 5c9b33ba256fb1e45bf8b97da4414ce719929155 Mon Sep 17 00:00:00 2001 From: Anchit Jain <112778471+anchitj@users.noreply.github.com> Date: Tue, 13 Jun 2023 10:51:57 +0000 Subject: [PATCH 14/14] Update supported KIPs --- INTRODUCTION.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/INTRODUCTION.md b/INTRODUCTION.md index 67504dd133..19689576d5 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1900,7 +1900,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-226 - AdminAPI: Dynamic broker config | 1.1.0 | Supported | | KIP-227 - Consumer Incremental Fetch | 1.1.0 | Not supported | | KIP-229 - AdminAPI: DeleteGroups | 1.1.0 | Supported | -| KIP-235 - DNS alias for secure connections | 2.1.0 | Not supported | +| KIP-235 - DNS alias for secure connections | 2.1.0 | Supported | | KIP-249 - AdminAPI: Deletegation Tokens | 2.0.0 | Not supported | | KIP-255 - SASL OAUTHBEARER | 2.0.0 | Supported | | KIP-266 - Fix indefinite consumer timeouts | 2.0.0 | Supported (bound by session.timeout.ms and max.poll.interval.ms) |