Revert "kgo: add ConsumeRecreatedTopics option, for UNKNOWN_TOPIC_ID"
This reverts commit 4e882fa.

This has WAY more edge cases than I anticipated. The last failure that I
decided not to work around was epoch loading during a topic recreation,
which results in a data loss error.
twmb committed Mar 13, 2023
1 parent ee51d5a commit 9a8f04a
Showing 7 changed files with 25 additions and 182 deletions.
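
With the option reverted, a topic that is deleted and recreated while a consumer is active surfaces as an UNKNOWN_TOPIC_ID fetch error that the application must handle itself. The sketch below is editorial and hypothetical (broker address and topic name are placeholders, not from this commit): it reacts to kerr.UnknownTopicID by purging and re-adding the topic with the client's public PurgeTopicsFromClient and AddConsumeTopics methods, which is roughly what the reverted option did internally. The same caveats apply: group consumers will rebalance, and the epoch-loading data-loss edge case mentioned in the commit message is not addressed.

package main

import (
	"context"
	"errors"
	"log"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	for {
		fetches := cl.PollFetches(context.Background())
		fetches.EachError(func(topic string, partition int32, err error) {
			// After this revert, a recreated topic is reported as
			// UNKNOWN_TOPIC_ID rather than being handled internally.
			if errors.Is(err, kerr.UnknownTopicID) {
				log.Printf("topic %s looks recreated, purging and re-adding", topic)
				cl.PurgeTopicsFromClient(topic)
				cl.AddConsumeTopics(topic)
			}
		})
		fetches.EachRecord(func(r *kgo.Record) {
			// Process the record.
			_ = r
		})
	}
}

Whether purging and re-adding is acceptable depends on the application; as the commit message notes, topic recreation has more edge cases than this naive loop covers.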
29 changes: 6 additions & 23 deletions pkg/kgo/config.go
@@ -141,9 +141,8 @@ type cfg struct {
rack string
preferLagFn PreferLagFn

- maxConcurrentFetches int
- disableFetchSessions bool
- consumeRecreatedTopics bool
+ maxConcurrentFetches int
+ disableFetchSessions bool

topics map[string]*regexp.Regexp // topics to consume; if regex is true, values are compiled regular expressions
partitions map[string]map[int32]Offset // partitions to directly consume from
@@ -629,7 +628,7 @@ func DialTimeout(timeout time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.dialTimeout = timeout }}
}

- // DialTLSConfig opts into dialing brokers with the given TLS config with a
+ // DialTLSConfig opts in to dialing brokers with the given TLS config with a
// 10s dial timeout. This is a shortcut for manually specifying a tls dialer
// using the Dialer option. You can also change the default 10s timeout with
// DialTimeout.
@@ -1295,7 +1294,7 @@ func ConsumeRegex() ConsumerOpt {
//
// A "fetch session" is is a way to reduce bandwidth for fetch requests &
// responses, and to potentially reduce the amount of work that brokers have to
- // do to handle fetch requests. A fetch session opts into the broker tracking
+ // do to handle fetch requests. A fetch session opts in to the broker tracking
// some state of what the client is interested in. For example, say that you
// are interested in thousands of topics, and most of these topics are
// receiving data only rarely. A fetch session allows the client to register
@@ -1341,22 +1340,6 @@ func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.preferLagFn = fn }}
}

- // ConsumeRecreatedTopics opts into consuming topics that are recreated in
- // Kafka 3.1+. Starting in 3.1, Kafka uses unique topic IDs when fetching, and
- // if you delete and recreate your topic while a consumer is active, the
- // consumer will begin failing because the topic ID has changed. This option
- // lets you opt into consuming the recreated topic.
- //
- // Internally, this option causes the client to purge the topic from the
- // consumer and then re-add it, which likely will cause rebalances for consumer
- // groups. As well, other consumers in the consumer group will independently
- // discover the topic ID has changed, so they will rejoin at different points.
- // These rejoins likely will all happen close together, but be aware there will
- // be a rebalance storm if you recreate your topic.
- func ConsumeRecreatedTopics() ConsumerOpt {
- return consumerOpt{func(cfg *cfg) { cfg.consumeRecreatedTopics = true }}
- }

//////////////////////////////////
// CONSUMER GROUP CONFIGURATION //
//////////////////////////////////
@@ -1383,7 +1366,7 @@ func ConsumerGroup(group string) GroupOpt {
// For balancing, Kafka chooses the first protocol that all group members agree
// to support.
//
- // Note that if you opt into cooperative-sticky rebalancing, cooperative group
+ // Note that if you opt in to cooperative-sticky rebalancing, cooperative group
// balancing is incompatible with eager (classical) rebalancing and requires a
// careful rollout strategy (see KIP-429).
func Balancers(balancers ...GroupBalancer) GroupOpt {
@@ -1619,7 +1602,7 @@ func DisableAutoCommit() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.autocommitDisable = true }}
}

- // GreedyAutoCommit opts into committing everything that has been polled when
+ // GreedyAutoCommit opts in to committing everything that has been polled when
// autocommitting (the dirty offsets), rather than committing what has
// previously been polled. This option may result in message loss if your
// application crashes.
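
The doc comments touched in this file (DialTLSConfig, fetch sessions, cooperative balancing, GreedyAutoCommit) all describe ordinary client options, and none of them change semantically in this revert beyond the wording. As a hedged illustration only, not part of the commit, a client wiring those options together might look like this (broker, topic, and group names are placeholders):

package main

import (
	"crypto/tls"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),              // placeholder brokers
		kgo.DialTLSConfig(&tls.Config{}),               // dial with TLS; default 10s dial timeout
		kgo.DialTimeout(5*time.Second),                 // override the dial timeout
		kgo.ConsumerGroup("my-group"),                  // placeholder group
		kgo.ConsumeTopics("my-topic"),                  // placeholder topic
		kgo.DisableFetchSessions(),                     // opt out of KIP-227 fetch sessions
		kgo.Balancers(kgo.CooperativeStickyBalancer()), // cooperative rebalancing (see the KIP-429 note above)
		kgo.GreedyAutoCommit(),                         // autocommit dirty (polled) offsets
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}

The only functional change to config.go in this commit is the removal of the ConsumeRecreatedTopics option itself.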
27 changes: 0 additions & 27 deletions pkg/kgo/consumer.go
@@ -198,9 +198,6 @@ type consumer struct {
pollWaitMu sync.Mutex
pollWaitC *sync.Cond
pollWaitState uint64 // 0 == nothing, low 32 bits: # pollers, high 32: # waiting rebalances

- unknownTopicReloadingMu sync.Mutex
- unknownTopicReloading map[string]struct{}
}

func (c *consumer) loadPaused() pausedTopics { return c.paused.Load().(pausedTopics) }
@@ -681,30 +678,6 @@ func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bo
}
}

- func (c *consumer) reloadUnknownTopicID(topic string) {
- c.unknownTopicReloadingMu.Lock()
- defer c.unknownTopicReloadingMu.Unlock()
- if c.unknownTopicReloading == nil {
- c.unknownTopicReloading = make(map[string]struct{})
- }
- if _, exists := c.unknownTopicReloading[topic]; exists {
- return
- }
- c.unknownTopicReloading[topic] = struct{}{}
- go func() {
- c.purgeTopics([]string{topic})
- // At this point, the topic cannot be returned anymore from
- // fetches. Delete our guard.
- c.unknownTopicReloadingMu.Lock()
- delete(c.unknownTopicReloading, topic)
- if len(c.unknownTopicReloading) == 0 {
- c.unknownTopicReloading = nil
- }
- c.unknownTopicReloadingMu.Unlock()
- c.cl.AddConsumeTopics(topic)
- }()
- }

// This is guaranteed to be called in a blocking metadata fn, which ensures
// that metadata does not load the tps we are changing. Basically, we ensure
// everything w.r.t. consuming is at a stand still.
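
The deleted reloadUnknownTopicID above used a mutex-guarded map so that at most one purge-and-re-add goroutine ran per topic at a time. For reference, a standalone sketch of that guard pattern (illustrative names, not library code):

package main

import (
	"fmt"
	"sync"
	"time"
)

// reloader deduplicates concurrent reload requests per key: while a reload
// for a key is in flight, further requests for that key are dropped.
type reloader struct {
	mu       sync.Mutex
	inflight map[string]struct{}
}

func (r *reloader) reload(key string, fn func(string)) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.inflight == nil {
		r.inflight = make(map[string]struct{})
	}
	if _, exists := r.inflight[key]; exists {
		return // a reload for this key is already running
	}
	r.inflight[key] = struct{}{}
	go func() {
		fn(key) // do the slow work outside the lock
		r.mu.Lock()
		delete(r.inflight, key)
		if len(r.inflight) == 0 {
			r.inflight = nil
		}
		r.mu.Unlock()
	}()
}

func main() {
	var r reloader
	for i := 0; i < 3; i++ {
		r.reload("topic-a", func(k string) { // only the first call starts work
			time.Sleep(10 * time.Millisecond)
			fmt.Println("reloaded", k)
		})
	}
	time.Sleep(50 * time.Millisecond) // let the demo goroutine finish
}

In the client itself the guarded work was purgeTopics followed by AddConsumeTopics; with this revert that behavior is gone entirely.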
86 changes: 0 additions & 86 deletions pkg/kgo/consumer_test.go

This file was deleted.

2 changes: 0 additions & 2 deletions pkg/kgo/errors.go
@@ -164,8 +164,6 @@ var (
// Returned for all buffered produce records when a user purges topics.
errPurged = errors.New("topic purged while buffered")

- errMissingMetadataPartition = errors.New("metadata update is missing a partition that we were previously using")

//////////////
// EXTERNAL //
//////////////
21 changes: 3 additions & 18 deletions pkg/kgo/helpers_test.go
@@ -114,24 +114,14 @@ var randsha = func() func() string {

func tmpTopic(tb testing.TB) (string, func()) {
partitions := npartitions[int(atomic.AddInt64(&npartitionsAt, 1))%len(npartitions)]
- topic := randsha()
- return tmpNamedTopicPartitions(tb, topic, partitions)
+ return tmpTopicPartitions(tb, partitions)
}

func tmpTopicPartitions(tb testing.TB, partitions int) (string, func()) {
- topic := randsha()
- return tmpNamedTopicPartitions(tb, topic, partitions)
- }
-
- func tmpNamedTopic(tb testing.TB, topic string) func() {
- partitions := npartitions[int(atomic.AddInt64(&npartitionsAt, 1))%len(npartitions)]
- _, cleanup := tmpNamedTopicPartitions(tb, topic, partitions)
- return cleanup
- }
-
- func tmpNamedTopicPartitions(tb testing.TB, topic string, partitions int) (string, func()) {
tb.Helper()

+ topic := randsha()

req := kmsg.NewPtrCreateTopicsRequest()
reqTopic := kmsg.NewCreateTopicsRequestTopic()
reqTopic.Topic = topic
@@ -154,11 +144,6 @@ issue:

if err == nil {
err = kerr.ErrorForCode(resp.Topics[0].ErrorCode)
- if errors.Is(err, kerr.TopicAlreadyExists) {
- tb.Log("topic creation failed with already exists, sleeping 100ms and trying again")
- time.Sleep(100 * time.Millisecond)
- goto issue
- }
}
if err != nil {
tb.Fatalf("unable to create topic %q: %v", topic, err)
34 changes: 14 additions & 20 deletions pkg/kgo/metadata.go
@@ -662,37 +662,31 @@ func (cl *Client) mergeTopicPartitions(
//
// Both of these scenarios should be rare to non-existent, and we do
// nothing if we encounter them.
+ //
+ // NOTE: we previously removed deleted partitions, but this was
+ // removed. Deleting partitions really should not happen, and not
+ // doing so simplifies the code.
+ //
+ // See commit 385cecb928e9ec3d9610c7beb223fcd1ed303fd0 for the last
+ // commit that contained deleting partitions.

// Migrating topicPartitions is a little tricky because we have to
// worry about underlying pointers that may currently be loaded.
for part, oldTP := range lv.partitions {
exists := part < len(r.partitions)
if !exists {
// This is the "deleted" case; see the comment above.
- //
- // We need to keep the partition around. For producing,
- // the partition could be loaded and a record could be
- // added to it after we bump the load error. For
- // consuming, the partition is part of a group or part
- // of what was loaded for direct consuming.
- //
- // We only clear a partition if it is purged from the
- // client (which can happen automatically for consumers
- // if the user opted into ConsumeRecreatedTopics).
- dup := *oldTP
- newTP := &dup
- newTP.loadErr = errMissingMetadataPartition
-
- r.partitions = append(r.partitions, newTP)
+ // We will just keep our old information.
+ r.partitions = append(r.partitions, oldTP)
+ if oldTP.loadErr == nil {
+ r.writablePartitions = append(r.writablePartitions, oldTP)
+ }

- cl.cfg.logger.Log(LogLevelDebug, "metadata update is missing partition in topic, we are keeping the partition around for safety -- use PurgeTopicsFromClient if you wish to remove the topic",
+ cl.cfg.logger.Log(LogLevelDebug, "metadata update is missing partition in topic, keeping old data",
"topic", topic,
"partition", part,
)
- if isProduce {
- oldTP.records.bumpRepeatedLoadErr(errMissingMetadataPartition)
- }
- retryWhy.add(topic, int32(part), errMissingMetadataPartition)

continue
}
newTP := r.partitions[part]
8 changes: 2 additions & 6 deletions pkg/kgo/source.go
@@ -732,7 +732,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
s.session.reset()
return

- case kerr.FetchSessionTopicIDError, kerr.InconsistentTopicID:
+ case kerr.FetchSessionTopicIDError, kerr.UnknownTopicID, kerr.InconsistentTopicID:
s.cl.cfg.logger.Log(LogLevelInfo, "topic id issues, resetting session and updating metadata", "broker", logID(s.nodeID), "err", err)
s.session.reset()
s.cl.triggerUpdateMetadataNow("topic id issues")
@@ -880,11 +880,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
// the topic has been recreated and we will never
// consume the topic again anymore. This is an error
// worth bubbling up.
- if s.cl.cfg.consumeRecreatedTopics {
- s.cl.consumer.reloadUnknownTopicID(topic)
- } else {
- keep = true
- }
+ keep = true

case kerr.OffsetOutOfRange:
// If we are out of range, we reset to what we can.
