Skip to content

Commit

Permalink
Add leader epoch to control messages
Browse files Browse the repository at this point in the history
to make sure they're stored for committing
even without a subsequent fetch message
  • Loading branch information
emasab committed Sep 20, 2023
1 parent 5fc10ab commit 9c44d55
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ librdkafka v2.2.1 is a maintenance release:

* Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
* Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0.
* Fixed missing leader epoch on control messages, to make sure they're stored
  for committing even without a subsequent fetch message (#4434).



Expand Down
71 changes: 41 additions & 30 deletions src/rdkafka_msgset_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -668,14 +668,14 @@ rd_kafka_msgset_reader_msg_v0_1(rd_kafka_msgset_reader_t *msetr) {

/* Create op/message container for message. */
rko = rd_kafka_op_new_fetch_msg(
&rkm, rktp, msetr->msetr_tver->version, rkbuf, hdr.Offset,
&rkm, rktp, msetr->msetr_tver->version, rkbuf,
RD_KAFKA_FETCH_POS(hdr.Offset, msetr->msetr_leader_epoch),
(size_t)RD_KAFKAP_BYTES_LEN(&Key),
RD_KAFKAP_BYTES_IS_NULL(&Key) ? NULL : Key.data,
(size_t)RD_KAFKAP_BYTES_LEN(&Value),
RD_KAFKAP_BYTES_IS_NULL(&Value) ? NULL : Value.data);

rkm->rkm_u.consumer.leader_epoch = msetr->msetr_leader_epoch;
rkm->rkm_broker_id = msetr->msetr_broker_id;
rkm->rkm_broker_id = msetr->msetr_broker_id;

/* Assign message timestamp.
* If message was in a compressed MessageSet and the outer/wrapper
Expand Down Expand Up @@ -733,6 +733,7 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) {
? LOG_DEBUG
: 0;
size_t message_end;
rd_kafka_fetch_pos_t msetr_pos;

rd_kafka_buf_read_varint(rkbuf, &hdr.Length);
message_end =
Expand All @@ -742,15 +743,23 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) {
rd_kafka_buf_read_varint(rkbuf, &hdr.TimestampDelta);
rd_kafka_buf_read_varint(rkbuf, &hdr.OffsetDelta);
hdr.Offset = msetr->msetr_v2_hdr->BaseOffset + hdr.OffsetDelta;

/* Skip message if outdated */
msetr_pos = RD_KAFKA_FETCH_POS(hdr.Offset, msetr->msetr_leader_epoch);

/* Skip message if outdated.
* Don't check offset leader epoch, just log it, as if current leader
* epoch is different the fetch will fail (KIP-320) and if offset leader
* epoch is different it'll return an empty fetch (KIP-595). If we
* checked it, it's possible to have a loop when moving from a broker
* that supports leader epoch to one that doesn't. */
if (hdr.Offset < rktp->rktp_offsets.fetch_pos.offset) {
rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG",
"%s [%" PRId32
"]: "
"Skip offset %" PRId64 " < fetch_offset %" PRId64,
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
hdr.Offset, rktp->rktp_offsets.fetch_pos.offset);
rd_rkb_dbg(
msetr->msetr_rkb, MSG, "MSG",
"%s [%" PRId32
"]: "
"Skip %s < fetch %s",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rd_kafka_fetch_pos2str(msetr_pos),
rd_kafka_fetch_pos2str(rktp->rktp_offsets.fetch_pos));
rd_kafka_buf_skip_to(rkbuf, message_end);
return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */
}
Expand All @@ -771,10 +780,11 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) {
rkbuf,
"%s [%" PRId32
"]: "
"Ctrl message at offset %" PRId64
"Ctrl message at %s"
" has invalid key size %" PRId64,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, hdr.Offset,
rktp->rktp_partition,
rd_kafka_fetch_pos2str(msetr_pos),
ctrl_data.KeySize);

rd_kafka_buf_read_i16(rkbuf, &ctrl_data.Version);
Expand All @@ -784,11 +794,10 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) {
"%s [%" PRId32
"]: "
"Skipping ctrl msg with "
"unsupported version %" PRId16
" at offset %" PRId64,
"unsupported version %" PRId16 " at %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, ctrl_data.Version,
hdr.Offset);
rd_kafka_fetch_pos2str(msetr_pos));
rd_kafka_buf_skip_to(rkbuf, message_end);
return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next
msg */
Expand All @@ -799,10 +808,11 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) {
rkbuf,
"%s [%" PRId32
"]: "
"Ctrl message at offset %" PRId64
"Ctrl message at %s"
" has invalid key size %" PRId64,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, hdr.Offset,
rktp->rktp_partition,
rd_kafka_fetch_pos2str(msetr_pos),
ctrl_data.KeySize);

rd_kafka_buf_read_i16(rkbuf, &ctrl_data.Type);
Expand All @@ -827,14 +837,15 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) {
MSG | RD_KAFKA_DBG_EOS, "TXN",
"%s [%" PRId32
"] received abort txn "
"ctrl msg at offset %" PRId64
"ctrl msg at %s"
" for "
"PID %" PRId64
", but there are no "
"known aborted transactions: "
"ignoring",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, hdr.Offset,
rktp->rktp_partition,
rd_kafka_fetch_pos2str(msetr_pos),
msetr->msetr_v2_hdr->PID);
break;
}
Expand All @@ -844,22 +855,23 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) {
aborted_txn_start_offset =
rd_kafka_aborted_txns_pop_offset(
msetr->msetr_aborted_txns,
msetr->msetr_v2_hdr->PID, hdr.Offset);
msetr->msetr_v2_hdr->PID, msetr_pos.offset);

