Merge pull request #662 from twmb/patches
Patches
twmb authored Jan 21, 2024
2 parents 0b3766d + 636b45c commit a8e33ff
Showing 11 changed files with 293 additions and 44 deletions.
2 changes: 1 addition & 1 deletion pkg/kgo/broker.go
@@ -632,7 +632,7 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
}
})
if err != nil {
if !errors.Is(err, ErrClientClosed) && !strings.Contains(err.Error(), "operation was canceled") {
if !errors.Is(err, ErrClientClosed) && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "operation was canceled") {
if errors.Is(err, io.EOF) {
b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker due to an immediate EOF, which often means the client is using TLS when the broker is not expecting it (is TLS misconfigured?)", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
return nil, &ErrFirstReadEOF{kind: firstReadTLS, err: err}
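
The EOF warning above calls out the most common cause of an immediate EOF on connect: the client speaking plaintext to a TLS listener. A minimal sketch of enabling TLS on the client with kgo.DialTLSConfig (the broker address is a placeholder, not taken from this patch):

package main

import (
	"crypto/tls"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// If the broker listener terminates TLS, the client must dial TLS as
	// well; otherwise the first read returns an immediate EOF and the
	// client surfaces ErrFirstReadEOF with the warning logged above.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("broker:9093"), // placeholder address
		kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
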
35 changes: 29 additions & 6 deletions pkg/kgo/client.go
@@ -215,11 +215,11 @@ func (cl *Client) OptValue(opt any) any {
// InstanceID("foo"),
// ConsumeTopics("foo", "bar"),
// )
// idValues = cl.OptValues(InstanceID) // idValues is []any{"foo", true}
// tValues = cl.OptValues(SessionTimeout) // tValues is []any{45 * time.Second}
// topics = cl.OptValues(ConsumeTopics) // topics is []any{[]string{"foo", "bar"}
// idValues = cl.OptValues(InstanceID) // idValues is []any{"foo", true}
// tValues = cl.OptValues(SessionTimeout) // tValues is []any{45 * time.Second}
// topics = cl.OptValues(ConsumeTopics) // topics is []any{[]string{"foo", "bar"}
// bpoll = cl.OptValues(BlockRebalanceOnPoll) // bpoll is []any{false}
// unknown = cl.OptValues("Unknown") // unknown is nil
// unknown = cl.OptValues("Unknown") // unknown is nil
// )
func (cl *Client) OptValues(opt any) []any {
name := namefn(opt)
@@ -237,7 +237,7 @@ func (cl *Client) OptValues(opt any) []any {
case namefn(SoftwareNameAndVersion):
return []any{cfg.softwareName, cfg.softwareVersion}
case namefn(WithLogger):
if cfg.logger != nil {
if _, wrapped := cfg.logger.(*wrappedLogger); wrapped {
return []any{cfg.logger.(*wrappedLogger).inner}
}
return []any{nil}
@@ -524,7 +524,7 @@ func NewClient(opts ...Opt) (*Client, error) {
// Opts returns the options that were used to create this client. This can be
// as a base to generate a new client, where you can add override options to
// the end of the original input list. If you want to know a specific option
// value, you can use ConfigValue or ConfigValues.
// value, you can use OptValue or OptValues.
func (cl *Client) Opts() []Opt {
return cl.opts
}
@@ -2250,8 +2250,12 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
}

resp, err := broker.waitResp(ctx, myIssue.req)
var errIsFromResp bool
if err == nil {
err = sharder.onResp(myUnderlyingReq, resp) // perform some potential cleanup, and potentially receive an error to retry
if ke := (*kerr.Error)(nil); errors.As(err, &ke) {
errIsFromResp = true
}
}

// If we failed to issue the request, we *maybe* will retry.
@@ -2279,6 +2283,14 @@
return
}

// If we pulled an error out of the response body in an attempt
// to possibly retry, the request was NOT an error that we want
// to bubble as a shard error. The request was successful, we
// have a response. Before we add the shard, strip the error.
// The end user can parse the response ErrorCode.
if errIsFromResp {
err = nil
}
addShard(shard(broker, myUnderlyingReq, resp, err)) // the error was not retryable
}()
}
@@ -2404,6 +2416,7 @@ func (cl *Client) cachedMappedMetadata(ts ...string) (map[string]mappedMetadataT
cached[t] = tcached
} else {
needed = append(needed, t)
delete(cl.mappedMeta, t)
}
}
return cached, needed
@@ -2453,6 +2466,16 @@ func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useC
t.ps[partition.Partition] = partition
}
}
if len(meta.Topics) != len(cl.mappedMeta) {
for topic, mapped := range cl.mappedMeta {
if mapped.when.Equal(when) {
continue
}
if time.Since(mapped.when) > cl.cfg.metadataMinAge {
delete(cl.mappedMeta, topic)
}
}
}
return r, nil
}

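
With the errIsFromResp handling above, a sharded request that reaches a broker comes back as a successful shard even when the response body carries a Kafka error code; the caller parses the ErrorCode itself. A sketch of that caller-side pattern using RequestSharded and kerr.ErrorForCode (ListGroups is used purely as an illustrative broker-sharded request; field names follow the kmsg definitions):

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("broker:9092")) // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	req := kmsg.NewPtrListGroupsRequest()
	for _, shard := range cl.RequestSharded(context.Background(), req) {
		if shard.Err != nil {
			// The request to this broker actually failed (unreachable,
			// retries exhausted, etc.).
			fmt.Printf("broker %d: request failed: %v\n", shard.Meta.NodeID, shard.Err)
			continue
		}
		resp := shard.Resp.(*kmsg.ListGroupsResponse)
		// A response-level error is no longer bubbled up as shard.Err; it is
		// parsed from the response body instead.
		if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
			fmt.Printf("broker %d: response error: %v\n", shard.Meta.NodeID, err)
			continue
		}
		for _, g := range resp.Groups {
			fmt.Println(g.Group)
		}
	}
}
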
34 changes: 20 additions & 14 deletions pkg/kgo/consumer.go
@@ -354,7 +354,7 @@ func NewErrFetch(err error) Fetches {
Topics: []FetchTopic{{
Topic: "",
Partitions: []FetchPartition{{
Partition: 0,
Partition: -1,
Err: err,
}},
}},
@@ -384,11 +384,11 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {
}

// PollRecords waits for records to be available, returning as soon as any
// broker returns records in a fetch. If the context is nil, this function
// will return immediately with any currently buffered records.
// broker returns records in a fetch. If the context is nil, this function will
// return immediately with any currently buffered records.
//
// If the client is closed, a fake fetch will be injected that has no topic, a
// partition of 0, and a partition error of ErrClientClosed. If the context is
// partition of -1, and a partition error of ErrClientClosed. If the context is
// canceled, a fake fetch will be injected with ctx.Err. These injected errors
// can be used to break out of a poll loop.
//
@@ -636,22 +636,19 @@ func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) {
}

// SetOffsets sets any matching offsets in setOffsets to the given
// epoch/offset. Partitions that are not specified are not set. It is invalid
// to set topics that were not yet returned from a PollFetches: this function
// sets only partitions that were previously consumed, any extra partitions are
// skipped.
// epoch/offset. Partitions that are not specified are not set.
//
// If directly consuming, this function operates as expected given the caveats
// of the prior paragraph.
//
// If using transactions, it is advised to just use a GroupTransactSession and
// avoid this function entirely.
//
// If using group consuming, It is strongly recommended to use this function
// If using group consuming, it is strongly recommended to use this function
// outside of the context of a PollFetches loop and only when you know the
// group is not revoked (i.e., block any concurrent revoke while issuing this
// call) and to not use this concurrent with committing. Any other usage is
// prone to odd interactions.
// prone to odd interactions around rebalancing.
func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) {
cl.setOffsets(setOffsets, true)
}
@@ -661,6 +658,12 @@ func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bo
return
}

topics := make([]string, 0, len(setOffsets))
for topic := range setOffsets {
topics = append(topics, topic)
}
cl.AddConsumeTopics(topics...)

// We assignPartitions before returning, so we grab the consumer lock
// first to preserve consumer mu => group mu ordering, or to ensure
// no concurrent metadata assign for direct consuming.
@@ -747,15 +750,18 @@ func (cl *Client) AddConsumeTopics(topics ...string) {
c.mu.Lock()
defer c.mu.Unlock()

var added bool
if c.g != nil {
c.g.tps.storeTopics(topics)
added = c.g.tps.storeTopics(topics)
} else {
c.d.tps.storeTopics(topics)
added = c.d.tps.storeTopics(topics)
for _, topic := range topics {
c.d.m.addt(topic)
added = c.d.m.addt(topic) || added
}
}
cl.triggerUpdateMetadataNow("from AddConsumeTopics")
if added {
cl.triggerUpdateMetadataNow("from AddConsumeTopics")
}
}

// AddConsumePartitions adds new partitions to be consumed at the given
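
Because setOffsets now registers its topics via AddConsumeTopics before assigning, SetOffsets can target a topic the client was not yet consuming, which is exactly what the new TestSetOffsetsForNewTopic below exercises. A minimal sketch of that usage (topic name and offset are placeholders):

package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("broker:9092")) // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	// The topic no longer needs to be listed in ConsumeTopics up front:
	// SetOffsets adds it to the consumed set and triggers a metadata update.
	cl.SetOffsets(map[string]map[int32]kgo.EpochOffset{
		"events": {0: {Epoch: -1, Offset: 100}},
	})

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for _, rec := range cl.PollFetches(ctx).Records() {
		log.Printf("offset %d: %s", rec.Offset, rec.Value)
	}
}
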
2 changes: 1 addition & 1 deletion pkg/kgo/consumer_direct.go
@@ -99,7 +99,7 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
// the topic is explicitly specified.
if useTopic {
partitions := topicPartitions.load()
if d.cfg.regex && partitions.isInternal {
if d.cfg.regex && partitions.isInternal || len(partitions.partitions) == 0 {
continue
}
toUseTopic := make(map[int32]Offset, len(partitions.partitions))
105 changes: 105 additions & 0 deletions pkg/kgo/consumer_direct_test.go
@@ -4,10 +4,13 @@ import (
"context"
"errors"
"fmt"
"os"
"sort"
"sync/atomic"
"testing"
"time"

"github.com/twmb/franz-go/pkg/kerr"
)

// Allow adding a topic to consume after the client is initialized with nothing
@@ -439,6 +442,7 @@ func TestIssue523(t *testing.T) {
MetadataMinAge(100*time.Millisecond),
FetchMaxWait(time.Second),
KeepRetryableFetchErrors(),
UnknownTopicRetries(-1),
)
defer cl.Close()

@@ -466,3 +470,104 @@ }
}
}
}

func TestSetOffsetsForNewTopic(t *testing.T) {
t.Parallel()
t1, tcleanup := tmpTopicPartitions(t, 1)
defer tcleanup()

{
cl, _ := newTestClient(
DefaultProduceTopic(t1),
MetadataMinAge(100*time.Millisecond),
FetchMaxWait(time.Second),
UnknownTopicRetries(-1),
)
defer cl.Close()

if err := cl.ProduceSync(context.Background(), StringRecord("foo")).FirstErr(); err != nil {
t.Fatal(err)
}
cl.Close()
}

{
cl, _ := newTestClient(
MetadataMinAge(100*time.Millisecond),
FetchMaxWait(time.Second),
)
defer cl.Close()

cl.SetOffsets(map[string]map[int32]EpochOffset{
t1: {0: EpochOffset{Epoch: -1, Offset: 0}},
})
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
fs := cl.PollFetches(ctx)
cancel()
if errors.Is(fs.Err0(), context.DeadlineExceeded) {
t.Errorf("failed waiting for record")
return
}
if fs.NumRecords() == 0 {
t.Errorf("failed waiting for record")
return
}
cl.Close()
}

// Duplicate above, but with a group.
{
g1, gcleanup := tmpGroup(t)
defer gcleanup()

cl, _ := newTestClient(
MetadataMinAge(100*time.Millisecond),
FetchMaxWait(time.Second),
ConsumerGroup(g1),
WithLogger(BasicLogger(os.Stderr, LogLevelDebug, nil)),
)
defer cl.Close()

cl.SetOffsets(map[string]map[int32]EpochOffset{
t1: {0: EpochOffset{Epoch: -1, Offset: 0}},
})
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
fs := cl.PollFetches(ctx)
cancel()
if errors.Is(fs.Err0(), context.DeadlineExceeded) {
t.Errorf("failed waiting for record")
return
}
if fs.NumRecords() == 0 {
t.Errorf("failed waiting for record")
return
}
cl.Close()
}
}

func TestIssue648(t *testing.T) {
t.Parallel()
cl, _ := newTestClient(
MetadataMinAge(100*time.Millisecond),
ConsumeTopics("bizbazbuz"),
FetchMaxWait(time.Second),
KeepRetryableFetchErrors(),
)
defer cl.Close()
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
fs := cl.PollFetches(ctx)
cancel()

var found bool
fs.EachError(func(_ string, _ int32, err error) {
if !errors.Is(err, kerr.UnknownTopicOrPartition) {
t.Errorf("expected ErrUnknownTopicOrPartition, got %v", err)
} else {
found = true
}
})
if !found {
t.Errorf("did not see ErrUnknownTopicOrPartition")
}
}
23 changes: 14 additions & 9 deletions pkg/kgo/consumer_group.go
@@ -364,11 +364,6 @@ func (c *consumer) initGroup() {
}
g.tps.storeTopics(topics)
}

if !g.cfg.autocommitDisable && g.cfg.autocommitInterval > 0 {
g.cfg.logger.Log(LogLevelInfo, "beginning autocommit loop", "group", g.cfg.group)
go g.loopCommit()
}
}

// Manages the group consumer's join / sync / heartbeat / fetch offset flow.
@@ -380,6 +375,10 @@ func (c *consumer) initGroup() {
func (g *groupConsumer) manage() {
defer close(g.manageDone)
g.cfg.logger.Log(LogLevelInfo, "beginning to manage the group lifecycle", "group", g.cfg.group)
if !g.cfg.autocommitDisable && g.cfg.autocommitInterval > 0 {
g.cfg.logger.Log(LogLevelInfo, "beginning autocommit loop", "group", g.cfg.group)
go g.loopCommit()
}

var consecutiveErrors int
joinWhy := "beginning to manage the group lifecycle"
@@ -2131,11 +2130,17 @@ func (g *groupConsumer) loopCommit() {
g.noCommitDuringJoinAndSync.RLock()
g.mu.Lock()
if !g.blockAuto {
g.cfg.logger.Log(LogLevelDebug, "autocommitting", "group", g.cfg.group)
g.commit(g.ctx, g.getUncommittedLocked(true, false), func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
uncommitted := g.getUncommittedLocked(true, false)
if len(uncommitted) == 0 {
g.cfg.logger.Log(LogLevelDebug, "skipping autocommit due to no offsets to commit", "group", g.cfg.group)
g.noCommitDuringJoinAndSync.RUnlock()
g.cfg.commitCallback(cl, req, resp, err)
})
} else {
g.cfg.logger.Log(LogLevelDebug, "autocommitting", "group", g.cfg.group)
g.commit(g.ctx, uncommitted, func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
g.noCommitDuringJoinAndSync.RUnlock()
g.cfg.commitCallback(cl, req, resp, err)
})
}
} else {
g.noCommitDuringJoinAndSync.RUnlock()
}
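
The autocommit loop now starts inside manage() and skips ticks with nothing to commit, so the commit callback only fires for commits that are actually issued. A sketch of the configuration knobs that drive this loop (broker, group, and topic names are placeholders):

package main

import (
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("broker:9092"), // placeholder address
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("events"),
		kgo.AutoCommitInterval(5*time.Second),
		// Called for each issued autocommit; empty ticks are skipped and do
		// not reach this callback.
		kgo.AutoCommitCallback(func(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
			if err != nil {
				log.Printf("autocommit failed: %v", err)
			}
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
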
8 changes: 6 additions & 2 deletions pkg/kgo/metadata.go
@@ -350,7 +350,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
for topic := range latest {
allTopics = append(allTopics, topic)
}
tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics)
tpsConsumerLoad, _ = tpsConsumer.ensureTopics(allTopics)
defer tpsConsumer.storeData(tpsConsumerLoad)

// For regex consuming, if a topic is not returned in the
@@ -695,6 +695,8 @@ func (cl *Client) mergeTopicPartitions(
for _, topicPartition := range lv.partitions {
topicPartition.records.bumpRepeatedLoadErr(lv.loadErr)
}
} else if !kerr.IsRetriable(r.loadErr) || cl.cfg.keepRetryableFetchErrors {
cl.consumer.addFakeReadyForDraining(topic, -1, r.loadErr, "metadata refresh has a load error on this entire topic")
}
retryWhy.add(topic, -1, r.loadErr)
return
@@ -753,7 +755,7 @@ }
}
newTP := r.partitions[part]

// Like above for the entire topic, an individual partittion
// Like above for the entire topic, an individual partition
// can have a load error. Unlike for the topic, individual
// partition errors are always retryable.
//
@@ -765,6 +767,8 @@
newTP.loadErr = err
if isProduce {
newTP.records.bumpRepeatedLoadErr(newTP.loadErr)
} else if !kerr.IsRetriable(newTP.loadErr) || cl.cfg.keepRetryableFetchErrors {
cl.consumer.addFakeReadyForDraining(topic, int32(part), newTP.loadErr, "metadata refresh has a load error on this partition")
}
retryWhy.add(topic, int32(part), newTP.loadErr)
continue
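
With the two addFakeReadyForDraining hooks above, a topic or partition load error from a metadata refresh is surfaced to the consumer as an injected fetch error when it is not retryable (or always, with KeepRetryableFetchErrors) instead of being retried silently, which is what TestIssue648 verifies. A sketch of handling such errors on the poll side (broker and topic names are placeholders):

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("broker:9092"), // placeholder address
		kgo.ConsumeTopics("does-not-exist"),
		kgo.KeepRetryableFetchErrors(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	fs := cl.PollFetches(ctx)
	// Metadata load errors now arrive here as fetch errors on partition -1
	// instead of only being visible in debug logs.
	fs.EachError(func(topic string, partition int32, err error) {
		if errors.Is(err, kerr.UnknownTopicOrPartition) {
			log.Printf("topic %q does not exist (partition %d)", topic, partition)
			return
		}
		log.Printf("fetch error on %s/%d: %v", topic, partition, err)
	})
}
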