From 95aeef69f99585fd74befdc52c94c69c6c2a7154 Mon Sep 17 00:00:00 2001 From: "Rieb, Elias" Date: Fri, 3 Nov 2023 08:52:24 +0100 Subject: [PATCH] Migrate from franz-go to sarama --- go.mod | 23 +++++-- go.sum | 109 ++++++++++++++++++++++--------- pkg/aukafka/common.go | 57 +++++------------ pkg/aukafka/consumer.go | 120 +++++++++++++++++++++++++---------- pkg/aukafka/logger.go | 33 ---------- pkg/aukafka/producer.go | 63 ------------------ pkg/aukafka/producer_sync.go | 73 +++++++++++++++++++++ 7 files changed, 273 insertions(+), 205 deletions(-) delete mode 100644 pkg/aukafka/logger.go delete mode 100644 pkg/aukafka/producer.go create mode 100644 pkg/aukafka/producer_sync.go diff --git a/go.mod b/go.mod index 22a863e..84ee553 100644 --- a/go.mod +++ b/go.mod @@ -3,19 +3,32 @@ module github.com/Roshick/go-autumn-kafka go 1.21.0 require ( + github.com/IBM/sarama v1.42.0 github.com/StephanHCB/go-autumn-config-api v0.2.1 github.com/StephanHCB/go-autumn-config-env v0.2.2 github.com/StephanHCB/go-autumn-logging v0.3.0 - github.com/twmb/franz-go v1.15.0 ) require ( - github.com/StephanHCB/go-autumn-acorn-registry v0.3.1 // indirect - github.com/StephanHCB/go-autumn-logging-zerolog v0.5.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/klauspost/compress v1.16.7 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect - github.com/rs/zerolog v1.31.0 // indirect - github.com/twmb/franz-go/pkg/kmsg v1.6.1 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/rogpeppe/go-internal v1.11.0 // indirect golang.org/x/crypto v0.14.0 // indirect + golang.org/x/net v0.17.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 235ff89..abe9477 100644 --- a/go.sum +++ b/go.sum @@ -1,58 +1,109 @@ -github.com/StephanHCB/go-autumn-acorn-registry v0.3.1 h1:rAJlEsrSTJArQZHOt4Q6Gkc4NgL2ObSQGvxW0chiRiM= -github.com/StephanHCB/go-autumn-acorn-registry v0.3.1/go.mod h1:KB7wPWOEy2n8VGNw75H4w7wBSWSrgwNNJNmet/F+9RI= +github.com/IBM/sarama v1.42.0 h1:E5Kp9D5iIxI4b0Y0DYdiXil72v3kHIZMG8qTfWXVh2s= +github.com/IBM/sarama v1.42.0/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/StephanHCB/go-autumn-config-api v0.2.1 h1:t2EeTsdFpLM2xH2T7QFQtbFYI8hG5I9S+Q2o3KT6mlk= github.com/StephanHCB/go-autumn-config-api v0.2.1/go.mod h1:6nJBwuT1uURHApOSFr6Rw+naK2YkO+sAduwEWZ0qsSU= github.com/StephanHCB/go-autumn-config-env v0.2.2 h1:DWal2O4gKlNsrKnq8V5nRNUevqkH2+xGT/BwCyqrV3I= github.com/StephanHCB/go-autumn-config-env v0.2.2/go.mod h1:aTwlB8AVSnqGt4537uVHtZs9/3NEFYQwuVTRaNutgwg= github.com/StephanHCB/go-autumn-logging v0.3.0 h1:G0zs8xoth8i8mOeoFgG3Dvk6dIY9dPPJ7wkm6mjaPyY= github.com/StephanHCB/go-autumn-logging v0.3.0/go.mod h1:dPABYdECU3XrFib03uXbQFVLftUP5c4YaKSineiw37U= -github.com/StephanHCB/go-autumn-logging-zerolog v0.5.0 h1:UnSgOuwwZH6vJigW6aBtUgXWd14jvMJ0hmFlD9GHIMo= -github.com/StephanHCB/go-autumn-logging-zerolog v0.5.0/go.mod h1:Hspu94dHAKtgjMAkqtjMDZJnVv0VjVjyq6uKHI0Q4R8= -github.com/StephanHCB/go-backend-service-common v0.6.0 h1:PWqGdUq7TPqI+rPwgHB+pcXZUWQOCMBMS4wFWoWXvvc= -github.com/StephanHCB/go-backend-service-common v0.6.0/go.mod h1:/S3+UrSczKX6Ypk35G5xz1NRjnH6SifKLlfsFrveAmc= -github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= +github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU= -github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= -github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/twmb/franz-go v1.15.0 h1:bw5n1COKJzWpkCXG/kMtHrurcS9HSWV6e3If5CUdc+M= -github.com/twmb/franz-go v1.15.0/go.mod h1:nMAvTC2kHtK+ceaSHeHm4dlxC78389M/1DjpOswEgu4= -github.com/twmb/franz-go/pkg/kmsg v1.6.1 h1:tm6hXPv5antMHLasTfKv9R+X03AjHSkSkXhQo2c5ALM= -github.com/twmb/franz-go/pkg/kmsg v1.6.1/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/aukafka/common.go b/pkg/aukafka/common.go index 0d18ae7..0838935 100644 --- a/pkg/aukafka/common.go +++ b/pkg/aukafka/common.go @@ -1,47 +1,20 @@ package aukafka -import ( - "crypto/tls" - "net" - "time" +import "github.com/IBM/sarama" - "github.com/twmb/franz-go/pkg/kgo" - "github.com/twmb/franz-go/pkg/sasl" - "github.com/twmb/franz-go/pkg/sasl/plain" - "github.com/twmb/franz-go/pkg/sasl/scram" -) - -func defaultTopicOptions(logKey string, config TopicConfig) []kgo.Opt { - tlsDialer := &tls.Dialer{ - NetDialer: &net.Dialer{Timeout: 10 * time.Second}, - Config: &tls.Config{InsecureSkipVerify: true}, - } - - var mechanism sasl.Mechanism - switch config.AuthType { - case "sha256": - mechanism = scram.Auth{ - User: config.Username, - Pass: config.Password, - }.AsSha256Mechanism() - case "plain": - fallthrough - default: - mechanism = plain.Auth{ - User: config.Username, - Pass: config.Password, - }.AsMechanism() +func mergeConfigWithPreset( + topicConfig TopicConfig, + configPreset *sarama.Config, +) (*sarama.Config, error) { + var clientConfig *sarama.Config + if configPreset != nil { + clientConfig = configPreset + } else { + clientConfig = sarama.NewConfig() } - - opts := []kgo.Opt{ - kgo.SeedBrokers(config.Brokers...), - kgo.SASL(mechanism), - kgo.Dialer(tlsDialer.DialContext), - kgo.SessionTimeout(30 * time.Second), - kgo.RequestRetries(2), - kgo.RetryTimeout(5 * time.Second), - kgo.WithLogger(Logger{Key: logKey}), - } - - return opts + clientConfig.Net.SASL.User = topicConfig.Username + clientConfig.Net.SASL.Password = topicConfig.Password + clientConfig.Net.SASL.Enable = true + clientConfig.Net.SASL.Mechanism = sarama.SASLMechanism(topicConfig.AuthType) + return clientConfig, nil } diff --git a/pkg/aukafka/consumer.go b/pkg/aukafka/consumer.go index bb15348..c6b2fe3 100644 --- a/pkg/aukafka/consumer.go +++ b/pkg/aukafka/consumer.go @@ -3,36 +3,43 @@ package aukafka import ( "context" "encoding/json" + "errors" "fmt" "time" + "github.com/IBM/sarama" aulogging "github.com/StephanHCB/go-autumn-logging" - "github.com/twmb/franz-go/pkg/kgo" ) type Consumer[E any] struct { - client *kgo.Client + client sarama.ConsumerGroup + topicConfig TopicConfig receiveCallback func(ctx context.Context, key *string, event *E, timestamp time.Time) error } func CreateConsumer[E any]( ctx context.Context, - config TopicConfig, + topicConfig TopicConfig, receiveCallback func(context.Context, *string, *E, time.Time) error, - customOpts ...kgo.Opt, + configPreset *sarama.Config, ) (*Consumer[E], error) { - opts := defaultTopicOptions(fmt.Sprintf("%s consumer", config.Topic), config) - opts = append(opts, customOpts...) - opts = append(opts, kgo.ConsumerGroup(*config.ConsumerGroup), kgo.ConsumeTopics(config.Topic)) + if topicConfig.ConsumerGroup == nil || *topicConfig.ConsumerGroup == "" { + return nil, fmt.Errorf("failed to create consumer group client: consumer group is missing or empty") + } - client, err := kgo.NewClient(opts...) + clientConfig, err := mergeConfigWithPreset(topicConfig, configPreset) if err != nil { - aulogging.Logger.Ctx(ctx).Error().WithErr(err).Printf("failed to connect to topic %s", config.Topic) - return nil, err + return nil, fmt.Errorf("failed to create consumer group client: %s", err.Error()) + } + + client, err := sarama.NewConsumerGroup(topicConfig.Brokers, *topicConfig.ConsumerGroup, clientConfig) + if err != nil { + return nil, fmt.Errorf("failed to create consumer group client: %s", err.Error()) } consumer := Consumer[E]{ client: client, + topicConfig: topicConfig, receiveCallback: receiveCallback, } @@ -40,40 +47,87 @@ func CreateConsumer[E any]( return &consumer, nil } -func (c *Consumer[E]) Stop() { - c.client.Close() +func (c *Consumer[E]) Close(ctx context.Context) { + err := c.client.Close() + if err != nil { + aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Print("failed to close kafka consumer") + } } -func (c *Consumer[E]) run( - ctx context.Context, -) { +func (c *Consumer[E]) run(ctx context.Context) { + defer func() { + r := recover() + if err, ok := r.(error); ok { + aulogging.Logger.Ctx(ctx).Error().WithErr(err).Print("caught panic in kafka consumer") + } + }() + for { - fetches := c.client.PollFetches(ctx) - if fetches.IsClientClosed() { - aulogging.Logger.NoCtx().Info().Print("kafka client closed, stopping consumer") - return + // `Consume` should be called inside an infinite loop, when a + // server-side rebalance happens, the consumer session will need to be + // recreated to get the new claims + if err := c.client.Consume(ctx, []string{c.topicConfig.Topic}, c); err != nil { + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + aulogging.Logger.Ctx(ctx).Error().WithErr(err).Print("kafka consumer returned with error") } - aulogging.Logger.NoCtx().Debug().Printf("received %s fetches", len(fetches)) + } +} - fetches.EachError(func(t string, p int32, err error) { - aulogging.Logger.NoCtx().Error().WithErr(err).Printf("fetch error occurred for partition %d of topic %s", p, t) - }) +// Setup is run at the beginning of a new session, before ConsumeClaim +func (c *Consumer[E]) Setup(sarama.ConsumerGroupSession) error { + return nil +} - fetches.EachRecord(func(record *kgo.Record) { +// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited +func (c *Consumer[E]) Cleanup(sarama.ConsumerGroupSession) error { + return nil +} + +// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). +// Once the Messages() channel is closed, the Handler must finish its processing +// loop and exit. +func (c *Consumer[E]) ConsumeClaim( + session sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim, +) error { + // NOTE: + // Do not move the code below to a goroutine. + // The `ConsumeClaim` itself is called within a goroutine, see: + // https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29 + for { + select { + case message, ok := <-claim.Messages(): + if !ok { + aulogging.Logger.Ctx(session.Context()).Info().Print("message channel was closed") + return nil + } + timestamp := message.Timestamp.UTC() key := new(string) - if record.Key != nil { - *key = string(record.Key) + if message.Key != nil { + *key = string(message.Key) } event := new(E) - if record.Value != nil { - if err := json.Unmarshal(record.Value, &event); err != nil { - aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Printf("failed to unmarshal event") - return + if message.Value != nil { + if err := json.Unmarshal(message.Value, &event); err != nil { + aulogging.Logger.Ctx(session.Context()).Warn().WithErr(err). + Printf("failed to unmarshal event key = %s, value = %s, timestamp = %v, topic = %s", + key, event, timestamp, message.Topic) } } - if err := c.receiveCallback(ctx, key, event, record.Timestamp); err != nil { - aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Printf("failed to perform event callback") + if err := c.receiveCallback(session.Context(), key, event, timestamp); err != nil { + aulogging.Logger.Ctx(session.Context()).Warn().WithErr(err). + Printf("failed to perform callback on event key = %s, value = %s, timestamp = %v, topic = %s", + key, event, timestamp, message.Topic) + } else { + session.MarkMessage(message, "") } - }) + // Should return when `session.Context()` is done. + // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: + // https://github.com/IBM/sarama/issues/1192 + case <-session.Context().Done(): + return nil + } } } diff --git a/pkg/aukafka/logger.go b/pkg/aukafka/logger.go deleted file mode 100644 index 8a8c070..0000000 --- a/pkg/aukafka/logger.go +++ /dev/null @@ -1,33 +0,0 @@ -package aukafka - -import ( - aulogging "github.com/StephanHCB/go-autumn-logging" - "github.com/twmb/franz-go/pkg/kgo" -) - -type Logger struct { - Key string -} - -func (l Logger) Level() kgo.LogLevel { - return kgo.LogLevelDebug // set to Debug to see all output -} - -func (l Logger) Log(level kgo.LogLevel, msg string, _ ...any) { - switch level { - case kgo.LogLevelError: - aulogging.Logger.NoCtx().Error().Printf("kgo %s error: %s", l.Key, msg) - return - case kgo.LogLevelWarn: - aulogging.Logger.NoCtx().Warn().Printf("kgo %s warning: %s", l.Key, msg) - return - case kgo.LogLevelInfo: - aulogging.Logger.NoCtx().Info().Printf("kgo %s info: %s", l.Key, msg) - return - case kgo.LogLevelDebug: - aulogging.Logger.NoCtx().Debug().Printf("kgo %s debug: %s", l.Key, msg) - return - default: - return - } -} diff --git a/pkg/aukafka/producer.go b/pkg/aukafka/producer.go deleted file mode 100644 index b23be6f..0000000 --- a/pkg/aukafka/producer.go +++ /dev/null @@ -1,63 +0,0 @@ -package aukafka - -import ( - "context" - "encoding/json" - "fmt" - - aulogging "github.com/StephanHCB/go-autumn-logging" - "github.com/twmb/franz-go/pkg/kgo" -) - -type Producer[V any] struct { - client *kgo.Client -} - -func CreateProducer[V any]( - ctx context.Context, - config TopicConfig, - customOpts ...kgo.Opt, -) (*Producer[V], error) { - opts := defaultTopicOptions(fmt.Sprintf("%s producer", config.Topic), config) - opts = append(opts, customOpts...) - opts = append(opts, kgo.DefaultProduceTopic(config.Topic), kgo.ProducerBatchCompression(kgo.NoCompression())) - - client, err := kgo.NewClient(opts...) - if err != nil { - aulogging.Logger.Ctx(ctx).Error().WithErr(err).Printf("failed to connect to topic %s", config.Topic) - return nil, err - } - - return &Producer[V]{ - client: client, - }, err -} - -func (p *Producer[V]) ProduceSync( - ctx context.Context, - key *string, - value *V, -) error { - var keyBytes []byte - if key != nil { - keyBytes = []byte(*key) - } - var eventBytes []byte - if value != nil { - var err error - eventBytes, err = json.Marshal(*value) - if err != nil { - return err - } - } - record := &kgo.Record{ - Key: keyBytes, - Value: eventBytes, - } - result := p.client.ProduceSync(ctx, record) - return result.FirstErr() -} - -func (p *Producer[E]) Close() { - p.client.Close() -} diff --git a/pkg/aukafka/producer_sync.go b/pkg/aukafka/producer_sync.go new file mode 100644 index 0000000..5e56e33 --- /dev/null +++ b/pkg/aukafka/producer_sync.go @@ -0,0 +1,73 @@ +package aukafka + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/IBM/sarama" + aulogging "github.com/StephanHCB/go-autumn-logging" +) + +type SyncProducer[V any] struct { + client sarama.SyncProducer + topic string +} + +func CreateSyncProducer[V any]( + _ context.Context, + topicConfig TopicConfig, + configPreset *sarama.Config, +) (*SyncProducer[V], error) { + clientConfig, err := mergeConfigWithPreset(topicConfig, configPreset) + if err != nil { + return nil, fmt.Errorf("failed to create sync producer client: %s", err.Error()) + } + clientConfig.Producer.Return.Successes = true + + client, err := sarama.NewSyncProducer(topicConfig.Brokers, clientConfig) + if err != nil { + return nil, fmt.Errorf("failed to create sync producer client: %s", err.Error()) + } + + return &SyncProducer[V]{ + client: client, + topic: topicConfig.Topic, + }, err +} + +func (p *SyncProducer[V]) Produce( + _ context.Context, + key *string, + value *V, +) error { + var keyBytes []byte + if key != nil { + keyBytes = []byte(*key) + } + var valueBytes []byte + if value != nil { + var err error + valueBytes, err = json.Marshal(*value) + if err != nil { + return err + } + } + message := sarama.ProducerMessage{ + Topic: p.topic, + Key: sarama.ByteEncoder(keyBytes), + Value: sarama.ByteEncoder(valueBytes), + } + + if _, _, err := p.client.SendMessage(&message); err != nil { + return err + } + return nil +} + +func (p *SyncProducer[E]) Close(ctx context.Context) { + err := p.client.Close() + if err != nil { + aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Print("failed to close kafka producer") + } +}