if (unlikely(aborted_txn_start_offset == -1)) {
rd_rkb_dbg(msetr->msetr_rkb,
MSG | RD_KAFKA_DBG_EOS, "TXN",
"%s [%" PRId32
"] received abort txn "
"ctrl msg at offset %" PRId64
"ctrl msg at %s"
" for "
"PID %" PRId64
", but this offset is "
"not listed as an aborted "
"transaction: aborted transaction "
"was possibly empty: ignoring",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, hdr.Offset,
rktp->rktp_partition,
rd_kafka_fetch_pos2str(msetr_pos),
msetr->msetr_v2_hdr->PID);
break;
}
Expand All @@ -873,16 +885,16 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) {
"]: "
"Unsupported ctrl message "
"type %" PRId16
" at offset"
" %" PRId64 ": ignoring",
" at "
" %s: ignoring",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, ctrl_data.Type,
hdr.Offset);
rd_kafka_fetch_pos2str(msetr_pos));
break;
}

rko = rd_kafka_op_new_ctrl_msg(rktp, msetr->msetr_tver->version,
rkbuf, hdr.Offset);
rkbuf, msetr_pos);
rd_kafka_q_enq(&msetr->msetr_rkq, rko);
msetr->msetr_msgcnt++;

Expand All @@ -905,14 +917,13 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) {

/* Create op/message container for message. */
rko = rd_kafka_op_new_fetch_msg(
&rkm, rktp, msetr->msetr_tver->version, rkbuf, hdr.Offset,
&rkm, rktp, msetr->msetr_tver->version, rkbuf, msetr_pos,
(size_t)RD_KAFKAP_BYTES_LEN(&hdr.Key),
RD_KAFKAP_BYTES_IS_NULL(&hdr.Key) ? NULL : hdr.Key.data,
(size_t)RD_KAFKAP_BYTES_LEN(&hdr.Value),
RD_KAFKAP_BYTES_IS_NULL(&hdr.Value) ? NULL : hdr.Value.data);

rkm->rkm_u.consumer.leader_epoch = msetr->msetr_leader_epoch;
rkm->rkm_broker_id = msetr->msetr_broker_id;
rkm->rkm_broker_id = msetr->msetr_broker_id;

/* Store pointer to unparsed message headers, they will
* be parsed on the first access.
Expand Down
9 changes: 5 additions & 4 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -747,11 +747,11 @@ rd_kafka_op_call(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
rd_kafka_op_t *rd_kafka_op_new_ctrl_msg(rd_kafka_toppar_t *rktp,
int32_t version,
rd_kafka_buf_t *rkbuf,
int64_t offset) {
rd_kafka_fetch_pos_t pos) {
rd_kafka_msg_t *rkm;
rd_kafka_op_t *rko;

rko = rd_kafka_op_new_fetch_msg(&rkm, rktp, version, rkbuf, offset, 0,
rko = rd_kafka_op_new_fetch_msg(&rkm, rktp, version, rkbuf, pos, 0,
NULL, 0, NULL);

rkm->rkm_flags |= RD_KAFKA_MSG_F_CONTROL;
Expand All @@ -770,7 +770,7 @@ rd_kafka_op_t *rd_kafka_op_new_fetch_msg(rd_kafka_msg_t **rkmp,
rd_kafka_toppar_t *rktp,
int32_t version,
rd_kafka_buf_t *rkbuf,
int64_t offset,
rd_kafka_fetch_pos_t pos,
size_t key_len,
const void *key,
size_t val_len,
Expand All @@ -792,7 +792,8 @@ rd_kafka_op_t *rd_kafka_op_new_fetch_msg(rd_kafka_msg_t **rkmp,
rko->rko_u.fetch.rkbuf = rkbuf;
rd_kafka_buf_keep(rkbuf);

rkm->rkm_offset = offset;
rkm->rkm_offset = pos.offset;
rkm->rkm_u.consumer.leader_epoch = pos.leader_epoch;

rkm->rkm_key = (void *)key;
rkm->rkm_key_len = key_len;
Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ rd_kafka_op_t *rd_kafka_op_new_fetch_msg(rd_kafka_msg_t **rkmp,
rd_kafka_toppar_t *rktp,
int32_t version,
rd_kafka_buf_t *rkbuf,
int64_t offset,
rd_kafka_fetch_pos_t pos,
size_t key_len,
const void *key,
size_t val_len,
Expand All @@ -734,7 +734,7 @@ rd_kafka_op_t *rd_kafka_op_new_fetch_msg(rd_kafka_msg_t **rkmp,
rd_kafka_op_t *rd_kafka_op_new_ctrl_msg(rd_kafka_toppar_t *rktp,
int32_t version,
rd_kafka_buf_t *rkbuf,
int64_t offset);
rd_kafka_fetch_pos_t pos);

void rd_kafka_op_throttle_time(struct rd_kafka_broker_s *rkb,
rd_kafka_q_t *rkq,
Expand Down

0 comments on commit 9c44d55

Please sign in to comment.