diff --git a/CHANGELOG.md b/CHANGELOG.md index acc40f9ffc..b5124545ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ librdkafka v2.2.0 is a feature release: closes as normal ones (#4294). * Added `fetch.queue.backoff.ms` to the consumer to control how long the consumer backs off next fetch attempt. (@bitemyapp, @edenhill, #2879) + * [KIP-235](https://cwiki.apache.org/confluence/display/KAFKA/KIP-235%3A+Add+DNS+alias+support+for+secured+connection): + Add DNS alias support for secured connection (#4292). ## Enhancements diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 9a0e7ab4c7..127fe4c88f 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -152,6 +152,7 @@ delivery.report.only.error | P | true, false | false dr_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_cb())
*Type: see dedicated API* dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())
*Type: see dedicated API* sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages.
*Type: integer* +client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default, when the lookup returns multiple IP addresses for a hostname, they will all be attempted for connection before the connection is considered failed. This applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only`, each entry will be resolved and expanded into a list of canonical names. NOTE: Default here is different from the Java client's default behavior, which connects only to the first IP address returned for a hostname.
*Type: enum value* ## Topic configuration properties diff --git a/INTRODUCTION.md b/INTRODUCTION.md index 67504dd133..19689576d5 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1900,7 +1900,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-226 - AdminAPI: Dynamic broker config | 1.1.0 | Supported | | KIP-227 - Consumer Incremental Fetch | 1.1.0 | Not supported | | KIP-229 - AdminAPI: DeleteGroups | 1.1.0 | Supported | -| KIP-235 - DNS alias for secure connections | 2.1.0 | Not supported | +| KIP-235 - DNS alias for secure connections | 2.1.0 | Supported | | KIP-249 - AdminAPI: Deletegation Tokens | 2.0.0 | Not supported | | KIP-255 - SASL OAUTHBEARER | 2.0.0 | Supported | | KIP-266 - Fix indefinite consumer timeouts | 2.0.0 | Supported (bound by session.timeout.ms and max.poll.interval.ms) | diff --git a/src/rdaddr.h b/src/rdaddr.h index c8574d0194..4f03630c33 100644 --- a/src/rdaddr.h +++ b/src/rdaddr.h @@ -139,7 +139,7 @@ rd_sockaddr_list_next(rd_sockaddr_list_t *rsal) { #define RD_SOCKADDR_LIST_FOREACH(sinx, rsal) \ for ((sinx) = &(rsal)->rsal_addr[0]; \ - (sinx) < &(rsal)->rsal_addr[(rsal)->rsal_len]; (sinx)++) + (sinx) < &(rsal)->rsal_addr[(rsal)->rsal_cnt]; (sinx)++) /** * Wrapper for getaddrinfo(3) that performs these additional tasks: diff --git a/src/rdkafka.c b/src/rdkafka.c index 2a5e040b68..05631e7969 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2523,7 +2523,8 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, /* Add initial list of brokers from configuration */ if (rk->rk_conf.brokerlist) { - if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0) + if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist, + rd_true) == 0) rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN, "No brokers configured"); } diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index cb319aec11..359bdd54f2 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -49,6 +49,7 @@ #include #include "rd.h" +#include "rdaddr.h" #include "rdkafka_int.h" #include "rdkafka_msg.h" #include "rdkafka_msgset.h" @@ -5256,6 +5257,31 @@ static int rd_kafka_broker_name_parse(rd_kafka_t *rk, return 0; } +/** + * @brief Add a broker from a string of type "[proto://]host[:port]" to the list + * of brokers. *cnt is increased by one if a broker was added, else not. + */ +static void rd_kafka_find_or_add_broker(rd_kafka_t *rk, + rd_kafka_secproto_t proto, + const char *host, + uint16_t port, + int *cnt) { + rd_kafka_broker_t *rkb = NULL; + + if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) && + rkb->rkb_source == RD_KAFKA_CONFIGURED) { + (*cnt)++; + } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto, host, + port, RD_KAFKA_NODEID_UA) != NULL) + (*cnt)++; + + /* If rd_kafka_broker_find returned a broker its + * reference needs to be released + * See issue #193 */ + if (rkb) + rd_kafka_broker_destroy(rkb); +} + /** * @brief Adds a (csv list of) broker(s). * Returns the number of brokers succesfully added. @@ -5263,17 +5289,22 @@ static int rd_kafka_broker_name_parse(rd_kafka_t *rk, * @locality any thread * @locks none */ -int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { +int rd_kafka_brokers_add0(rd_kafka_t *rk, + const char *brokerlist, + rd_bool_t is_bootstrap_server_list) { char *s_copy = rd_strdup(brokerlist); char *s = s_copy; int cnt = 0; - rd_kafka_broker_t *rkb; - int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt); + int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt); + rd_sockaddr_inx_t *sinx; + rd_sockaddr_list_t *sockaddr_list; /* Parse comma-separated list of brokers. */ while (*s) { uint16_t port; const char *host; + const char *err_str; + const char *resolved_FQDN; rd_kafka_secproto_t proto; if (*s == ',' || *s == ' ') { @@ -5286,20 +5317,43 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { break; rd_kafka_wrlock(rk); + if (is_bootstrap_server_list && + rk->rk_conf.client_dns_lookup == + RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { + rd_kafka_dbg(rk, ALL, "INIT", + "Canonicalizing bootstrap broker %s:%d", + host, port); + sockaddr_list = rd_getaddrinfo( + host, RD_KAFKA_PORT_STR, AI_ADDRCONFIG, + rk->rk_conf.broker_addr_family, SOCK_STREAM, + IPPROTO_TCP, rk->rk_conf.resolve_cb, + rk->rk_conf.opaque, &err_str); + + if (!sockaddr_list) { + rd_kafka_log(rk, LOG_WARNING, "BROKER", + "Failed to resolve '%s': %s", host, + err_str); + rd_kafka_wrunlock(rk); + continue; + } - if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) && - rkb->rkb_source == RD_KAFKA_CONFIGURED) { - cnt++; - } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto, - host, port, - RD_KAFKA_NODEID_UA) != NULL) - cnt++; - - /* If rd_kafka_broker_find returned a broker its - * reference needs to be released - * See issue #193 */ - if (rkb) - rd_kafka_broker_destroy(rkb); + RD_SOCKADDR_LIST_FOREACH(sinx, sockaddr_list) { + resolved_FQDN = rd_sockaddr2str( + sinx, RD_SOCKADDR2STR_F_RESOLVE); + rd_kafka_dbg( + rk, ALL, "INIT", + "Adding broker with resolved hostname %s", + resolved_FQDN); + + rd_kafka_find_or_add_broker( + rk, proto, resolved_FQDN, port, &cnt); + }; + + rd_sockaddr_list_destroy(sockaddr_list); + } else { + rd_kafka_find_or_add_broker(rk, proto, host, port, + &cnt); + } rd_kafka_wrunlock(rk); } @@ -5321,7 +5375,7 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) { int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist) { - return rd_kafka_brokers_add0(rk, brokerlist); + return rd_kafka_brokers_add0(rk, brokerlist, rd_false); } diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h index be7ce0536b..83560cde06 100644 --- a/src/rdkafka_broker.h +++ b/src/rdkafka_broker.h @@ -468,7 +468,9 @@ rd_kafka_broker_t *rd_kafka_broker_controller_async(rd_kafka_t *rk, int state, rd_kafka_enq_once_t *eonce); -int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist); +int rd_kafka_brokers_add0(rd_kafka_t *rk, + const char *brokerlist, + rd_bool_t is_bootstrap_server_list); void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state); void rd_kafka_broker_fail(rd_kafka_broker_t *rkb, diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index e5c1415fce..f568940da6 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1437,6 +1437,19 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "A higher value allows for more effective batching of these " "messages.", 0, 900000, 10}, + {_RK_GLOBAL, "client.dns.lookup", _RK_C_S2I, _RK(client_dns_lookup), + "Controls how the client uses DNS lookups. By default, when the lookup " + "returns multiple IP addresses for a hostname, they will all be attempted " + "for connection before the connection is considered failed. This applies " + "to both bootstrap and advertised servers. If the value is set to " + "`resolve_canonical_bootstrap_servers_only`, each entry will be resolved " + "and expanded into a list of canonical names. NOTE: Default here is " + "different from the Java client's default behavior, which connects only " + "to the first IP address returned for a hostname. ", + .vdef = RD_KAFKA_USE_ALL_DNS_IPS, + .s2i = {{RD_KAFKA_USE_ALL_DNS_IPS, "use_all_dns_ips"}, + {RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, + "resolve_canonical_bootstrap_servers_only"}}}, /* diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h index 2d625ce05f..12c04211e5 100644 --- a/src/rdkafka_conf.h +++ b/src/rdkafka_conf.h @@ -158,6 +158,11 @@ typedef enum { RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, /**< RFC2818 */ } rd_kafka_ssl_endpoint_id_t; +typedef enum { + RD_KAFKA_USE_ALL_DNS_IPS, + RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, +} rd_kafka_client_dns_lookup_t; + /* Increase in steps of 64 as needed. * This must be larger than sizeof(rd_kafka_[topic_]conf_t) */ #define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 33) @@ -224,6 +229,7 @@ struct rd_kafka_conf_s { int api_version_fallback_ms; char *broker_version_fallback; rd_kafka_secproto_t security_protocol; + rd_kafka_client_dns_lookup_t client_dns_lookup; struct { #if WITH_SSL diff --git a/tests/0004-conf.c b/tests/0004-conf.c index 51401e17d3..c78c75f558 100644 --- a/tests/0004-conf.c +++ b/tests/0004-conf.c @@ -529,6 +529,8 @@ int main_0004_conf(int argc, char **argv) { "ssl.ca.certificate.stores", "Intermediate ,, Root ,", #endif + "client.dns.lookup", + "resolve_canonical_bootstrap_servers_only", NULL }; static const char *tconfs[] = {"request.required.acks",