From 77a013b7a2611f7bdc091afa1e56b1a46d1c52f5 Mon Sep 17 00:00:00 2001
From: Magnus Edenhill
Date: Thu, 6 May 2021 13:13:54 +0200
Subject: [PATCH] Remove old Zookeeper mentions since it may confuse people
 (KRaft, etc)

---
 CONFIGURATION.md                     |   4 +-
 INTRODUCTION.md                      |   2 -
 examples/Makefile                    |  15 -
 examples/rdkafka_zookeeper_example.c | 728 ---------------------------
 src/rdkafka_conf.c                   |   3 +-
 src/rdkafka_offset.c                 |   3 +-
 6 files changed, 4 insertions(+), 751 deletions(-)
 delete mode 100644 examples/rdkafka_zookeeper_example.c

diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 5fc102bc37..7fbef2de3e 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -161,8 +161,8 @@ opaque | * | |
 compression.codec | P | none, gzip, snappy, lz4, zstd, inherit | inherit | high | Compression codec to use for compressing message sets. inherit = inherit global compression.codec configuration. <br>*Type: enum value*
 compression.type | P | none, gzip, snappy, lz4, zstd | none | medium | Alias for `compression.codec`: compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. <br>*Type: enum value*
 compression.level | P | -1 .. 12 | -1 | medium | Compression level parameter for algorithm selected by configuration property `compression.codec`. Higher values will result in better compression at the cost of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; -1 = codec-dependent default compression level. <br>*Type: integer*
-auto.commit.enable | C | true, false | true | low | **DEPRECATED** [**LEGACY PROPERTY:** This property is used by the simple legacy consumer only. When using the high-level KafkaConsumer, the global `enable.auto.commit` property must be used instead]. If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call `rd_kafka_offset_store()` to store an offset (optional). **NOTE:** There is currently no zookeeper integration, offsets will be written to broker or local file according to offset.store.method. <br>*Type: boolean*
-enable.auto.commit | C | true, false | true | low | **DEPRECATED** Alias for `auto.commit.enable`: [**LEGACY PROPERTY:** This property is used by the simple legacy consumer only. When using the high-level KafkaConsumer, the global `enable.auto.commit` property must be used instead]. If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call `rd_kafka_offset_store()` to store an offset (optional). **NOTE:** There is currently no zookeeper integration, offsets will be written to broker or local file according to offset.store.method. <br>*Type: boolean*
+auto.commit.enable | C | true, false | true | low | **DEPRECATED** [**LEGACY PROPERTY:** This property is used by the simple legacy consumer only. When using the high-level KafkaConsumer, the global `enable.auto.commit` property must be used instead]. If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call `rd_kafka_offset_store()` to store an offset (optional). Offsets will be written to broker or local file according to offset.store.method. <br>*Type: boolean*
+enable.auto.commit | C | true, false | true | low | **DEPRECATED** Alias for `auto.commit.enable`: [**LEGACY PROPERTY:** This property is used by the simple legacy consumer only. When using the high-level KafkaConsumer, the global `enable.auto.commit` property must be used instead]. If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call `rd_kafka_offset_store()` to store an offset (optional). Offsets will be written to broker or local file according to offset.store.method. <br>*Type: boolean*
 auto.commit.interval.ms | C | 10 .. 86400000 | 60000 | high | [**LEGACY PROPERTY:** This setting is used by the simple legacy consumer only. When using the high-level KafkaConsumer, the global `auto.commit.interval.ms` property must be used instead]. The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. <br>*Type: integer*
 auto.offset.reset | C | smallest, earliest, beginning, largest, latest, end, error | largest | high | Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'. <br>*Type: enum value*
 offset.store.path | C | | . | low | **DEPRECATED** Path to local file for storing offsets. If the path is a directory a filename will be automatically generated in that directory based on the topic and partition. File-based offset storage will be removed in a future version. <br>*Type: string*
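For illustration (not part of the patch itself): the amended descriptions steer applications away from the legacy topic-level `auto.commit.enable` and toward the global `enable.auto.commit` on the high-level KafkaConsumer, with offsets committed to the broker. A minimal sketch of that configuration follows; the broker address, group name and the choice to disable auto-commit are assumptions made for the example.

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: create a high-level consumer whose offsets are committed to the
 * Kafka broker. "localhost:9092" and "example_group" are placeholders. */
static rd_kafka_t *create_consumer (void) {
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        char errstr[512];

        if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "group.id", "example_group",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            /* Global KafkaConsumer property, not the legacy topic-level
             * auto.commit.enable. */
            rd_kafka_conf_set(conf, "enable.auto.commit", "false",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% %s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return NULL;
        }

        /* rd_kafka_new() takes ownership of conf on success. */
        return rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
}
```

With `enable.auto.commit=false` the application commits explicitly, for example `rd_kafka_commit(rk, NULL, 0)` after a batch has been processed.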
diff --git a/INTRODUCTION.md b/INTRODUCTION.md
index 5953b94665..6eed11c3d1 100644
--- a/INTRODUCTION.md
+++ b/INTRODUCTION.md
@@ -1421,8 +1421,6 @@
 The legacy `auto.commit.enable` topic configuration property is only to be
 used with the legacy low-level consumer.
 Use `enable.auto.commit` with the modern KafkaConsumer.
 
-There is no support for offset management with ZooKeeper.
-
 
 ##### Auto offset commit
diff --git a/examples/Makefile b/examples/Makefile
index a489821838..7720a3c123 100644
--- a/examples/Makefile
+++ b/examples/Makefile
@@ -103,21 +103,6 @@ rdkafka_consume_batch: ../src-cpp/librdkafka++.a ../src/librdkafka.a rdkafka_con
 	$(CXX) $(CPPFLAGS) $(CXXFLAGS) rdkafka_consume_batch.cpp -o $@ $(LDFLAGS) \
 		../src-cpp/librdkafka++.a ../src/librdkafka.a $(LIBS)
 
-rdkafka_zookeeper_example: ../src/librdkafka.a rdkafka_zookeeper_example.c
-	$(CC) $(CPPFLAGS) $(CFLAGS) -I/usr/include/zookeeper rdkafka_zookeeper_example.c -o $@ $(LDFLAGS) \
-		../src/librdkafka.a $(LIBS) -lzookeeper_mt -ljansson
-	@echo "# $@ is ready"
-	@echo "#"
-	@echo "# Run producer (write messages on stdin)"
-	@echo "./$@ -P -t -p "
-	@echo ""
-	@echo "# or consumer"
-	@echo "./$@ -C -t -p "
-	@echo ""
-	@echo "#"
-	@echo "# More usage options:"
-	@echo "./$@ -h"
-
 openssl_engine_example_cpp: ../src-cpp/librdkafka++.a ../src/librdkafka.a openssl_engine_example.cpp
 	$(CXX) $(CPPFLAGS) $(CXXFLAGS) openssl_engine_example.cpp -o $@ $(LDFLAGS) \
 		../src-cpp/librdkafka++.a ../src/librdkafka.a $(LIBS)
diff --git a/examples/rdkafka_zookeeper_example.c b/examples/rdkafka_zookeeper_example.c
deleted file mode 100644
index ec96917300..0000000000
--- a/examples/rdkafka_zookeeper_example.c
+++ /dev/null
@@ -1,728 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-/**
- * Apache Kafka consumer & producer example programs
- * using the Kafka driver from librdkafka
- * (https://github.com/edenhill/librdkafka)
- */
-
-#include 
-#include 
-#include 
-#include 
-#include 
-#include 
-#include 
-#include 
-
-/* Typical include path would be , but this program
- * is builtin from within the librdkafka source tree and thus differs.
*/ -#include "rdkafka.h" /* for Kafka driver */ - -#include -#include -#include - -#define BROKER_PATH "/brokers/ids" - -static int run = 1; -static rd_kafka_t *rk; -static int exit_eof = 0; -static int quiet = 0; -static enum { - OUTPUT_HEXDUMP, - OUTPUT_RAW, -} output = OUTPUT_HEXDUMP; - -static void stop (int sig) { - run = 0; - fclose(stdin); /* abort fgets() */ -} - - -static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) { - const char *p = (const char *)ptr; - int of = 0; - - - if (name) - fprintf(fp, "%s hexdump (%zd bytes):\n", name, len); - - for (of = 0 ; of < len ; of += 16) { - char hexen[16*3+1]; - char charen[16+1]; - int hof = 0; - - int cof = 0; - int i; - - for (i = of ; i < of + 16 && i < len ; i++) { - hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff); - cof += sprintf(charen+cof, "%c", - isprint((int)p[i]) ? p[i] : '.'); - } - fprintf(fp, "%08x: %-48s %-16s\n", - of, hexen, charen); - } -} - -/** - * Kafka logger callback (optional) - */ -static void logger (const rd_kafka_t *rk, int level, - const char *fac, const char *buf) { - struct timeval tv; - gettimeofday(&tv, NULL); - fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", - (int)tv.tv_sec, (int)(tv.tv_usec / 1000), - level, fac, rd_kafka_name(rk), buf); -} - -/** - * Message delivery report callback. - * Called once for each message. - * See rdkafka.h for more information. - */ -static void msg_delivered (rd_kafka_t *rk, - void *payload, size_t len, - int error_code, - void *opaque, void *msg_opaque) { - - if (error_code) - fprintf(stderr, "%% Message delivery failed: %s\n", - rd_kafka_err2str(error_code)); - else if (!quiet) - fprintf(stderr, "%% Message delivered (%zd bytes)\n", len); -} - - -static void msg_consume (rd_kafka_message_t *rkmessage, - void *opaque) { - if (rkmessage->err) { - if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - fprintf(stderr, - "%% Consumer reached end of %s [%"PRId32"] " - "message queue at offset %"PRId64"\n", - rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, rkmessage->offset); - - if (exit_eof) - run = 0; - - return; - } - - fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] " - "offset %"PRId64": %s\n", - rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, - rkmessage->offset, - rd_kafka_message_errstr(rkmessage)); - return; - } - - if (!quiet) - fprintf(stdout, "%% Message (offset %"PRId64", %zd bytes):\n", - rkmessage->offset, rkmessage->len); - - if (rkmessage->key_len) { - if (output == OUTPUT_HEXDUMP) - hexdump(stdout, "Message Key", - rkmessage->key, rkmessage->key_len); - else - printf("Key: %.*s\n", - (int)rkmessage->key_len, (char *)rkmessage->key); - } - - if (output == OUTPUT_HEXDUMP) - hexdump(stdout, "Message Payload", - rkmessage->payload, rkmessage->len); - else - printf("%.*s\n", - (int)rkmessage->len, (char *)rkmessage->payload); -} - - -static void metadata_print (const char *topic, - const struct rd_kafka_metadata *metadata) { - int i, j, k; - - printf("Metadata for %s (from broker %"PRId32": %s):\n", - topic ? 
: "all topics", - metadata->orig_broker_id, - metadata->orig_broker_name); - - - /* Iterate brokers */ - printf(" %i brokers:\n", metadata->broker_cnt); - for (i = 0 ; i < metadata->broker_cnt ; i++) - printf(" broker %"PRId32" at %s:%i\n", - metadata->brokers[i].id, - metadata->brokers[i].host, - metadata->brokers[i].port); - - /* Iterate topics */ - printf(" %i topics:\n", metadata->topic_cnt); - for (i = 0 ; i < metadata->topic_cnt ; i++) { - const struct rd_kafka_metadata_topic *t = &metadata->topics[i]; - printf(" topic \"%s\" with %i partitions:", - t->topic, - t->partition_cnt); - if (t->err) { - printf(" %s", rd_kafka_err2str(t->err)); - if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) - printf(" (try again)"); - } - printf("\n"); - - /* Iterate topic's partitions */ - for (j = 0 ; j < t->partition_cnt ; j++) { - const struct rd_kafka_metadata_partition *p; - p = &t->partitions[j]; - printf(" partition %"PRId32", " - "leader %"PRId32", replicas: ", - p->id, p->leader); - - /* Iterate partition's replicas */ - for (k = 0 ; k < p->replica_cnt ; k++) - printf("%s%"PRId32, - k > 0 ? ",":"", p->replicas[k]); - - /* Iterate partition's ISRs */ - printf(", isrs: "); - for (k = 0 ; k < p->isr_cnt ; k++) - printf("%s%"PRId32, - k > 0 ? ",":"", p->isrs[k]); - if (p->err) - printf(", %s\n", rd_kafka_err2str(p->err)); - else - printf("\n"); - } - } -} - - -static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) -{ - if (zzh) - { - struct String_vector brokerlist; - if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) - { - fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH); - return; - } - - int i; - char *brokerptr = brokers; - for (i = 0; i < brokerlist.count; i++) - { - char path[255], cfg[1024]; - sprintf(path, "/brokers/ids/%s", brokerlist.data[i]); - int len = sizeof(cfg); - zoo_get(zzh, path, 0, cfg, &len, NULL); - - if (len > 0) - { - cfg[len] = '\0'; - json_error_t jerror; - json_t *jobj = json_loads(cfg, 0, &jerror); - if (jobj) - { - json_t *jhost = json_object_get(jobj, "host"); - json_t *jport = json_object_get(jobj, "port"); - - if (jhost && jport) - { - const char *host = json_string_value(jhost); - const int port = json_integer_value(jport); - sprintf(brokerptr, "%s:%d", host, port); - - brokerptr += strlen(brokerptr); - if (i < brokerlist.count - 1) - { - *brokerptr++ = ','; - } - } - json_decref(jobj); - } - } - } - deallocate_String_vector(&brokerlist); - printf("Found brokers %s\n", brokers); - } -} - - -static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) -{ - char brokers[1024]; - if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0) - { - brokers[0] = '\0'; - set_brokerlist_from_zookeeper(zh, brokers); - if (brokers[0] != '\0' && rk != NULL) - { - rd_kafka_brokers_add(rk, brokers); - rd_kafka_poll(rk, 10); - } - } -} - - -static zhandle_t* initialize_zookeeper(const char * zookeeper, const int debug) -{ - zhandle_t *zh; - if (debug) - { - zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG); - } - zh = zookeeper_init(zookeeper, watcher, 10000, 0, 0, 0); - if (zh == NULL) - { - fprintf(stderr, "Zookeeper connection not established."); - exit(1); - } - return zh; -} - - -static void sig_usr1 (int sig) { - rd_kafka_dump(stdout, rk); -} - -int main (int argc, char **argv) { - rd_kafka_topic_t *rkt; - char *zookeeper = "localhost:2181"; - zhandle_t *zh = NULL; - char brokers[1024]; - char mode = 'C'; - char *topic = NULL; - int partition = RD_KAFKA_PARTITION_UA; - int opt; 
- rd_kafka_conf_t *conf; - rd_kafka_topic_conf_t *topic_conf; - char errstr[512]; - const char *debug = NULL; - int64_t start_offset = 0; - int do_conf_dump = 0; - - memset(brokers, 0, sizeof(brokers)); - quiet = !isatty(STDIN_FILENO); - - /* Kafka configuration */ - conf = rd_kafka_conf_new(); - - /* Topic configuration */ - topic_conf = rd_kafka_topic_conf_new(); - - while ((opt = getopt(argc, argv, "PCLt:p:k:z:qd:o:eX:A")) != -1) { - switch (opt) { - case 'P': - case 'C': - case 'L': - mode = opt; - break; - case 't': - topic = optarg; - break; - case 'p': - partition = atoi(optarg); - break; - case 'k': - zookeeper = optarg; - break; - case 'z': - if (rd_kafka_conf_set(conf, "compression.codec", - optarg, - errstr, sizeof(errstr)) != - RD_KAFKA_CONF_OK) { - fprintf(stderr, "%% %s\n", errstr); - exit(1); - } - break; - case 'o': - if (!strcmp(optarg, "end")) - start_offset = RD_KAFKA_OFFSET_END; - else if (!strcmp(optarg, "beginning")) - start_offset = RD_KAFKA_OFFSET_BEGINNING; - else if (!strcmp(optarg, "stored")) - start_offset = RD_KAFKA_OFFSET_STORED; - else - start_offset = strtoll(optarg, NULL, 10); - break; - case 'e': - exit_eof = 1; - break; - case 'd': - debug = optarg; - break; - case 'q': - quiet = 1; - break; - case 'A': - output = OUTPUT_RAW; - break; - case 'X': - { - char *name, *val; - rd_kafka_conf_res_t res; - - if (!strcmp(optarg, "list") || - !strcmp(optarg, "help")) { - rd_kafka_conf_properties_show(stdout); - exit(0); - } - - if (!strcmp(optarg, "dump")) { - do_conf_dump = 1; - continue; - } - - name = optarg; - if (!(val = strchr(name, '='))) { - fprintf(stderr, "%% Expected " - "-X property=value, not %s\n", name); - exit(1); - } - - *val = '\0'; - val++; - - res = RD_KAFKA_CONF_UNKNOWN; - /* Try "topic." prefixed properties on topic - * conf first, and then fall through to global if - * it didnt match a topic configuration property. */ - if (!strncmp(name, "topic.", strlen("topic."))) - res = rd_kafka_topic_conf_set(topic_conf, - name+ - strlen("topic."), - val, - errstr, - sizeof(errstr)); - - if (res == RD_KAFKA_CONF_UNKNOWN) - res = rd_kafka_conf_set(conf, name, val, - errstr, sizeof(errstr)); - - if (res != RD_KAFKA_CONF_OK) { - fprintf(stderr, "%% %s\n", errstr); - exit(1); - } - } - break; - - default: - goto usage; - } - } - - - if (do_conf_dump) { - const char **arr; - size_t cnt; - int pass; - - for (pass = 0 ; pass < 2 ; pass++) { - int i; - - if (pass == 0) { - arr = rd_kafka_conf_dump(conf, &cnt); - printf("# Global config\n"); - } else { - printf("# Topic config\n"); - arr = rd_kafka_topic_conf_dump(topic_conf, - &cnt); - } - - for (i = 0 ; i < cnt ; i += 2) - printf("%s = %s\n", - arr[i], arr[i+1]); - - printf("\n"); - - rd_kafka_conf_dump_free(arr, cnt); - } - - exit(0); - } - - - if (optind != argc || (mode != 'L' && !topic)) { - usage: - fprintf(stderr, - "Usage: %s -C|-P|-L -t " - "[-p ] [-b ]\n" - "\n" - "librdkafka version %s (0x%08x)\n" - "\n" - " Options:\n" - " -C | -P Consumer or Producer mode\n" - " -L Metadata list mode\n" - " -t Topic to fetch / produce\n" - " -p Partition (random partitioner)\n" - " -k Zookeeper address (localhost:2181)\n" - " -z Enable compression:\n" - " none|gzip|snappy\n" - " -o Start offset (consumer)\n" - " -e Exit consumer when last message\n" - " in partition has been received.\n" - " -d [facs..] 
Enable debugging contexts:\n" - " -q Be quiet\n" - " -A Raw payload output (consumer)\n" - " %s\n" - " -X Set arbitrary librdkafka " - "configuration property\n" - " Properties prefixed with \"topic.\" " - "will be set on topic object.\n" - " Use '-X list' to see the full list\n" - " of supported properties.\n" - "\n" - " In Consumer mode:\n" - " writes fetched messages to stdout\n" - " In Producer mode:\n" - " reads messages from stdin and sends to broker\n" - " In List mode:\n" - " queries broker for metadata information, " - "topic is optional.\n" - "\n" - "\n" - "\n", - argv[0], - rd_kafka_version_str(), rd_kafka_version(), - RD_KAFKA_DEBUG_CONTEXTS); - exit(1); - } - - - signal(SIGINT, stop); - signal(SIGUSR1, sig_usr1); - - if (debug && - rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) != - RD_KAFKA_CONF_OK) { - fprintf(stderr, "%% Debug configuration failed: %s: %s\n", - errstr, debug); - exit(1); - } - - /* Set logger */ - rd_kafka_conf_set_log_cb(conf, logger); - - /** Initialize zookeeper */ - zh = initialize_zookeeper(zookeeper, debug != NULL); - - /* Add brokers */ - set_brokerlist_from_zookeeper(zh, brokers); - if (rd_kafka_conf_set(conf, "metadata.broker.list", - brokers, errstr, sizeof(errstr)) != - RD_KAFKA_CONF_OK) { - fprintf(stderr, "%% Failed to set brokers: %s\n", errstr); - exit(1); - } - - if (debug) { - printf("Broker list from zookeeper cluster %s: %s\n", zookeeper, brokers); - } - - if (mode == 'P') { - /* - * Producer - */ - char buf[2048]; - int sendcnt = 0; - - /* Set up a message delivery report callback. - * It will be called once for each message, either on successful - * delivery to broker, or upon failure to deliver to broker. */ - rd_kafka_conf_set_dr_cb(conf, msg_delivered); - - /* Create Kafka handle */ - if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, - errstr, sizeof(errstr)))) { - fprintf(stderr, - "%% Failed to create new producer: %s\n", - errstr); - exit(1); - } - - /* Create topic */ - rkt = rd_kafka_topic_new(rk, topic, topic_conf); - - if (!quiet) - fprintf(stderr, - "%% Type stuff and hit enter to send\n"); - - while (run && fgets(buf, sizeof(buf), stdin)) { - size_t len = strlen(buf); - if (buf[len-1] == '\n') - buf[--len] = '\0'; - - /* Send/Produce message. */ - if (rd_kafka_produce(rkt, partition, - RD_KAFKA_MSG_F_COPY, - /* Payload and length */ - buf, len, - /* Optional key and its length */ - NULL, 0, - /* Message opaque, provided in - * delivery report callback as - * msg_opaque. 
*/ - NULL) == -1) { - fprintf(stderr, - "%% Failed to produce to topic %s " - "partition %i: %s\n", - rd_kafka_topic_name(rkt), partition, - rd_kafka_err2str( - rd_kafka_errno2err(errno))); - /* Poll to handle delivery reports */ - rd_kafka_poll(rk, 0); - continue; - } - - if (!quiet) - fprintf(stderr, "%% Sent %zd bytes to topic " - "%s partition %i\n", - len, rd_kafka_topic_name(rkt), partition); - sendcnt++; - /* Poll to handle delivery reports */ - rd_kafka_poll(rk, 0); - } - - /* Poll to handle delivery reports */ - rd_kafka_poll(rk, 0); - - /* Wait for messages to be delivered */ - while (run && rd_kafka_outq_len(rk) > 0) - rd_kafka_poll(rk, 100); - - /* Destroy the handle */ - rd_kafka_destroy(rk); - - } else if (mode == 'C') { - /* - * Consumer - */ - - /* Create Kafka handle */ - if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, - errstr, sizeof(errstr)))) { - fprintf(stderr, - "%% Failed to create new consumer: %s\n", - errstr); - exit(1); - } - - /* Create topic */ - rkt = rd_kafka_topic_new(rk, topic, topic_conf); - - /* Start consuming */ - if (rd_kafka_consume_start(rkt, partition, start_offset) == -1){ - fprintf(stderr, "%% Failed to start consuming: %s\n", - rd_kafka_err2str(rd_kafka_errno2err(errno))); - exit(1); - } - - while (run) { - rd_kafka_message_t *rkmessage; - - /* Consume single message. - * See rdkafka_performance.c for high speed - * consuming of messages. */ - rkmessage = rd_kafka_consume(rkt, partition, 1000); - if (!rkmessage) /* timeout */ - continue; - - msg_consume(rkmessage, NULL); - - /* Return message to rdkafka */ - rd_kafka_message_destroy(rkmessage); - } - - /* Stop consuming */ - rd_kafka_consume_stop(rkt, partition); - - rd_kafka_topic_destroy(rkt); - - rd_kafka_destroy(rk); - - } else if (mode == 'L') { - rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; - - /* Create Kafka handle */ - if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, - errstr, sizeof(errstr)))) { - fprintf(stderr, - "%% Failed to create new producer: %s\n", - errstr); - exit(1); - } - - /* Create topic */ - if (topic) - rkt = rd_kafka_topic_new(rk, topic, topic_conf); - else - rkt = NULL; - - while (run) { - const struct rd_kafka_metadata *metadata; - - /* Fetch metadata */ - err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt, - &metadata, 5000); - if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { - fprintf(stderr, - "%% Failed to acquire metadata: %s\n", - rd_kafka_err2str(err)); - run = 0; - break; - } - - metadata_print(topic, metadata); - - rd_kafka_metadata_destroy(metadata); - run = 0; - } - - /* Destroy the handle */ - rd_kafka_destroy(rk); - - /* Exit right away, dont wait for background cleanup, we haven't - * done anything important anyway. */ - exit(err ? 2 : 0); - } - - /* Let background threads clean up and terminate cleanly. */ - rd_kafka_wait_destroyed(2000); - - /** Free the zookeeper data. */ - zookeeper_close(zh); - - return 0; -} diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index 2bd8424a41..41a7868d43 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1576,8 +1576,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "process restarts to pick up where it left off. " "If false, the application will have to call " "`rd_kafka_offset_store()` to store an offset (optional). 
" - "**NOTE:** There is currently no zookeeper integration, offsets " - "will be written to broker or local file according to " + "Offsets will be written to broker or local file according to " "offset.store.method.", 0, 1, 1 }, { _RK_TOPIC|_RK_CONSUMER, "enable.auto.commit", _RK_C_ALIAS, diff --git a/src/rdkafka_offset.c b/src/rdkafka_offset.c index 69e2403492..31b3033f26 100644 --- a/src/rdkafka_offset.c +++ b/src/rdkafka_offset.c @@ -29,8 +29,7 @@ // FIXME: Revise this documentation: /** * This file implements the consumer offset storage. - * It currently supports local file storage and broker OffsetCommit storage, - * not zookeeper. + * It currently supports local file storage and broker OffsetCommit storage. * * Regardless of commit method (file, broker, ..) this is how it works: * - When rdkafka, or the application, depending on if auto.offset.commit