Clean consume_batch..() buffer after rebalancing for batch queue (@jliunyu, #3269)

Issue: the buffer is not cleaned up after a rebalance when messages are polled using the batch queue method, so the consumer can still receive stale messages from the previous assignment.

Solution: when an assignment happens, a new op event of type RD_KAFKA_OP_BARRIER is created and a new op version is created at the same time. When the consumer encounters this event, it cleans the buffer by comparing the version of the buffered messages against the newly created version.
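For context, the affected path is librdkafka's batch consume API on the consumer queue. Below is a minimal usage sketch, assuming an already configured and subscribed high-level consumer; the helper name, batch size and timeout are illustrative and not part of this commit, and error handling is omitted:

#include <stdlib.h>
#include <librdkafka/rdkafka.h>

/* Drain up to `max` messages from the consumer queue in one call.
 * `rk` is assumed to be a configured, subscribed high-level consumer. */
static void consume_one_batch (rd_kafka_t *rk, size_t max) {
        rd_kafka_queue_t *rkq = rd_kafka_queue_get_consumer(rk);
        rd_kafka_message_t **msgs = malloc(max * sizeof(*msgs));
        ssize_t i, n;

        /* Returns the number of messages written to msgs[], or -1 on error. */
        n = rd_kafka_consume_batch_queue(rkq, 1000/*timeout ms*/, msgs, max);

        for (i = 0 ; i < n ; i++) {
                /* Before this fix, the returned batch could still contain
                 * messages buffered for partitions revoked by an earlier
                 * rebalance. */
                rd_kafka_message_destroy(msgs[i]);
        }

        free(msgs);
        rd_kafka_queue_destroy(rkq);
}
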
jliunyu committed Apr 13, 2021
1 parent 91aba43 commit b31363f
Showing 9 changed files with 275 additions and 6 deletions.
6 changes: 5 additions & 1 deletion src/rdkafka.c
@@ -2954,7 +2954,8 @@ rd_kafka_consume_cb (rd_kafka_t *rk,
struct consume_ctx *ctx = opaque;
rd_kafka_message_t *rkmessage;

if (unlikely(rd_kafka_op_version_outdated(rko, 0))) {
if (unlikely(rd_kafka_op_version_outdated(rko, 0)) ||
rko->rko_type == RD_KAFKA_OP_BARRIER) {
rd_kafka_op_destroy(rko);
return RD_KAFKA_OP_RES_HANDLED;
}
@@ -3858,6 +3859,9 @@ rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
res = rd_kafka_op_call(rk, rkq, rko);
break;

case RD_KAFKA_OP_BARRIER:
break;

case RD_KAFKA_OP_PURGE:
rd_kafka_purge(rk, rko->rko_u.purge.flags);
break;
2 changes: 2 additions & 0 deletions src/rdkafka_op.c
@@ -95,6 +95,7 @@ const char *rd_kafka_op2str (rd_kafka_op_type_t type) {
[RD_KAFKA_OP_GET_REBALANCE_PROTOCOL] =
"REPLY:GET_REBALANCE_PROTOCOL",
[RD_KAFKA_OP_LEADERS] = "REPLY:LEADERS",
[RD_KAFKA_OP_BARRIER] = "REPLY:BARRIER",
};

if (type & RD_KAFKA_OP_REPLY)
@@ -237,6 +238,7 @@ rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type) {
[RD_KAFKA_OP_GET_REBALANCE_PROTOCOL] =
sizeof(rko->rko_u.rebalance_protocol),
[RD_KAFKA_OP_LEADERS] = sizeof(rko->rko_u.leaders),
[RD_KAFKA_OP_BARRIER] = _RD_KAFKA_OP_EMPTY,
};
size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK];

1 change: 1 addition & 0 deletions src/rdkafka_op.h
@@ -136,6 +136,7 @@ typedef enum {
RD_KAFKA_OP_TXN, /**< Transaction command */
RD_KAFKA_OP_GET_REBALANCE_PROTOCOL, /**< Get rebalance protocol */
RD_KAFKA_OP_LEADERS, /**< Partition leader query */
RD_KAFKA_OP_BARRIER, /**< Version barrier bump */
RD_KAFKA_OP__END
} rd_kafka_op_type_t;

26 changes: 22 additions & 4 deletions src/rdkafka_partition.c
@@ -186,6 +186,24 @@ static void rd_kafka_toppar_consumer_lag_tmr_cb (rd_kafka_timers_t *rkts,
rd_kafka_toppar_consumer_lag_req(rktp);
}

/**
* @brief Update rktp_op_version.
* Enqueue an RD_KAFKA_OP_BARRIER type of operation
* when the op_version is updated.
*
* @locks_required rd_kafka_toppar_lock() must be held.
* @locality Toppar handler thread
*/
void rd_kafka_toppar_op_version_bump (rd_kafka_toppar_t *rktp,
int32_t version) {
rd_kafka_op_t *rko;

rktp->rktp_op_version = version;
rko = rd_kafka_op_new(RD_KAFKA_OP_BARRIER);
rko->rko_version = version;
rd_kafka_q_enq(rktp->rktp_fetchq, rko);
}


/**
* Add new partition to topic.
@@ -1585,7 +1603,7 @@ static void rd_kafka_toppar_fetch_start (rd_kafka_toppar_t *rktp,
goto err_reply;
}

rktp->rktp_op_version = version;
rd_kafka_toppar_op_version_bump(rktp, version);

if (rkcg) {
rd_kafka_assert(rktp->rktp_rkt->rkt_rk, !rktp->rktp_cgrp);
@@ -1694,7 +1712,7 @@ void rd_kafka_toppar_fetch_stop (rd_kafka_toppar_t *rktp,
rktp->rktp_partition,
rd_kafka_fetch_states[rktp->rktp_fetch_state], version);

rktp->rktp_op_version = version;
rd_kafka_toppar_op_version_bump(rktp, version);

/* Abort pending offset lookups. */
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
@@ -1754,7 +1772,7 @@ void rd_kafka_toppar_seek (rd_kafka_toppar_t *rktp,
goto err_reply;
}

rktp->rktp_op_version = version;
rd_kafka_toppar_op_version_bump(rktp, version);

/* Abort pending offset lookups. */
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
@@ -1809,7 +1827,7 @@ static void rd_kafka_toppar_pause_resume (rd_kafka_toppar_t *rktp,

rd_kafka_toppar_lock(rktp);

rktp->rktp_op_version = version;
rd_kafka_toppar_op_version_bump(rktp, version);

if (!pause && (rktp->rktp_flags & flag) != flag) {
rd_kafka_dbg(rk, TOPIC, "RESUME",
35 changes: 34 additions & 1 deletion src/rdkafka_queue.c
@@ -516,8 +516,32 @@ int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms,
return cnt;
}

/**
* @brief Filter out and destroy outdated messages.
*
* @returns Returns the number of valid messages.
*
* @locality Any thread.
*/
static size_t rd_kafka_purge_outdated_messages (int32_t version,
rd_kafka_message_t **rkmessages, size_t cnt) {
size_t valid_count = 0;
size_t i;


for (i = 0; i < cnt; i++) {
rd_kafka_op_t *rko;
rko = rkmessages[i]->_private;
if (rd_kafka_op_version_outdated(rko, version)) {
/* This also destroys the corresponding rkmessage. */
rd_kafka_op_destroy(rko);
} else if (i > valid_count) {
rkmessages[valid_count++] = rkmessages[i];
} else {
valid_count++;
}
}
return valid_count;
}


/**
@@ -582,6 +606,15 @@ int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms,
continue;
}

if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) {
cnt = (unsigned int)rd_kafka_purge_outdated_messages(
rko->rko_version,
rkmessages,
cnt);
rd_kafka_op_destroy(rko);
continue;
}

/* Serve non-FETCH callbacks */
res = rd_kafka_poll_cb(rk, rkq, rko,
RD_KAFKA_Q_CB_RETURN, NULL);
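The purge helper added above compacts the surviving rkmessages in place in a single pass. As a reference, here is a generic sketch of that compaction pattern; compact_in_place() and its keep() callback are illustrative names and not part of librdkafka:

#include <stddef.h>
#include <stdlib.h>

/* Keep entries for which keep() returns non-zero, free the rest, and
 * return the number of survivors; survivors end up contiguous at the
 * front of the array, preserving their order. */
static size_t compact_in_place (void **items, size_t cnt,
                                int (*keep) (void *item)) {
        size_t valid = 0, i;

        for (i = 0 ; i < cnt ; i++) {
                if (!keep(items[i])) {
                        free(items[i]);             /* drop rejected entry */
                } else if (i > valid) {
                        items[valid++] = items[i];  /* shift left into gap */
                } else {
                        valid++;                    /* already in place */
                }
        }

        return valid;
}

rd_kafka_purge_outdated_messages() applies the same shift-left logic with "op version is not outdated" as the keep condition, destroying the outdated op (and with it its rkmessage) otherwise.
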
207 changes: 207 additions & 0 deletions tests/0122-buffer_cleaning_after_rebalance.c
@@ -0,0 +1,207 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2021, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "test.h"
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */

typedef struct consumer_s {
const char *what;
rd_kafka_queue_t *rkq;
int timeout_ms;
int consume_msg_cnt;
rd_kafka_t *rk;
uint64_t testid;
test_msgver_t *mv;
} consumer_t;

static int consumer_batch_queue (void *arg) {
consumer_t *arguments = arg;
int msg_cnt = 0;
int i;
test_timing_t t_cons;

rd_kafka_queue_t *rkq = arguments->rkq;
int timeout_ms = arguments->timeout_ms;
const int consume_msg_cnt = arguments->consume_msg_cnt;
rd_kafka_t *rk = arguments->rk;
uint64_t testid = arguments->testid;

rd_kafka_message_t **rkmessage = malloc(consume_msg_cnt * sizeof(*rkmessage));

TIMING_START(&t_cons, "CONSUME");

msg_cnt = rd_kafka_consume_batch_queue(rkq,
timeout_ms, rkmessage, consume_msg_cnt);

TIMING_STOP(&t_cons);

for (i = 0; i < msg_cnt; i++) {
if (test_msgver_add_msg(rk, arguments->mv, rkmessage[i]) == 0)
TEST_FAIL("The message is not from testid "
"%"PRId64" \n", testid);
rd_kafka_message_destroy(rkmessage[i]);
}

return 0;
}


/**
* @brief Produce 400 messages and consume 500 messages in total with
*        two consumers using the batch queue method, then verify that
*        the two consumers received no missing or duplicate messages.
*        The consume count is set higher than (or equal to) the produce
*        count for two reasons:
*        1) It makes sure each consumer receives at most half of the
*           produced messages even though it asks for more.
*        2) If the consume count were smaller than the produce count,
*           it would be hard to tell whether the returned messages were
*           added to the batch queue before or after the rebalance.
*           With the larger consume count, if each consumer still only
*           receives half of the produced messages, we know the buffer
*           cleaning happened during batch queue processing and only
*           messages added after the rebalance were returned.
*
* 1. Produce 100 messages to each of the 4 partitions
* 2. The first consumer subscribes to the topic and waits for its
*    assignment
* 3. The first consumer consumes 500 messages using the batch queue
*    method
* 4. The second consumer subscribes to the topic and waits for its
*    assignment
* 5. A rebalance happens
* 6. The second consumer consumes 500 messages using the batch queue
*    method
* 7. Each consumer ends up with 200 messages
* 8. Combine all the messages received by the 2 consumers and verify
*    that no messages are missing or duplicated
*
*/
static void do_test_consume_batch (const char *strategy) {
const int partition_cnt = 4;
rd_kafka_queue_t *rkq1, *rkq2;
const char *topic;
rd_kafka_t *c1;
rd_kafka_t *c2;
int p;
const int timeout_ms = 30000;
uint64_t testid;
const int consume_msg_cnt = 500;
const int produce_msg_cnt = 400;
rd_kafka_conf_t *conf;
consumer_t c1_args;
consumer_t c2_args;
test_msgver_t mv;
thrd_t thread_id;

SUB_TEST("partition.assignment.strategy = %s", strategy);

test_conf_init(&conf, NULL, 60);
test_conf_set(conf, "enable.auto.commit", "false");
test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "partition.assignment.strategy", strategy);

testid = test_id_generate();
test_msgver_init(&mv, testid);

/* Produce messages */
topic = test_mk_topic_name("0122-buffer_cleaning", 1);

for (p = 0 ; p < partition_cnt ; p++)
test_produce_msgs_easy(topic,
testid,
p,
produce_msg_cnt / partition_cnt);

/* Create consumers */
c1 = test_create_consumer(topic, NULL,
rd_kafka_conf_dup(conf), NULL);
c2 = test_create_consumer(topic, NULL, conf, NULL);

test_consumer_subscribe(c1, topic);
test_consumer_wait_assignment(c1, rd_false);

/* Create generic consume queue */
rkq1 = rd_kafka_queue_get_consumer(c1);

c1_args.what = "C1.PRE";
c1_args.rkq = rkq1;
c1_args.timeout_ms = timeout_ms;
c1_args.consume_msg_cnt = consume_msg_cnt;
c1_args.rk = c1;
c1_args.testid = testid;
c1_args.mv = &mv;
if (thrd_create(&thread_id, consumer_batch_queue, &c1_args)
!= thrd_success)
TEST_FAIL("Failed to create thread for %s", "C1.PRE");

test_consumer_subscribe(c2, topic);
test_consumer_wait_assignment(c2, rd_false);

thrd_join(thread_id, NULL);

/* Create generic consume queue */
rkq2 = rd_kafka_queue_get_consumer(c2);

c2_args.what = "C2.PRE";
c2_args.rkq = rkq2;
c2_args.timeout_ms = timeout_ms;
c2_args.consume_msg_cnt = consume_msg_cnt;
c2_args.rk = c2;
c2_args.testid = testid;
c2_args.mv = &mv;

consumer_batch_queue(&c2_args);

test_msgver_verify("C1.PRE + C2.PRE",
&mv,
TEST_MSGVER_ORDER|TEST_MSGVER_DUP,
0,
produce_msg_cnt);
test_msgver_clear(&mv);

rd_kafka_queue_destroy(rkq1);
rd_kafka_queue_destroy(rkq2);

test_consumer_close(c1);
test_consumer_close(c2);

rd_kafka_destroy(c1);
rd_kafka_destroy(c2);

SUB_TEST_PASS();
}


int main_0122_buffer_cleaning_after_rebalance (int argc, char **argv) {
do_test_consume_batch("range");
do_test_consume_batch("cooperative-sticky");
return 0;
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
@@ -113,6 +113,7 @@ set(
0119-consumer_auth.cpp
0120-asymmetric_subscription.c
0121-clusterid.c
0122-buffer_cleaning_after_rebalance.c
0123-connections_max_idle.c
0124-openssl_invalid_engine.c
8000-idle.cpp
2 changes: 2 additions & 0 deletions tests/test.c
@@ -231,6 +231,7 @@ _TEST_DECL(0118_commit_rebalance);
_TEST_DECL(0119_consumer_auth);
_TEST_DECL(0120_asymmetric_subscription);
_TEST_DECL(0121_clusterid);
_TEST_DECL(0122_buffer_cleaning_after_rebalance);
_TEST_DECL(0123_connections_max_idle);
_TEST_DECL(0124_openssl_invalid_engine);

@@ -434,6 +435,7 @@ struct test tests[] = {
_TEST(0119_consumer_auth, 0, TEST_BRKVER(2,1,0,0)),
_TEST(0120_asymmetric_subscription, TEST_F_LOCAL),
_TEST(0121_clusterid, TEST_F_LOCAL),
_TEST(0122_buffer_cleaning_after_rebalance, TEST_BRKVER(2,4,0,0)),
_TEST(0123_connections_max_idle, 0),
_TEST(0124_openssl_invalid_engine, TEST_F_LOCAL),

1 change: 1 addition & 0 deletions win32/tests/tests.vcxproj
@@ -203,6 +203,7 @@
<ClCompile Include="..\..\tests\0119-consumer_auth.cpp" />
<ClCompile Include="..\..\tests\0120-asymmetric_subscription.c" />
<ClCompile Include="..\..\tests\0121-clusterid.c" />
<ClCompile Include="..\..\tests\0122-buffer_cleaning_after_rebalance.c" />
<ClCompile Include="..\..\tests\0123-connections_max_idle.c" />
<ClCompile Include="..\..\tests\0124-openssl_invalid_engine.c" />
<ClCompile Include="..\..\tests\8000-idle.cpp" />
