Skip to content

Commit

Permalink
Raise fatal error on transactional OUT_OF_ORDER_SEQ error (#3575)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Oct 25, 2021
1 parent a8dfcb8 commit 10dda4a
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 1 deletion.
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@

librdkafka v1.6.2 is a maintenance release with the following backported fixes:

* Upon quick repeated leader changes the transactional producer could receive
an `OUT_OF_ORDER_SEQUENCE` error from the broker, which triggered an
Epoch bump on the producer resulting in an InitProducerIdRequest being sent
to the transaction coordinator in the middle of a transaction.
This request would start a new transaction on the coordinator, but the
producer would still think (erroneously) it was in the current transaction.
Any messages produced in the current transaction prior to this event would
be silently lost when the application committed the transaction, leading
to message loss.
To avoid message loss, a fatal error is now raised.
This fix is specific to v1.6.x; librdkafka v1.8.x implements a recoverable
error state instead (#3575).
* The transactional producer could stall during a transaction if the transaction
coordinator changed while adding offsets to the transaction (send_offsets_to_transaction()).
This stall lasted until the coordinator connection went down, the
Expand All @@ -11,7 +23,7 @@ librdkafka v1.6.2 is a maintenance release with the following backported fixes:
which would result in some timeout operations not being enforced correctly,
e.g., the transactional producer API timeouts.
These timers are now started with a timeout of 1 microsecond.
* Force address resolution if the broker epoch changes (#3238)
* Force address resolution if the broker epoch changes (#3238).


# librdkafka v1.6.1
Expand Down
28 changes: 28 additions & 0 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2654,6 +2654,34 @@ rd_kafka_handle_idempotent_Produce_error (rd_kafka_broker_t *rkb,
perr->update_next_ack = rd_false;
perr->update_next_err = rd_true;

if (rd_kafka_is_transactional(rk))
rd_kafka_txn_set_fatal_error(
rk, RD_DO_LOCK,
perr->err,
"ProduceRequest for %.*s [%"PRId32"] "
"with %d message(s) failed "
"due to skipped sequence numbers "
"(%s, base seq %"PRId32" > "
"next seq %"PRId32") "
"caused by previous failed request "
"(%s, actions %s, "
"base seq %"PRId32"..%"PRId32
", base msgid %"PRIu64", %"PRId64"ms "
"ago)",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->
rkt_topic),
rktp->rktp_partition,
rd_kafka_msgq_len(&batch->msgq),
rd_kafka_pid2str(batch->pid),
batch->first_seq,
perr->next_ack_seq,
rd_kafka_err2name(last_err.err),
rd_kafka_actions2str(last_err.actions),
last_err.base_seq, last_err.last_seq,
last_err.base_msgid,
last_err.ts ?
(now - last_err.ts)/1000 : -1);

rd_kafka_idemp_drain_epoch_bump(
rk, "skipped sequence numbers");

Expand Down
124 changes: 124 additions & 0 deletions tests/0105-transactions_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2305,6 +2305,128 @@ static void do_test_unstable_offset_commit (void) {
}



/**
 * @brief #3575: OUT_OF_ORDER_SEQ must raise a fatal txn error.
 *
 * This is a special fix for v1.6.x to avoid silent message loss.
 * v1.8.x and later have a proper fix that transitions the transaction to
 * the abortable state.
 *
 * Test outline:
 *  1. Begin a transaction, produce one seeding message and flush it.
 *  2. Set a 2 second RTT on the partition leader and queue three
 *     Produce request errors, the last being OUT_OF_ORDER_SEQUENCE_NUMBER.
 *  3. Produce three more messages and sleep while the errors come back.
 *  4. Verify that a further produce() call fails with ERR__FATAL and that
 *     commit_transaction() fails with a fatal, non-abortable error.
 */
static void do_test_out_of_order_seq_is_fatal (void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
/* Mock broker ids used as transaction coordinator and partition leader. */
int32_t txn_coord = 1, leader = 2;
const char *txnid = "myTxnId";
test_timing_t timing;
rd_kafka_resp_err_t err;

SUB_TEST_QUICK();

/* NOTE(review): the 3 is presumably the mock broker count, and
 * batch.num.messages=1 presumably forces one message per ProduceRequest
 * so that each message below is sent as its own request - confirm
 * against create_txn_producer(). */
rk = create_txn_producer(&mcluster, txnid, 3,
"batch.num.messages", "1",
NULL);

/* Use separate brokers for the transaction coordinator and the
 * partition leader. */
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
txn_coord);

rd_kafka_mock_partition_set_leader(mcluster, "mytopic", 0, leader);

/* NOTE(review): presumably these tell the test framework to tolerate
 * failed delivery reports and to not treat the fatal error raised
 * below as a test failure - confirm against the test framework. */
test_curr->ignore_dr_err = rd_true;
test_curr->is_fatal_cb = NULL;

TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));



/* Produce one seeding message first to get the leader up and running */
TEST_CALL_ERR__(rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
test_flush(rk, -1);

/* Let partition leader have a latency of 2 seconds
* so that we can have multiple messages in-flight. */
rd_kafka_mock_broker_set_rtt(mcluster, leader, 2*1000);

/* Produce a message, let it fail with different errors,
* ending with OUT_OF_ORDER which previously triggered an
* Epoch bump.
* The count argument (3) matches the number of error codes listed. */
rd_kafka_mock_push_request_errors(
mcluster,
RD_KAFKAP_Produce,
3,
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER);

/* Produce three messages that will be delayed
* and have errors injected.*/
TEST_CALL_ERR__(rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
TEST_CALL_ERR__(rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
TEST_CALL_ERR__(rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));

/* Now sleep a short while so that the messages are processed
* by the broker and errors are returned.
* NOTE(review): rd_sleep() presumably takes seconds, i.e. this waits
* longer than the 2 second RTT set above - confirm. */
TEST_SAY("Sleeping..\n");
rd_sleep(5);

/* Remove the injected latency again. */
rd_kafka_mock_broker_set_rtt(mcluster, leader, 0);

/* Produce a fifth message, should fail with ERR__FATAL since
* a fatal error should have been raised. */
err = rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END);
TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
"Expected produce() to fail with ERR__FATAL, not %s",
rd_kafka_err2name(err));
TEST_SAY("produce() failed as expected: %s\n",
rd_kafka_err2str(err));

/* Commit the transaction, should fail with a fatal error. */
TIMING_START(&timing, "commit_transaction(-1)");
error = rd_kafka_commit_transaction(rk, -1);
TIMING_STOP(&timing);
TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

TEST_SAY("commit_transaction() failed (expectedly): %s\n",
rd_kafka_error_string(error));

/* The error must be flagged as fatal and must not be flagged as
 * an abortable transaction error. */
TEST_ASSERT(rd_kafka_error_is_fatal(error),
"Expected fatal error");
TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error),
"Did not expect abortable error");
rd_kafka_error_destroy(error);

/* NOTE(review): mcluster is not destroyed explicitly here; presumably
 * it is owned by rk and torn down with it - confirm against
 * create_txn_producer(). */
rd_kafka_destroy(rk);

SUB_TEST_PASS();
}


int main_0105_transactions_mock (int argc, char **argv) {
if (test_needs_auth()) {
TEST_SKIP("Mock cluster does not support SSL/SASL\n");
Expand Down Expand Up @@ -2363,5 +2485,7 @@ int main_0105_transactions_mock (int argc, char **argv) {

do_test_txn_switch_coordinator_refresh();

do_test_out_of_order_seq_is_fatal();

return 0;
}

0 comments on commit 10dda4a

Please sign in to comment.