
Commit

Fix for an idempotent producer error, with a message batch not reconstructed identically when retried (confluentinc#4750)

Issues: confluentinc#4736
Fix for an idempotent producer error where a message batch was not reconstructed identically when retried, causing the error message "Local: Inconsistent state: Unable to reconstruct MessageSet".
It happened on large batches and is solved by using the same backoff baseline for all messages in the batch.
Present since 2.2.0.
emasab authored and blindspotbounty committed Oct 11, 2024
1 parent f5811a7 commit f0194e9
Showing 2 changed files with 52 additions and 2 deletions.
42 changes: 42 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,48 @@

librdkafka v2.3.1 is a feature release:

* Fix segfault caused by an erased segment when using flexver with a long client id (#4689).
* Fix for an idempotent producer error, with a message batch not reconstructed
identically when retried (#4750)


## Enhancements

* Update bundled lz4 (used when `./configure --disable-lz4-ext`) to
[v1.9.4](https://github.com/lz4/lz4/releases/tag/v1.9.4), which contains
bugfixes and performance improvements (#4726).


## Fixes

### General fixes

* Issues: [confluentinc/confluent-kafka-dotnet#2084](https://github.com/confluentinc/confluent-kafka-dotnet/issues/2084)
  Fix segfault when a segment is erased and more data is written to the buffer.
  Happens since 1.x whenever a portion of the buffer (segment) is erased for flexver or compression.
  More likely to happen since 2.1.0, because of the flexver upgrades, with certain string sizes such as a long client id (#4689).

### Idempotent producer fixes

* Issues: #4736
  Fix for an idempotent producer error where a message batch was not reconstructed
  identically when retried, causing the error message "Local: Inconsistent state: Unable to reconstruct MessageSet".
  It happened on large batches and is solved by using the same backoff baseline for all messages
  in the batch.
  Present since 2.2.0 (#4750).
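The backoff computation the fix touches can be sketched as follows (a simplified model of the patched code path: constant values and the helper name are illustrative, not librdkafka's actual ones). A jitter factor around 100 percent scales an exponential base, the result is capped at `retry_max_ms`, and the deadline is offset from a single baseline timestamp taken once for the whole queue scan:

```c
#include <assert.h>
#include <stdint.h>

/* Simplified sketch of the per-message retry deadline.
 *   now         - baseline timestamp in microseconds, read ONCE per scan
 *   jitter_pct  - jittered percentage, drawn once per call (around 100)
 *   base_ms     - current exponential backoff base in milliseconds
 *   retry_max_ms- upper cap on the backoff
 * Because `now` and `jitter_pct` are fixed for the whole scan, every
 * message in the batch gets the same deadline. */
static int64_t backoff_deadline(int64_t now, int64_t jitter_pct,
                                int64_t base_ms, int retry_max_ms) {
        /* pct * ms * 10 yields microseconds (100 * 10 == 1000). */
        int64_t backoff = jitter_pct * base_ms * 10;
        if (backoff > (int64_t)retry_max_ms * 1000)
                backoff = (int64_t)retry_max_ms * 1000;
        return now + backoff;
}
```

For example, with no jitter (100%), a 100 ms base, and a 1000 ms cap, the deadline lands 100 000 µs after the baseline; with a 50 ms cap it is clamped to 50 000 µs.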



# librdkafka v2.4.0

librdkafka v2.4.0 is a feature release:

* [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): The Next Generation of the Consumer Rebalance Protocol.
  **Early Access**: This should be used only for evaluation and must not be used in production. Features and contract of this KIP may change in the future (#4610).
* [KIP-467](https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records): Augment ProduceResponse error messaging for specific culprit records (#4583).
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Upgrade OpenSSL to v3.0.12 (when building from source), which includes various security fixes;
  see the [release notes](https://www.openssl.org/news/cl30.txt).

12 changes: 10 additions & 2 deletions src/rdkafka_partition.c
@@ -896,14 +896,22 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
                         int retry_max_ms) {
         rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
         rd_kafka_msg_t *rkm, *tmp;
+        rd_ts_t now;
         int64_t jitter = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT,
                                    100 + RD_KAFKA_RETRY_JITTER_PERCENT);
         /* Scan through messages to see which ones are eligible for retry,
          * move the retryable ones to temporary queue and
          * set backoff time for first message and optionally
          * increase retry count for each message.
          * Sorted insert is not necessary since the original order
-         * srcq order is maintained. */
+         * srcq order is maintained.
+         *
+         * Start timestamp for calculating backoff is common,
+         * to avoid that messages from the same batch
+         * have different backoff, as they need to be retried
+         * by reconstructing the same batch, when idempotency is
+         * enabled. */
+        now = rd_clock();
         TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) {
                 if (rkm->rkm_u.producer.retries + incr_retry > max_retries)
                         continue;
@@ -927,7 +935,7 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
                         backoff = jitter * backoff * 10;
                         if (backoff > retry_max_ms * 1000)
                                 backoff = retry_max_ms * 1000;
-                        backoff = rd_clock() + backoff;
+                        backoff = now + backoff;
                 }
                 rkm->rkm_u.producer.ts_backoff = backoff;

