From 655c343963a859ca3ac4e011340c173e6050096a Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 1 Jul 2024 10:18:05 +0200 Subject: [PATCH] Update github.com/twmb/franz-go/pkg/kfake digest to ee4cbf5 (#8563) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 +- .../twmb/franz-go/pkg/kfake/00_produce.go | 21 +++- .../twmb/franz-go/pkg/kfake/01_fetch.go | 24 +++- .../franz-go/pkg/kfake/08_offset_commit.go | 10 +- .../franz-go/pkg/kfake/19_create_topics.go | 5 - .../pkg/kfake/23_offset_for_leader_epoch.go | 28 +++-- .../twmb/franz-go/pkg/kfake/cluster.go | 1 + .../twmb/franz-go/pkg/kfake/data.go | 62 ---------- .../twmb/franz-go/pkg/kfake/groups.go | 107 +++++++++++++----- vendor/modules.txt | 4 +- 11 files changed, 146 insertions(+), 122 deletions(-) diff --git a/go.mod b/go.mod index 8ba4d9e6344..3e9d5bbb603 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,7 @@ require ( github.com/thanos-io/objstore v0.0.0-20240622095743-1afe5d4bc3cd github.com/twmb/franz-go v1.17.0 github.com/twmb/franz-go/pkg/kadm v1.12.0 - github.com/twmb/franz-go/pkg/kfake v0.0.0-20240509060506-c77d58eb5693 + github.com/twmb/franz-go/pkg/kfake v0.0.0-20240613152313-ee4cbf59292f github.com/twmb/franz-go/pkg/kmsg v1.8.0 github.com/twmb/franz-go/plugin/kotel v1.4.1 github.com/twmb/franz-go/plugin/kprom v1.1.0 diff --git a/go.sum b/go.sum index 55f759857e1..82c0b70b85c 100644 --- a/go.sum +++ b/go.sum @@ -916,8 +916,8 @@ github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk= github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/twmb/franz-go/pkg/kadm v1.12.0 h1:I8P/gpXFzhl73QcAYmJu+1fOXvrynyH/MAotr2udEg4= github.com/twmb/franz-go/pkg/kadm v1.12.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0= -github.com/twmb/franz-go/pkg/kfake v0.0.0-20240509060506-c77d58eb5693 h1:7ad3LETpOy+6CLSgyT2aoQlr1E8cUg6fOJ7lhIhXefE= -github.com/twmb/franz-go/pkg/kfake v0.0.0-20240509060506-c77d58eb5693/go.mod h1:DCMFat7WCZfk946rqd9aVAcAmB6/rIcdMTslJSjJZgk= +github.com/twmb/franz-go/pkg/kfake v0.0.0-20240613152313-ee4cbf59292f h1:41JH/Hr8JuN5HBUHR7luhSpDlMVA7pq5URlf3OXIhlM= +github.com/twmb/franz-go/pkg/kfake v0.0.0-20240613152313-ee4cbf59292f/go.mod h1:nkBI/wGFp7t1NJnnCeJdS4sX5atPAqwCPpDXKuI7SC8= github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA= github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU= github.com/twmb/franz-go/plugin/kotel v1.4.1 h1:HHdYllwjB9KRrI4rkEeMzMCw3SXsBIvgE2Uj81zWx3Q= diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/00_produce.go b/vendor/github.com/twmb/franz-go/pkg/kfake/00_produce.go index e2c1c691e1a..9aeb7668130 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/00_produce.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/00_produce.go @@ -2,6 +2,8 @@ package kfake import ( "hash/crc32" + "net" + "strconv" "time" "github.com/twmb/franz-go/pkg/kerr" @@ -14,7 +16,7 @@ import ( // * Multiple batches in one produce // * Compact -func init() { regKey(0, 3, 9) } +func init() { regKey(0, 3, 10) } func (c *Cluster) handleProduce(b *broker, kreq kmsg.Request) (kmsg.Response, error) { var ( @@ -46,6 +48,7 @@ func (c *Cluster) handleProduce(b *broker, kreq kmsg.Request) (kmsg.Response, er donet(t, errCode) } } + var includeBrokers bool toresp := func() kmsg.Response { for topic, partitions := range tdone { st := kmsg.NewProduceResponseTopic() @@ -53,6 +56,17 @@ func (c *Cluster) handleProduce(b *broker, kreq kmsg.Request) (kmsg.Response, er st.Partitions = partitions resp.Topics = append(resp.Topics, st) } + if includeBrokers { + for _, b := range c.bs { + sb := kmsg.NewProduceResponseBroker() + h, p, _ := net.SplitHostPort(b.ln.Addr().String()) + p32, _ := strconv.Atoi(p) + sb.NodeID = b.node + sb.Host = h + sb.Port = int32(p32) + resp.Brokers = append(resp.Brokers, sb) + } + } return resp } @@ -76,7 +90,10 @@ func (c *Cluster) handleProduce(b *broker, kreq kmsg.Request) (kmsg.Response, er continue } if pd.leader != b { - donep(rt.Topic, rp, kerr.NotLeaderForPartition.Code) + p := donep(rt.Topic, rp, kerr.NotLeaderForPartition.Code) + p.CurrentLeader.LeaderID = pd.leader.node + p.CurrentLeader.LeaderEpoch = pd.epoch + includeBrokers = true continue } diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/01_fetch.go b/vendor/github.com/twmb/franz-go/pkg/kfake/01_fetch.go index f0fc4f07d71..53f9a8e6ee9 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/01_fetch.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/01_fetch.go @@ -1,6 +1,8 @@ package kfake import ( + "net" + "strconv" "sync" "time" @@ -16,7 +18,7 @@ import ( // * Out of range fetch causes early return // * Raw bytes of batch counts against wait bytes -func init() { regKey(1, 4, 13) } +func init() { regKey(1, 4, 16) } func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, error) { var ( @@ -132,6 +134,21 @@ func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, er return &st.Partitions[len(st.Partitions)-1] } + var includeBrokers bool + defer func() { + if includeBrokers { + for _, b := range c.bs { + sb := kmsg.NewFetchResponseBroker() + h, p, _ := net.SplitHostPort(b.ln.Addr().String()) + p32, _ := strconv.Atoi(p) + sb.NodeID = b.node + sb.Host = h + sb.Port = int32(p32) + resp.Brokers = append(resp.Brokers, sb) + } + } + }() + var batchesAdded int full: for _, rt := range req.Topics { @@ -146,7 +163,10 @@ full: continue } if pd.leader != creq.cc.b { - donep(rt.Topic, rt.TopicID, rp.Partition, kerr.NotLeaderForPartition.Code) + p := donep(rt.Topic, rt.TopicID, rp.Partition, kerr.NotLeaderForPartition.Code) + p.CurrentLeader.LeaderID = pd.leader.node + p.CurrentLeader.LeaderEpoch = pd.epoch + includeBrokers = true continue } sp := donep(rt.Topic, rt.TopicID, rp.Partition, 0) diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/08_offset_commit.go b/vendor/github.com/twmb/franz-go/pkg/kfake/08_offset_commit.go index 692f4cc0063..b82400e0f4f 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/08_offset_commit.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/08_offset_commit.go @@ -1,7 +1,6 @@ package kfake import ( - "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kmsg" ) @@ -9,16 +8,11 @@ func init() { regKey(8, 0, 8) } func (c *Cluster) handleOffsetCommit(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.OffsetCommitRequest) - resp := req.ResponseKind().(*kmsg.OffsetCommitResponse) if err := checkReqVersion(req.Key(), req.Version); err != nil { return nil, err } - if c.groups.handleOffsetCommit(creq) { - return nil, nil - } - - fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code) - return resp, nil + c.groups.handleOffsetCommit(creq) + return nil, nil } diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/19_create_topics.go b/vendor/github.com/twmb/franz-go/pkg/kfake/19_create_topics.go index d2d3185e2e4..1f9d00e620b 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/19_create_topics.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/19_create_topics.go @@ -46,7 +46,6 @@ func (c *Cluster) handleCreateTopics(b *broker, kreq kmsg.Request) (kmsg.Respons uniq[rt.Topic] = struct{}{} } -topics: for _, rt := range req.Topics { if _, ok := c.data.tps.gett(rt.Topic); ok { donet(rt.Topic, kerr.TopicAlreadyExists.Code) @@ -66,10 +65,6 @@ topics: } configs := make(map[string]*string) for _, c := range rt.Configs { - if ok := validateSetTopicConfig(c.Name, c.Value); !ok { - donet(rt.Topic, kerr.InvalidConfig.Code) - continue topics - } configs[c.Name] = c.Value } c.data.mkt(rt.Topic, int(rt.NumPartitions), int(rt.ReplicationFactor), configs) diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/23_offset_for_leader_epoch.go b/vendor/github.com/twmb/franz-go/pkg/kfake/23_offset_for_leader_epoch.go index 6b6d768eece..f531ecf89cf 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/23_offset_for_leader_epoch.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/23_offset_for_leader_epoch.go @@ -74,11 +74,24 @@ func (c *Cluster) handleOffsetForLeaderEpoch(b *broker, kreq kmsg.Request) (kmsg continue } + // If our epoch was bumped before anything was + // produced, return the epoch and a start offset of 0. + if len(pd.batches) == 0 { + sp.LeaderEpoch = pd.epoch + sp.EndOffset = 0 + if rp.LeaderEpoch > pd.epoch { + sp.LeaderEpoch = -1 + sp.EndOffset = -1 + } + continue + } + // What is the largest epoch after the requested epoch? + nextEpoch := rp.LeaderEpoch + 1 idx, _ := sort.Find(len(pd.batches), func(idx int) int { batchEpoch := pd.batches[idx].epoch switch { - case rp.LeaderEpoch <= batchEpoch: + case nextEpoch <= batchEpoch: return -1 default: return 1 @@ -92,19 +105,16 @@ func (c *Cluster) handleOffsetForLeaderEpoch(b *broker, kreq kmsg.Request) (kmsg continue } - // Requested epoch is before the LSO: return the requested - // epoch and the LSO. - if idx == 0 && pd.batches[0].epoch > rp.LeaderEpoch { + // Next epoch is actually the first epoch: return the + // requested epoch and the LSO. + if idx == 0 { sp.LeaderEpoch = rp.LeaderEpoch sp.EndOffset = pd.logStartOffset continue } - // The requested epoch exists and is not the latest - // epoch, we return the end offset being the first - // offset of the next epoch. - sp.LeaderEpoch = pd.batches[idx].epoch - sp.EndOffset = pd.batches[idx+1].FirstOffset + sp.LeaderEpoch = pd.batches[idx-1].epoch + sp.EndOffset = pd.batches[idx].FirstOffset } } return resp, nil diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/cluster.go b/vendor/github.com/twmb/franz-go/pkg/kfake/cluster.go index e1a92969dec..275b14402ad 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/cluster.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/cluster.go @@ -958,6 +958,7 @@ func (c *Cluster) MoveTopicPartition(topic string, partition int32, nodeID int32 return } pd.leader = br + pd.epoch++ }) return err } diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/data.go b/vendor/github.com/twmb/franz-go/pkg/kfake/data.go index 725e282739f..9f5d46c6b86 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/data.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/data.go @@ -5,7 +5,6 @@ import ( "math/rand" "sort" "strconv" - "strings" "time" "github.com/twmb/franz-go/pkg/kmsg" @@ -244,9 +243,6 @@ func (d *data) configs(t string, fn func(k string, v *string, src kmsg.ConfigSou // Unlike Kafka, we validate the value before allowing it to be set. func (c *Cluster) setBrokerConfig(k string, v *string, dry bool) bool { - if !validateSetBrokerConfig(k, v) { - return false - } if dry { return true } @@ -255,9 +251,6 @@ func (c *Cluster) setBrokerConfig(k string, v *string, dry bool) bool { } func (d *data) setTopicConfig(t string, k string, v *string, dry bool) bool { - if !validateSetTopicConfig(k, v) { - return false - } if dry { return true } @@ -268,61 +261,6 @@ func (d *data) setTopicConfig(t string, k string, v *string, dry bool) bool { return true } -func validateSetTopicConfig(k string, v *string) bool { - if _, ok := validTopicConfigs[k]; !ok { - return false - } - fn, ok := validateSetConfig[k] - if !ok { - return false - } - return fn(v) -} - -func validateSetBrokerConfig(k string, v *string) bool { - if _, ok := validBrokerConfigs[k]; !ok { - return false - } - fn, ok := validateSetConfig[k] - if !ok { - return false - } - return fn(v) -} - -// Validation functions for all configs we support setting. Keys not in this -// map are not settable. -var validateSetConfig = map[string]func(*string) bool{ - "cleanup.policy": func(v *string) bool { - if v == nil { - return false - } - s := strings.Split(*v, ",") - for _, policy := range s { - if policy != "delete" && policy != "compact" { - return false - } - } - return true - }, - - "compression.type": staticConfig("uncompressed", "lz4", "zstd", "snappy", "gzip", "producer"), - - "max.message.bytes": numberConfig(0, true, 0, false), - "message.timestamp.type": staticConfig("CreateTime", "LogAppendTime"), - "min.insync.replicas": numberConfig(1, true, 0, false), - "retention.bytes": numberConfig(-1, true, 0, false), - "retention.ms": numberConfig(-1, true, 0, false), - - "default.replication.factor": numberConfig(1, true, 0, false), - "fetch.max.bytes": numberConfig(1024, true, 0, false), - "log.dir": func(v *string) bool { return v != nil }, - "log.message.timestamp.type": staticConfig("CreateTime", "LogAppendTime"), - "log.retention.bytes": numberConfig(-1, true, 0, false), - "log.retention.ms": numberConfig(-1, true, 0, false), - "message.max.bytes": numberConfig(0, true, 0, false), -} - // All valid topic configs we support, as well as the equivalent broker // config if there is one. var validTopicConfigs = map[string]string{ diff --git a/vendor/github.com/twmb/franz-go/pkg/kfake/groups.go b/vendor/github.com/twmb/franz-go/pkg/kfake/groups.go index a0e5a98efdc..bc6934d6dcb 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kfake/groups.go +++ b/vendor/github.com/twmb/franz-go/pkg/kfake/groups.go @@ -102,7 +102,7 @@ func (gs groupState) String() string { func (c *Cluster) coordinator(id string) *broker { gen := c.coordinatorGen.Load() - n := hashString(fmt.Sprint("%d", gen)+"\x00\x00"+id) % uint64(len(c.bs)) + n := hashString(fmt.Sprintf("%d", gen)+"\x00\x00"+id) % uint64(len(c.bs)) return c.bs[n] } @@ -132,6 +132,20 @@ func generateMemberID(clientID string, instanceID *string) string { // GROUPS // //////////// +func (gs *groups) newGroup(name string) *group { + return &group{ + c: gs.c, + gs: gs, + name: name, + members: make(map[string]*groupMember), + pending: make(map[string]*groupMember), + protocols: make(map[string]int), + reqCh: make(chan *clientReq), + controlCh: make(chan func()), + quitCh: make(chan struct{}), + } +} + // handleJoin completely hijacks the incoming request. func (gs *groups) handleJoin(creq *clientReq) { if gs.gs == nil { @@ -141,17 +155,7 @@ func (gs *groups) handleJoin(creq *clientReq) { start: g := gs.gs[req.Group] if g == nil { - g = &group{ - c: gs.c, - gs: gs, - name: req.Group, - members: make(map[string]*groupMember), - pending: make(map[string]*groupMember), - protocols: make(map[string]int), - reqCh: make(chan *clientReq), - controlCh: make(chan func()), - quitCh: make(chan struct{}), - } + g = gs.newGroup(req.Group) waitJoin := make(chan struct{}) gs.gs[req.Group] = g go g.manage(func() { close(waitJoin) }) @@ -194,8 +198,25 @@ func (gs *groups) handleLeave(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.LeaveGroupRequest).Group, creq) } -func (gs *groups) handleOffsetCommit(creq *clientReq) bool { - return gs.handleHijack(creq.kreq.(*kmsg.OffsetCommitRequest).Group, creq) +func (gs *groups) handleOffsetCommit(creq *clientReq) { + if gs.gs == nil { + gs.gs = make(map[string]*group) + } + req := creq.kreq.(*kmsg.OffsetCommitRequest) +start: + g := gs.gs[req.Group] + if g == nil { + g = gs.newGroup(req.Group) + waitCommit := make(chan struct{}) + gs.gs[req.Group] = g + go g.manage(func() { close(waitCommit) }) + defer func() { <-waitCommit }() + } + select { + case g.reqCh <- creq: + case <-g.quitCh: + goto start + } } func (gs *groups) handleOffsetDelete(creq *clientReq) bool { @@ -551,7 +572,9 @@ func (g *group) manage(detachNew func()) { case *kmsg.LeaveGroupRequest: kresp = g.handleLeave(creq) case *kmsg.OffsetCommitRequest: - kresp = g.handleOffsetCommit(creq) + var ok bool + kresp, ok = g.handleOffsetCommit(creq) + firstJoin(ok) case *kmsg.OffsetDeleteRequest: kresp = g.handleOffsetDelete(creq) } @@ -807,34 +830,60 @@ func fillOffsetCommit(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResp } // Handles a commit. -func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse { +func (g *group) handleOffsetCommit(creq *clientReq) (*kmsg.OffsetCommitResponse, bool) { req := creq.kreq.(*kmsg.OffsetCommitRequest) resp := req.ResponseKind().(*kmsg.OffsetCommitResponse) if kerr := g.c.validateGroup(creq, req.Group); kerr != nil { fillOffsetCommit(req, resp, kerr.Code) - return resp + return resp, false } if req.InstanceID != nil { fillOffsetCommit(req, resp, kerr.InvalidGroupID.Code) - return resp - } - m, ok := g.members[req.MemberID] - if !ok { - fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code) - return resp + return resp, false } - if req.Generation != g.generation { - fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code) - return resp + + var m *groupMember + if len(g.members) > 0 { + var ok bool + m, ok = g.members[req.MemberID] + if !ok { + fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code) + return resp, false + } + if req.Generation != g.generation { + fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code) + return resp, false + } + } else { + if req.MemberID != "" { + fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code) + return resp, false + } + if req.Generation != -1 { + fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code) + return resp, false + } + if g.state != groupEmpty { + panic("invalid state: no members, but group not empty") + } } switch g.state { default: fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code) - return resp + return resp, true case groupEmpty: - // for when we support empty group commits + for _, t := range req.Topics { + for _, p := range t.Partitions { + g.commits.set(t.Topic, p.Partition, offsetCommit{ + offset: p.Offset, + leaderEpoch: p.LeaderEpoch, + metadata: p.Metadata, + }) + } + } + fillOffsetCommit(req, resp, 0) case groupPreparingRebalance, groupStable: for _, t := range req.Topics { for _, p := range t.Partitions { @@ -851,7 +900,7 @@ func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse { fillOffsetCommit(req, resp, kerr.RebalanceInProgress.Code) g.updateHeartbeat(m) } - return resp + return resp, true } // Transitions the group to the preparing rebalance state. We first need to diff --git a/vendor/modules.txt b/vendor/modules.txt index f31bbaf436f..73603686977 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1122,8 +1122,8 @@ github.com/twmb/franz-go/pkg/sasl # github.com/twmb/franz-go/pkg/kadm v1.12.0 ## explicit; go 1.21 github.com/twmb/franz-go/pkg/kadm -# github.com/twmb/franz-go/pkg/kfake v0.0.0-20240509060506-c77d58eb5693 -## explicit; go 1.20 +# github.com/twmb/franz-go/pkg/kfake v0.0.0-20240613152313-ee4cbf59292f +## explicit; go 1.21 github.com/twmb/franz-go/pkg/kfake # github.com/twmb/franz-go/pkg/kmsg v1.8.0 ## explicit; go 1.19