From ee8b12d9087a078142f42eda71a554e8bf7709fd Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 20 Sep 2021 21:05:13 -0600 Subject: [PATCH] producer: fail topics that repeatedly fail to load after 5 tries If a record is produced to a topic that cannot load, it makes no sense to continue trying to load it. Rather than trying indefinitely up to the RecordDeliveryTimeout or the RecordRetries limit, we now fail after 5 load attempts. Given that we round-robin the brokers we load metadata from, and given that this unknown load only happens on the first record for a topic, 5 tries should be a safe default for any produce. --- pkg/kgo/config.go | 4 ++++ pkg/kgo/producer.go | 3 +++ 2 files changed, 7 insertions(+) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index db80b39f..0f923680 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -875,6 +875,10 @@ func ProduceRequestTimeout(limit time.Duration) ProducerOpt { // one record only to produce a later one successfully. This also allows for // easier sequence number ordering internally. // +// If a topic repeatedly fails to load with UNKNOWN_TOPIC_OR_PARTITION, it has +// a different, internal retry limit. All records for a topic that repeatedly +// cannot be loaded are failed when the internal limit is hit. +// // This option is different from RequestRetries to allow finer grained control // of when to fail when producing records. func RecordRetries(n int) ProducerOpt { diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index c18f9100..872887d6 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -716,6 +716,9 @@ func (cl *Client) waitUnknownTopic( if int64(tries) >= cl.cfg.recordRetries { err = fmt.Errorf("no partitions available after attempting to refresh metadata %d times, last err: %w", tries, retriableErr) } + if tries > 5 && errors.Is(retriableErr, kerr.UnknownTopicOrPartition) { + err = retriableErr + } } }