From e44dde998ca35181a0d22a1b9002a7214804f249 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 14 Sep 2021 11:45:52 -0600 Subject: [PATCH] add goroutine_per_partition consuming example --- .../README.md | 22 +++ .../goroutine_per_partition_consuming/go.mod | 14 ++ .../goroutine_per_partition_consuming/go.sum | 44 +++++ .../goroutine_per_partition_consuming/main.go | 167 ++++++++++++++++++ 4 files changed, 247 insertions(+) create mode 100644 examples/goroutine_per_partition_consuming/README.md create mode 100644 examples/goroutine_per_partition_consuming/go.mod create mode 100644 examples/goroutine_per_partition_consuming/go.sum create mode 100644 examples/goroutine_per_partition_consuming/main.go diff --git a/examples/goroutine_per_partition_consuming/README.md b/examples/goroutine_per_partition_consuming/README.md new file mode 100644 index 00000000..b1302ed7 --- /dev/null +++ b/examples/goroutine_per_partition_consuming/README.md @@ -0,0 +1,22 @@ +Group consuming, using a goroutine per partition +=== + +This example consumes from a group and starts a goroutine to process each +partition concurrently. This type of code may be useful if processing each +record per partition is slow, such that processing records in a single +`PollFetches` loop is not as fast as you want it to be. + +This is just one example of how to process messages concurrently. A simpler +solution would be just to have a group of record consumers selecting from a +channel, and to send all records down this channel in your `PollFetches` loop. +However, that simple solution does not preserve per-partition ordering. + +## Flags + +`-b` can be specified to override the default localhost:9092 broker to any +comma delimited set of brokers. 
+ +`-t` specifies the topic to consume (required) + +`-g` specifies the group to consume in (required) + diff --git a/examples/goroutine_per_partition_consuming/go.mod b/examples/goroutine_per_partition_consuming/go.mod new file mode 100644 index 00000000..026462d3 --- /dev/null +++ b/examples/goroutine_per_partition_consuming/go.mod @@ -0,0 +1,14 @@ +module goroutine_per_partition_consuming + +go 1.17 + +replace github.com/twmb/franz-go => ../../ + +require github.com/twmb/franz-go v1.0.0 + +require ( + github.com/klauspost/compress v1.13.5 // indirect + github.com/pierrec/lz4/v4 v4.1.8 // indirect + github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914042331-106aef61b693 // indirect + github.com/twmb/go-rbtree v1.0.0 // indirect +) diff --git a/examples/goroutine_per_partition_consuming/go.sum b/examples/goroutine_per_partition_consuming/go.sum new file mode 100644 index 00000000..baeee76d --- /dev/null +++ b/examples/goroutine_per_partition_consuming/go.sum @@ -0,0 +1,44 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= 
+github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4= +github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4= +github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914042331-106aef61b693 h1:5O4u9Lc69/GIOnSIWieuwwpr0hZr7vDOhCp0hXJAqXw= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914042331-106aef61b693/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= +github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= 
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/goroutine_per_partition_consuming/main.go b/examples/goroutine_per_partition_consuming/main.go new file mode 100644 index 00000000..df6fe75f --- /dev/null +++ b/examples/goroutine_per_partition_consuming/main.go @@ -0,0 +1,167 @@ +package main + +import ( + "context" + "flag" + "fmt" + "strings" + "sync/atomic" + "time" + + "github.com/twmb/franz-go/pkg/kgo" +) + +var ( + brokers = flag.String("b", "", "comma delimited brokers to consume from") + topic = flag.String("t", "", "topic to consume") + group = flag.String("g", "", "group to consume in") +) + +var ( + globalRecs int64 + globalBytes int64 +) + +type pconsumer struct { + quit chan struct{} + recs chan 
[]*kgo.Record
+}
+
+// consume is the worker loop for a single topic partition. It runs until
+// pc.quit is closed. Batches received on pc.recs are only counted (records and
+// value bytes), both locally and in the global atomic counters. Once per
+// second the local counters are printed and reset, so the printed figures
+// cover the previous tick interval only.
+func (pc *pconsumer) consume(topic string, partition int32) {
+	fmt.Printf("starting, t %s p %d\n", topic, partition)
+	defer fmt.Printf("killing, t %s p %d\n", topic, partition)
+	var (
+		nrecs  int
+		nbytes int
+		ticker = time.NewTicker(time.Second)
+	)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-pc.quit:
+			// Batches still buffered in pc.recs are dropped on purpose:
+			// this partition is no longer ours.
+			return
+		case recs := <-pc.recs:
+			nrecs += len(recs)
+			atomic.AddInt64(&globalRecs, int64(len(recs)))
+			for _, rec := range recs {
+				nbytes += len(rec.Value)
+				atomic.AddInt64(&globalBytes, int64(len(rec.Value)))
+			}
+
+		case t := <-ticker.C:
+			fmt.Printf("[%s] t %s p %d consumed %0.2f MiB/s, %0.2fk records/s\n",
+				t.Format("15:04:05.999"),
+				topic,
+				partition,
+				float64(nbytes)/(1024*1024),
+				float64(nrecs)/1000,
+			)
+			nrecs, nbytes = 0, 0
+		}
+	}
+}
+
+// splitConsume tracks one pconsumer per assigned topic partition and routes
+// polled records to it.
+//
+// The consumers map is written by the assigned/lost callbacks and read by
+// poll. NOTE(review): kgo is expected to invoke those callbacks from its own
+// goroutine rather than from the PollFetches caller -- confirm against the kgo
+// documentation for the pinned version. Every access therefore goes through
+// lock.
+type splitConsume struct {
+	// lock is a one-slot semaphore used as a mutex around consumers. A
+	// channel is used instead of sync.Mutex only so that this file's import
+	// block does not have to change. It must be created with capacity 1.
+	lock      chan struct{}
+	consumers map[string]map[int32]pconsumer
+}
+
+// acquire takes the consumers lock, blocking until it is free.
+func (s *splitConsume) acquire() { s.lock <- struct{}{} }
+
+// release gives the consumers lock back.
+func (s *splitConsume) release() { <-s.lock }
+
+// lookup returns the consumer registered for topic/partition, if any. The
+// returned pconsumer is a copy holding the same channels, so it can be used
+// after the lock has been released.
+func (s *splitConsume) lookup(topic string, partition int32) (pconsumer, bool) {
+	s.acquire()
+	defer s.release()
+	pc, ok := s.consumers[topic][partition]
+	return pc, ok
+}
+
+// assigned is the partitions-assigned callback: it registers a pconsumer for
+// every newly assigned partition and starts its goroutine.
+func (s *splitConsume) assigned(_ context.Context, cl *kgo.Client, assigned map[string][]int32) {
+	s.acquire()
+	defer s.release()
+	for topic, partitions := range assigned {
+		if s.consumers[topic] == nil {
+			s.consumers[topic] = make(map[int32]pconsumer)
+		}
+		for _, partition := range partitions {
+			pc := pconsumer{
+				quit: make(chan struct{}),
+				recs: make(chan []*kgo.Record, 10),
+			}
+			s.consumers[topic][partition] = pc
+			go pc.consume(topic, partition)
+		}
+	}
+}
+
+// lost is the callback for both revoked and lost partitions: it unregisters
+// each partition's pconsumer and closes its quit channel so the goroutine
+// exits. It does not wait for that goroutine to finish.
+func (s *splitConsume) lost(_ context.Context, cl *kgo.Client, lost map[string][]int32) {
+	s.acquire()
+	defer s.release()
+	for topic, partitions := range lost {
+		ptopics := s.consumers[topic]
+		for _, partition := range partitions {
+			pc, ok := ptopics[partition]
+			if !ok {
+				// Not tracked (never assigned, or already removed). The
+				// zero pconsumer has a nil quit channel and closing a nil
+				// channel panics, so skip it.
+				continue
+			}
+			delete(ptopics, partition)
+			close(pc.quit)
+		}
+		if len(ptopics) == 0 {
+			delete(s.consumers, topic)
+		}
+	}
+}
+
+// main validates the flags, starts a once-per-second global throughput
+// printer, creates the group-consuming client, and polls until the client is
+// closed.
+func main() {
+	flag.Parse()
+
+	// Check required flags before starting any background work.
+	if len(*group) == 0 {
+		fmt.Println("missing required group")
+		return
+	}
+	if len(*topic) == 0 {
+		fmt.Println("missing required topic")
+		return
+	}
+
+	go func() {
+		// SwapInt64 reads and zeroes each counter in one step, so every
+		// line reports only what was consumed since the previous tick.
+		for t := range time.Tick(time.Second) {
+			fmt.Printf("[%s] globally consumed %0.2f MiB/s, %0.2fk records/s\n",
+				t.Format("15:04:05.999"),
+				float64(atomic.SwapInt64(&globalBytes, 0))/(1024*1024),
+				float64(atomic.SwapInt64(&globalRecs, 0))/1000,
+			)
+		}
+	}()
+
+	s := &splitConsume{
+		lock:      make(chan struct{}, 1),
+		consumers: make(map[string]map[int32]pconsumer),
+	}
+
+	opts := []kgo.Opt{
+		kgo.SeedBrokers(strings.Split(*brokers, ",")...),
+		kgo.ConsumerGroup(*group),
+		kgo.ConsumeTopics(*topic),
+		kgo.OnPartitionsAssigned(s.assigned),
+		kgo.OnPartitionsRevoked(s.lost),
+		kgo.OnPartitionsLost(s.lost),
+	}
+
+	cl, err := kgo.NewClient(opts...)
+	if err != nil {
+		panic(err)
+	}
+
+	s.poll(cl)
+}
+
+// poll fetches in a loop and hands each partition's records to that
+// partition's consumer goroutine. It returns once the client reports it is
+// closed and panics on any fetch error. Records for partitions that have no
+// registered consumer are skipped.
+func (s *splitConsume) poll(cl *kgo.Client) {
+	for {
+		fetches := cl.PollFetches(context.Background())
+		if fetches.IsClientClosed() {
+			return
+		}
+		fetches.EachError(func(_ string, _ int32, err error) {
+			panic(err)
+		})
+		fetches.EachTopic(func(t kgo.FetchTopic) {
+			t.EachPartition(func(p kgo.FetchPartition) {
+				// Take the lock only for the map lookup, never across the
+				// potentially blocking send below, so a rebalance callback
+				// is not held up by a slow consumer.
+				pc, ok := s.lookup(t.Topic, p.Partition)
+				if !ok {
+					return
+				}
+				select {
+				case pc.recs <- p.Records:
+				case <-pc.quit:
+					// The partition went away while we were waiting.
+				}
+			})
+		})
+	}
+}