diff --git a/src/rdkafka.c b/src/rdkafka.c index 1019c1650d..ec9dbc5940 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2954,7 +2954,8 @@ rd_kafka_consume_cb (rd_kafka_t *rk, struct consume_ctx *ctx = opaque; rd_kafka_message_t *rkmessage; - if (unlikely(rd_kafka_op_version_outdated(rko, 0))) { + if (unlikely(rd_kafka_op_version_outdated(rko, 0)) || + rko->rko_type == RD_KAFKA_OP_BARRIER) { rd_kafka_op_destroy(rko); return RD_KAFKA_OP_RES_HANDLED; } @@ -3858,6 +3859,9 @@ rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, res = rd_kafka_op_call(rk, rkq, rko); break; + case RD_KAFKA_OP_BARRIER: + break; + case RD_KAFKA_OP_PURGE: rd_kafka_purge(rk, rko->rko_u.purge.flags); break; diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 09ee515249..f3b783f348 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -95,6 +95,7 @@ const char *rd_kafka_op2str (rd_kafka_op_type_t type) { [RD_KAFKA_OP_GET_REBALANCE_PROTOCOL] = "REPLY:GET_REBALANCE_PROTOCOL", [RD_KAFKA_OP_LEADERS] = "REPLY:LEADERS", + [RD_KAFKA_OP_BARRIER] = "REPLY:BARRIER", }; if (type & RD_KAFKA_OP_REPLY) @@ -237,6 +238,7 @@ rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type) { [RD_KAFKA_OP_GET_REBALANCE_PROTOCOL] = sizeof(rko->rko_u.rebalance_protocol), [RD_KAFKA_OP_LEADERS] = sizeof(rko->rko_u.leaders), + [RD_KAFKA_OP_BARRIER] = _RD_KAFKA_OP_EMPTY, }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 7d779ec244..788444c0e2 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -136,6 +136,7 @@ typedef enum { RD_KAFKA_OP_TXN, /**< Transaction command */ RD_KAFKA_OP_GET_REBALANCE_PROTOCOL, /**< Get rebalance protocol */ RD_KAFKA_OP_LEADERS, /**< Partition leader query */ + RD_KAFKA_OP_BARRIER, /**< Version barrier bump */ RD_KAFKA_OP__END } rd_kafka_op_type_t; diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index ed032c052e..984ddcbf5a 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -186,6 +186,24 @@ static void rd_kafka_toppar_consumer_lag_tmr_cb (rd_kafka_timers_t *rkts, rd_kafka_toppar_consumer_lag_req(rktp); } +/** + * @brief Update rktp_op_version. + * Enqueue an RD_KAFKA_OP_BARRIER type of operation + * when the op_version is updated. + * + * @locks_required rd_kafka_toppar_lock() must be held. + * @locality Toppar handler thread + */ +void rd_kafka_toppar_op_version_bump (rd_kafka_toppar_t *rktp, + int32_t version) { + rd_kafka_op_t *rko; + + rktp->rktp_op_version = version; + rko = rd_kafka_op_new(RD_KAFKA_OP_BARRIER); + rko->rko_version = version; + rd_kafka_q_enq(rktp->rktp_fetchq, rko); +} + /** * Add new partition to topic. @@ -1585,7 +1603,7 @@ static void rd_kafka_toppar_fetch_start (rd_kafka_toppar_t *rktp, goto err_reply; } - rktp->rktp_op_version = version; + rd_kafka_toppar_op_version_bump(rktp, version); if (rkcg) { rd_kafka_assert(rktp->rktp_rkt->rkt_rk, !rktp->rktp_cgrp); @@ -1694,7 +1712,7 @@ void rd_kafka_toppar_fetch_stop (rd_kafka_toppar_t *rktp, rktp->rktp_partition, rd_kafka_fetch_states[rktp->rktp_fetch_state], version); - rktp->rktp_op_version = version; + rd_kafka_toppar_op_version_bump(rktp, version); /* Abort pending offset lookups. */ if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY) @@ -1754,7 +1772,7 @@ void rd_kafka_toppar_seek (rd_kafka_toppar_t *rktp, goto err_reply; } - rktp->rktp_op_version = version; + rd_kafka_toppar_op_version_bump(rktp, version); /* Abort pending offset lookups. */ if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY) @@ -1809,7 +1827,7 @@ static void rd_kafka_toppar_pause_resume (rd_kafka_toppar_t *rktp, rd_kafka_toppar_lock(rktp); - rktp->rktp_op_version = version; + rd_kafka_toppar_op_version_bump(rktp, version); if (!pause && (rktp->rktp_flags & flag) != flag) { rd_kafka_dbg(rk, TOPIC, "RESUME", diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c index 6ee352082d..c975ea2057 100644 --- a/src/rdkafka_queue.c +++ b/src/rdkafka_queue.c @@ -516,8 +516,32 @@ int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms, return cnt; } +/** + * @brief Filter out and destroy outdated messages. + * + * @returns Returns the number of valid messages. + * + * @locality Any thread. + */ +static size_t rd_kafka_purge_outdated_messages (int32_t version, + rd_kafka_message_t **rkmessages, size_t cnt) { + size_t valid_count = 0; + size_t i; - + for (i = 0; i < cnt; i++) { + rd_kafka_op_t *rko; + rko = rkmessages[i]->_private; + if (rd_kafka_op_version_outdated(rko, version)) { + /* This also destroys the corresponding rkmessage. */ + rd_kafka_op_destroy(rko); + } else if (i > valid_count) { + rkmessages[valid_count++] = rkmessages[i]; + } else { + valid_count++; + } + } + return valid_count; +} /** @@ -582,6 +606,15 @@ int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms, continue; } + if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) { + cnt = (unsigned int)rd_kafka_purge_outdated_messages( + rko->rko_version, + rkmessages, + cnt); + rd_kafka_op_destroy(rko); + continue; + } + /* Serve non-FETCH callbacks */ res = rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL); diff --git a/tests/0122-buffer_cleaning_after_rebalance.c b/tests/0122-buffer_cleaning_after_rebalance.c new file mode 100644 index 0000000000..e9cb6e1925 --- /dev/null +++ b/tests/0122-buffer_cleaning_after_rebalance.c @@ -0,0 +1,207 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2021, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +/* Typical include path would be , but this program + * is built from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" /* for Kafka driver */ + +typedef struct consumer_s { + const char *what; + rd_kafka_queue_t *rkq; + int timeout_ms; + int consume_msg_cnt; + rd_kafka_t *rk; + uint64_t testid; + test_msgver_t *mv; +} consumer_t; + +static int consumer_batch_queue (void *arg) { + consumer_t *arguments = arg; + int msg_cnt = 0; + int i; + test_timing_t t_cons; + + rd_kafka_queue_t *rkq = arguments->rkq; + int timeout_ms = arguments->timeout_ms; + const int consume_msg_cnt = arguments->consume_msg_cnt; + rd_kafka_t *rk = arguments->rk; + uint64_t testid = arguments->testid; + + rd_kafka_message_t **rkmessage = malloc(consume_msg_cnt * sizeof(*rkmessage)); + + TIMING_START(&t_cons, "CONSUME"); + + msg_cnt = rd_kafka_consume_batch_queue(rkq, + timeout_ms, rkmessage, consume_msg_cnt); + + TIMING_STOP(&t_cons); + + for (i = 0; i < msg_cnt; i++) { + if (test_msgver_add_msg(rk, arguments->mv, rkmessage[i]) == 0) + TEST_FAIL("The message is not from testid " + "%"PRId64" \n", testid); + rd_kafka_message_destroy(rkmessage[i]); + } + + return 0; +} + + +/** + * @brief Produce 400 messages and consume 500 messages totally by 2 consumers + * using batch queue method, verify if there isn't any missed or + * duplicate messages received by the two consumers. + * The reasons for setting the consume messages number is higher than + * or equal to the produce messages number are: + * 1) Make sure each consumer can at most receive half of the produced + * messages even though the consumers expect more. + * 2) If the consume messages number is smaller than the produce + * messages number, it's hard to verify that the messages returned + * are added to the batch queue before or after the rebalancing. + * But if the consume messages number is larger than the produce + * messages number, and we still received half of the produced + * messages by each consumer, we can make sure that the buffer + * cleaning is happened during the batch queue process to guarantee + * only received messages added to the batch queue after the + * rebalance. + * + * 1. Produce 100 messages to each of the 4 partitions + * 2. First consumer subscribes to the topic, wait for it's assignment + * 3. The first consumer consumes 500 messages using the batch queue + * method + * 4. Second consumer subscribes to the topic, wait for it's assignment + * 5. Rebalance happenes + * 6. The second consumer consumes 500 messages using the batch queue + * method + * 7. Each consumer receives 200 messages finally + * 8. Combine all the messages received by the 2 consumers and + * verify if there isn't any missed or duplicate messages + * + */ +static void do_test_consume_batch (const char *strategy) { + const int partition_cnt = 4; + rd_kafka_queue_t *rkq1, *rkq2; + const char *topic; + rd_kafka_t *c1; + rd_kafka_t *c2; + int p; + const int timeout_ms = 30000; + uint64_t testid; + const int consume_msg_cnt = 500; + const int produce_msg_cnt = 400; + rd_kafka_conf_t *conf; + consumer_t c1_args; + consumer_t c2_args; + test_msgver_t mv; + thrd_t thread_id; + + SUB_TEST("partition.assignment.strategy = %s", strategy); + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "enable.auto.commit", "false"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "partition.assignment.strategy", strategy); + + testid = test_id_generate(); + test_msgver_init(&mv, testid); + + /* Produce messages */ + topic = test_mk_topic_name("0122-buffer_cleaning", 1); + + for (p = 0 ; p < partition_cnt ; p++) + test_produce_msgs_easy(topic, + testid, + p, + produce_msg_cnt / partition_cnt); + + /* Create consumers */ + c1 = test_create_consumer(topic, NULL, + rd_kafka_conf_dup(conf), NULL); + c2 = test_create_consumer(topic, NULL, conf, NULL); + + test_consumer_subscribe(c1, topic); + test_consumer_wait_assignment(c1, rd_false); + + /* Create generic consume queue */ + rkq1 = rd_kafka_queue_get_consumer(c1); + + c1_args.what = "C1.PRE"; + c1_args.rkq = rkq1; + c1_args.timeout_ms = timeout_ms; + c1_args.consume_msg_cnt = consume_msg_cnt; + c1_args.rk = c1; + c1_args.testid = testid; + c1_args.mv = &mv; + if (thrd_create(&thread_id, consumer_batch_queue, &c1_args) + != thrd_success) + TEST_FAIL("Failed to create thread for %s", "C1.PRE"); + + test_consumer_subscribe(c2, topic); + test_consumer_wait_assignment(c2, rd_false); + + thrd_join(thread_id, NULL); + + /* Create generic consume queue */ + rkq2 = rd_kafka_queue_get_consumer(c2); + + c2_args.what = "C2.PRE"; + c2_args.rkq = rkq2; + c2_args.timeout_ms = timeout_ms; + c2_args.consume_msg_cnt = consume_msg_cnt; + c2_args.rk = c2; + c2_args.testid = testid; + c2_args.mv = &mv; + + consumer_batch_queue(&c2_args); + + test_msgver_verify("C1.PRE + C2.PRE", + &mv, + TEST_MSGVER_ORDER|TEST_MSGVER_DUP, + 0, + produce_msg_cnt); + test_msgver_clear(&mv); + + rd_kafka_queue_destroy(rkq1); + rd_kafka_queue_destroy(rkq2); + + test_consumer_close(c1); + test_consumer_close(c2); + + rd_kafka_destroy(c1); + rd_kafka_destroy(c2); + + SUB_TEST_PASS(); +} + + +int main_0122_buffer_cleaning_after_rebalance (int argc, char **argv) { + do_test_consume_batch("range"); + do_test_consume_batch("cooperative-sticky"); + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e500d5b8fa..6d70b88791 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -113,6 +113,7 @@ set( 0119-consumer_auth.cpp 0120-asymmetric_subscription.c 0121-clusterid.c + 0122-buffer_cleaning_after_rebalance.c 0123-connections_max_idle.c 0124-openssl_invalid_engine.c 8000-idle.cpp diff --git a/tests/test.c b/tests/test.c index 23ef9ff003..c391214027 100644 --- a/tests/test.c +++ b/tests/test.c @@ -231,6 +231,7 @@ _TEST_DECL(0118_commit_rebalance); _TEST_DECL(0119_consumer_auth); _TEST_DECL(0120_asymmetric_subscription); _TEST_DECL(0121_clusterid); +_TEST_DECL(0122_buffer_cleaning_after_rebalance); _TEST_DECL(0123_connections_max_idle); _TEST_DECL(0124_openssl_invalid_engine); @@ -434,6 +435,7 @@ struct test tests[] = { _TEST(0119_consumer_auth, 0, TEST_BRKVER(2,1,0,0)), _TEST(0120_asymmetric_subscription, TEST_F_LOCAL), _TEST(0121_clusterid, TEST_F_LOCAL), + _TEST(0122_buffer_cleaning_after_rebalance, TEST_BRKVER(2,4,0,0)), _TEST(0123_connections_max_idle, 0), _TEST(0124_openssl_invalid_engine, TEST_F_LOCAL), diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index abc1d29c4c..d6666c7455 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -203,6 +203,7 @@ +