Merge pull request #391 from twmb/no_consume_recreated

Remove ConsumeRecreatedTopics, add minor internal improvements

twmb committed Mar 14, 2023
2 parents ee51d5a + d27690a commit 0eae6ae
Showing 7 changed files with 35 additions and 148 deletions.
25 changes: 5 additions & 20 deletions pkg/kgo/config.go
@@ -141,9 +141,8 @@ type cfg struct {
rack string
preferLagFn PreferLagFn

-maxConcurrentFetches   int
-disableFetchSessions   bool
-consumeRecreatedTopics bool
+maxConcurrentFetches int
+disableFetchSessions bool

topics map[string]*regexp.Regexp // topics to consume; if regex is true, values are compiled regular expressions
partitions map[string]map[int32]Offset // partitions to directly consume from
@@ -870,7 +869,9 @@ func RequiredAcks(acks Acks) ProducerOpt {

// DisableIdempotentWrite disables idempotent produce requests, opting out of
// Kafka server-side deduplication in the face of reissued requests due to
-// transient network problems.
+// transient network problems. Disabling idempotent write by default
+// upper-bounds the number of in-flight produce requests per broker to 1, vs.
+// the default of 5 when using idempotency.
//
// Idempotent production is strictly a win, but does require the
// IDEMPOTENT_WRITE permission on CLUSTER (pre Kafka 3.0), and not all clients
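
Note: a minimal usage sketch of the option documented above, shown here only for reference; the seed broker is a placeholder:

package main

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder seed broker
		// Opting out of idempotency caps in-flight produce requests
		// per broker at 1, vs. 5 when idempotency is left enabled.
		kgo.DisableIdempotentWrite(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
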
@@ -1341,22 +1342,6 @@ func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.preferLagFn = fn }}
}

-// ConsumeRecreatedTopics opts into consuming topics that are recreated in
-// Kafka 3.1+. Starting in 3.1, Kafka uses unique topic IDs when fetching, and
-// if you delete and recreate your topic while a consumer is active, the
-// consumer will begin failing because the topic ID has changed. This option
-// lets you opt into consuming the recreated topic.
-//
-// Internally, this option causes the client to purge the topic from the
-// consumer and then re-add it, which likely will cause rebalances for consumer
-// groups. As well, other consumers in the consumer group will independently
-// discover the topic ID has changed, so they will rejoin at different points.
-// These rejoins likely will all happen close together, but be aware there will
-// be a rebalance storm if you recreate your topic.
-func ConsumeRecreatedTopics() ConsumerOpt {
-return consumerOpt{func(cfg *cfg) { cfg.consumeRecreatedTopics = true }}
-}
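
Note: with the option removed, recreated topics are handled inside the fetch path (see the source.go change below). A caller that still wants to force a reload can approximate the old behavior with the public purge/add APIs; a minimal sketch, where the package and helper name are hypothetical:

package kafkautil // hypothetical helper package

import "github.com/twmb/franz-go/pkg/kgo"

// ResumeRecreatedTopic approximates what the removed option did internally:
// purge the topic from the consumer, then add it back so the client
// re-discovers the new topic ID. In a consumer group this can trigger a
// rebalance, exactly as the removed documentation warned.
func ResumeRecreatedTopic(cl *kgo.Client, topic string) {
	cl.PurgeTopicsFromConsuming(topic)
	cl.AddConsumeTopics(topic)
}
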

//////////////////////////////////
// CONSUMER GROUP CONFIGURATION //
//////////////////////////////////
27 changes: 0 additions & 27 deletions pkg/kgo/consumer.go
@@ -198,9 +198,6 @@ type consumer struct {
pollWaitMu sync.Mutex
pollWaitC *sync.Cond
pollWaitState uint64 // 0 == nothing, low 32 bits: # pollers, high 32: # waiting rebalances

-unknownTopicReloadingMu sync.Mutex
-unknownTopicReloading map[string]struct{}
}

func (c *consumer) loadPaused() pausedTopics { return c.paused.Load().(pausedTopics) }
@@ -681,30 +678,6 @@ func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bo
}
}

-func (c *consumer) reloadUnknownTopicID(topic string) {
-c.unknownTopicReloadingMu.Lock()
-defer c.unknownTopicReloadingMu.Unlock()
-if c.unknownTopicReloading == nil {
-c.unknownTopicReloading = make(map[string]struct{})
-}
-if _, exists := c.unknownTopicReloading[topic]; exists {
-return
-}
-c.unknownTopicReloading[topic] = struct{}{}
-go func() {
-c.purgeTopics([]string{topic})
-// At this point, the topic cannot be returned anymore from
-// fetches. Delete our guard.
-c.unknownTopicReloadingMu.Lock()
-delete(c.unknownTopicReloading, topic)
-if len(c.unknownTopicReloading) == 0 {
-c.unknownTopicReloading = nil
-}
-c.unknownTopicReloadingMu.Unlock()
-c.cl.AddConsumeTopics(topic)
-}()
-}

// This is guaranteed to be called in a blocking metadata fn, which ensures
// that metadata does not load the tps we are changing. Basically, we ensure
// everything w.r.t. consuming is at a stand still.
86 changes: 0 additions & 86 deletions pkg/kgo/consumer_test.go

This file was deleted.

12 changes: 2 additions & 10 deletions pkg/kgo/helpers_test.go
@@ -123,12 +123,6 @@ func tmpTopicPartitions(tb testing.TB, partitions int) (string, func()) {
return tmpNamedTopicPartitions(tb, topic, partitions)
}

-func tmpNamedTopic(tb testing.TB, topic string) func() {
-partitions := npartitions[int(atomic.AddInt64(&npartitionsAt, 1))%len(npartitions)]
-_, cleanup := tmpNamedTopicPartitions(tb, topic, partitions)
-return cleanup
-}

func tmpNamedTopicPartitions(tb testing.TB, topic string, partitions int) (string, func()) {
tb.Helper()

@@ -147,16 +141,14 @@ issue:
// starts, we can receive dial errors for a bit if the container is not
// fully initialized. Handle this by retrying specifically dial errors.
if ne := (*net.OpError)(nil); errors.As(err, &ne) && ne.Op == "dial" && time.Since(start) < 5*time.Second {
tb.Log("topic creation failed with dial error, sleeping 100ms and trying again")
time.Sleep(100 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
goto issue
}

if err == nil {
err = kerr.ErrorForCode(resp.Topics[0].ErrorCode)
if errors.Is(err, kerr.TopicAlreadyExists) {
tb.Log("topic creation failed with already exists, sleeping 100ms and trying again")
time.Sleep(100 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
goto issue
}
}
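
Note: the retry above keys off whether the error is specifically a dial failure; a minimal, self-contained sketch of that classification, using only the standard library (the probe address is a placeholder):

package main

import (
	"errors"
	"fmt"
	"net"
)

// isDialError reports whether err unwraps to a *net.OpError whose Op is
// "dial", i.e. the connection could not be established at all.
func isDialError(err error) bool {
	var ne *net.OpError
	return errors.As(err, &ne) && ne.Op == "dial"
}

func main() {
	_, err := net.Dial("tcp", "127.0.0.1:1") // placeholder address, very likely refused
	fmt.Println(isDialError(err))
}
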
11 changes: 10 additions & 1 deletion pkg/kgo/metadata.go
@@ -790,9 +790,18 @@ func (cl *Client) mergeTopicPartitions(
}
}

+// For any partitions **not currently in use**, we need to add them to
+// the sink or source. If they are in use, they could be getting
+// managed or moved by the sink or source itself, so we should not
+// check the index field (which may be concurrently modified).
+if len(lv.partitions) > len(r.partitions) {
+return
+}
+newPartitions := r.partitions[len(lv.partitions):]

// Anything left with a negative recBufsIdx / cursorsIdx is a new topic
// partition and must be added to the sink / source.
-for _, newTP := range r.partitions {
+for _, newTP := range newPartitions {
if isProduce && newTP.records.recBufsIdx == -1 {
newTP.records.sink.addRecBuf(newTP.records)
} else if !isProduce && newTP.cursor.cursorsIdx == -1 {
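
Note: the newPartitions guard added above only registers partitions appended past the old length, since anything within the old length may be owned and mutated concurrently by its sink or source. A minimal sketch of the same pattern, using hypothetical types (partition, addNew, register) rather than franz-go's internals:

package main

import "fmt"

type partition struct {
	id  int32
	idx int // -1 until registered with an owning sink/source
}

// addNew registers only the appended tail of merged: elements within the
// old length may already be owned (and concurrently modified) elsewhere,
// so their index fields are never inspected here.
func addNew(old, merged []*partition, register func(*partition)) {
	if len(old) > len(merged) {
		return
	}
	for _, p := range merged[len(old):] {
		if p.idx == -1 {
			register(p)
		}
	}
}

func main() {
	old := []*partition{{id: 0, idx: 0}}
	merged := []*partition{{id: 0, idx: 0}, {id: 1, idx: -1}}
	addNew(old, merged, func(p *partition) { p.idx = 99; fmt.Println("registered partition", p.id) })
}
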
20 changes: 17 additions & 3 deletions pkg/kgo/source.go
@@ -96,6 +96,8 @@ type cursor struct {
topicID [16]byte
partition int32

+unknownIDFails atomicI32

keepControl bool // whether to keep control records

cursorsIdx int // updated under source mutex
@@ -874,16 +876,28 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
keep = true
}

+case nil:
+partOffset.from.unknownIDFails.Store(0)
+keep = true

case kerr.UnknownTopicID:
// We need to keep UnknownTopicID even though it is
// retryable, because encountering this error means
// the topic has been recreated and we will never
// consume the topic again anymore. This is an error
// worth bubbling up.
-if s.cl.cfg.consumeRecreatedTopics {
-s.cl.consumer.reloadUnknownTopicID(topic)
-} else {
+//
+// FUN FACT: Kafka will actually return this error
+// for a brief window immediately after creating a
+// topic for the first time, meaning the controller
+// has not yet propagated to the leader that it is
+// the leader of a new partition. We need to ignore
+// this error for a _litttttlllleee bit_.
+if fails := partOffset.from.unknownIDFails.Add(1); fails > 5 {
+partOffset.from.unknownIDFails.Add(-1)
keep = true
+} else {
+numErrsStripped++
}

case kerr.OffsetOutOfRange:
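
Note: the unknownIDFails counter introduced in source.go above tolerates a short burst of UNKNOWN_TOPIC_ID before surfacing it. A minimal sketch of that consecutive-failure pattern, with hypothetical names rather than the client's real fetch plumbing:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errUnknownTopicID = errors.New("UNKNOWN_TOPIC_ID")

type partitionCursor struct {
	unknownIDFails atomic.Int32
}

// keepError reports whether a fetch error should be bubbled up to the
// caller (true) or stripped as transient (false). Success clears the
// streak; only more than five consecutive unknown-ID failures are kept.
func (c *partitionCursor) keepError(err error) bool {
	switch {
	case err == nil:
		c.unknownIDFails.Store(0)
		return true
	case errors.Is(err, errUnknownTopicID):
		if fails := c.unknownIDFails.Add(1); fails > 5 {
			c.unknownIDFails.Add(-1) // cap the counter so it cannot grow unbounded
			return true              // persistent: the topic was likely recreated
		}
		return false
	default:
		return true
	}
}

func main() {
	var c partitionCursor
	for i := 1; i <= 7; i++ {
		fmt.Printf("failure %d: keep=%v\n", i, c.keepError(errUnknownTopicID))
	}
}
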
2 changes: 1 addition & 1 deletion pkg/kgo/txn_test.go
@@ -143,7 +143,7 @@ func (c *testConsumer) transact(txnsBeforeQuit int) {
// It also returns NotLeaderXyz; we handle both problems.
UnknownTopicRetries(-1),
TransactionalID(randsha()),
-TransactionTimeout(10 * time.Second),
+TransactionTimeout(60 * time.Second),
WithLogger(testLogger()),
// Control records have their own unique offset, so for testing,
// we keep the record to ensure we do not doubly consume control
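
Note: for context on the longer timeout, a minimal sketch of a transactional client configured the same way; the seed broker and transactional ID are placeholders:

package main

import (
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),     // placeholder seed broker
		kgo.TransactionalID("example-txn-id"), // placeholder transactional ID
		// A generous timeout keeps slow test transactions from being
		// aborted by the broker mid-flight.
		kgo.TransactionTimeout(60*time.Second),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}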
