Skip to content

Commit

Permalink
Merge pull request #383 from twmb/opt_interrogate
Browse files Browse the repository at this point in the history
kgo.Client: add ConfigValue, ConfigValues, UpdateFetchMaxBytes
  • Loading branch information
twmb authored Mar 13, 2023
2 parents 8b9c538 + c4aa4dc commit 24afab3
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 11 deletions.
12 changes: 12 additions & 0 deletions pkg/kgo/atomic_maybe_work.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kgo

import "sync/atomic"

const (
stateUnstarted = iota
stateWorking
Expand Down Expand Up @@ -62,3 +64,13 @@ func (l *workLoop) maybeFinish(again bool) bool {
func (l *workLoop) hardFinish() {
l.state.Store(stateUnstarted)
}

// lazyI32 is used in a few places where we want atomics _sometimes_. Some
// uses do not need to be atomic (notably, setup), and we do not want the
// noCopy guard.
//
// Specifically, this is used for a few int32 settings in the config.
type lazyI32 int32

func (v *lazyI32) store(s int32) { atomic.StoreInt32((*int32)(v), s) }
func (v *lazyI32) load() int32 { return atomic.LoadInt32((*int32)(v)) }
241 changes: 241 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"math/rand"
"net"
"reflect"
"runtime"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -153,6 +154,246 @@ func validateCfg(opts ...Opt) (cfg, []hostport, *compressor, error) {
return cfg, seeds, compressor, nil
}

func namefn(fn any) string {
v := reflect.ValueOf(fn)
if v.Type().Kind() != reflect.Func {
return ""
}
name := runtime.FuncForPC(v.Pointer()).Name()
dot := strings.LastIndexByte(name, '.')
if dot >= 0 {
return name[dot+1:]
}
return name
}

// OptValue returns the value for the given configuration option. If the
// given option does not exist, this returns nil. This function takes either a
// raw Opt, or an Opt function name.
//
// If a configuration option has multiple inputs, this function returns only
// the first input. If the function is a boolean function (such as
// BlockRebalanceOnPoll), this function returns the value of the internal bool.
// Variadic option inputs are returned as a single slice. Options that are
// internally stored as a pointer (ClientID, TransactionalID, and InstanceID)
// are returned as their string input; you can see if the option is internally
// nil by looking at the second value returned from OptValues.
//
// var (
// cl, _ := NewClient(
// InstanceID("foo"),
// ConsumeTopics("foo", "bar"),
// )
// iid = cl.OptValue(InstanceID) // iid is "foo"
// gid = cl.OptValue(ConsumerGroup) // gid is "" since groups are not used
// topics = cl.OptValue("ConsumeTopics") // topics is []string{"foo", "bar"}; string lookup for the option works
// bpoll = cl.OptValue(BlockRebalanceOnPoll) // bpoll is false
// t = cl.OptValue(SessionTimeout) // t is 45s, the internal default
// td = t.(time.Duration) // safe conversion since SessionTimeout's input is a time.Duration
// unk = cl.OptValue("Unknown"), // unk is nil
// )
func (cl *Client) OptValue(opt any) any {
vs := cl.OptValues(opt)
if len(vs) > 0 {
return vs[0]
}
return nil
}

// OptValues returns all values for options. This method is useful for
// options that have multiple inputs (notably, SoftwareNameAndVersion). This is
// also useful for options that are internally stored as a pointer (ClientID,
// TransactionalID, and InstanceID) -- this function will return the string
// value of the option but also whether the option is non-nil. Boolean options
// are returned as a single-element slice with the bool value. Variadic inputs
// are returned as a signle slice. If the input option does not exist, this
// returns nil.
//
// var (
// cl, _ = NewClient(
// InstanceID("foo"),
// ConsumeTopics("foo", "bar"),
// )
// idValues = cl.OptValues(InstanceID) // idValues is []any{"foo", true}
// tValues = cl.OptValues(SessionTimeout) // tValues is []any{45 * time.Second}
// topics = cl.OptValues(ConsumeTopics) // topics is []any{[]string{"foo", "bar"}
// bpoll = cl.OptValues(BlockRebalanceOnPoll) // bpoll is []any{false}
// unknown = cl.OptValues("Unknown") // unknown is nil
// )
func (cl *Client) OptValues(opt any) []any {
name := namefn(opt)
if s, ok := opt.(string); ok {
name = s
}
cfg := &cl.cfg

switch name {
case namefn(ClientID):
if cfg.id != nil {
return []any{*cfg.id, true}
}
return []any{"", false}
case namefn(SoftwareNameAndVersion):
return []any{cfg.softwareName, cfg.softwareVersion}
case namefn(WithLogger):
if cfg.logger != nil {
return []any{cfg.logger.(*wrappedLogger).inner}
}
return []any{nil}
case namefn(RequestTimeoutOverhead):
return []any{cfg.requestTimeoutOverhead}
case namefn(ConnIdleTimeout):
return []any{cfg.connIdleTimeout}
case namefn(Dialer):
return []any{cfg.dialFn}
case namefn(DialTLSConfig):
return []any{cfg.dialTLS}
case namefn(SeedBrokers):
return []any{cfg.seedBrokers}
case namefn(MaxVersions):
return []any{cfg.maxVersions}
case namefn(MinVersions):
return []any{cfg.minVersions}
case namefn(RetryBackoffFn):
return []any{cfg.retryBackoff}
case namefn(RequestRetries):
return []any{cfg.retries}
case namefn(RetryTimeout):
return []any{cfg.retryTimeout(0)}
case namefn(RetryTimeoutFn):
return []any{cfg.retryTimeout}
case namefn(AllowAutoTopicCreation):
return []any{cfg.allowAutoTopicCreation}
case namefn(BrokerMaxWriteBytes):
return []any{cfg.maxBrokerWriteBytes}
case namefn(BrokerMaxReadBytes):
return []any{cfg.maxBrokerReadBytes}
case namefn(MetadataMaxAge):
return []any{cfg.metadataMaxAge}
case namefn(MetadataMinAge):
return []any{cfg.metadataMinAge}
case namefn(SASL):
return []any{cfg.sasls}
case namefn(WithHooks):
return []any{cfg.hooks}
case namefn(ConcurrentTransactionsBackoff):
return []any{cfg.txnBackoff}

case namefn(DefaultProduceTopic):
return []any{cfg.defaultProduceTopic}
case namefn(RequiredAcks):
return []any{cfg.acks}
case namefn(DisableIdempotentWrite):
return []any{cfg.disableIdempotency}
case namefn(MaxProduceRequestsInflightPerBroker):
return []any{cfg.maxProduceInflight}
case namefn(ProducerBatchCompression):
return []any{cfg.compression}
case namefn(ProducerBatchMaxBytes):
return []any{cfg.maxRecordBatchBytes}
case namefn(MaxBufferedRecords):
return []any{cfg.maxBufferedRecords}
case namefn(RecordPartitioner):
return []any{cfg.partitioner}
case namefn(ProduceRequestTimeout):
return []any{cfg.produceTimeout}
case namefn(RecordRetries):
return []any{cfg.recordRetries}
case namefn(UnknownTopicRetries):
return []any{cfg.maxUnknownFailures}
case namefn(StopProducerOnDataLossDetected):
return []any{cfg.stopOnDataLoss}
case namefn(ProducerOnDataLossDetected):
return []any{cfg.onDataLoss}
case namefn(ProducerLinger):
return []any{cfg.linger}
case namefn(ManualFlushing):
return []any{cfg.manualFlushing}
case namefn(RecordDeliveryTimeout):
return []any{cfg.recordTimeout}
case namefn(TransactionalID):
if cfg.txnID != nil {
return []any{cfg.txnID, true}
}
return []any{"", false}
case namefn(TransactionTimeout):
return []any{cfg.txnTimeout}

case namefn(ConsumePartitions):
return []any{cfg.partitions}
case namefn(ConsumePreferringLagFn):
return []any{cfg.preferLagFn}
case namefn(ConsumeRegex):
return []any{cfg.regex}
case namefn(ConsumeResetOffset):
return []any{cfg.resetOffset}
case namefn(ConsumeTopics):
return []any{cfg.topics}
case namefn(DisableFetchSessions):
return []any{cfg.disableFetchSessions}
case namefn(FetchIsolationLevel):
return []any{cfg.isolationLevel}
case namefn(FetchMaxBytes):
return []any{int32(cfg.maxBytes)}
case namefn(FetchMaxPartitionBytes):
return []any{int32(cfg.maxPartBytes)}
case namefn(FetchMaxWait):
return []any{time.Duration(cfg.maxWait) * time.Millisecond}
case namefn(FetchMinBytes):
return []any{cfg.minBytes}
case namefn(KeepControlRecords):
return []any{cfg.keepControl}
case namefn(MaxConcurrentFetches):
return []any{cfg.maxConcurrentFetches}
case namefn(Rack):
return []any{cfg.rack}

case namefn(AdjustFetchOffsetsFn):
return []any{cfg.adjustOffsetsBeforeAssign}
case namefn(AutoCommitCallback):
return []any{cfg.commitCallback}
case namefn(AutoCommitInterval):
return []any{cfg.autocommitInterval}
case namefn(AutoCommitMarks):
return []any{cfg.autocommitMarks}
case namefn(Balancers):
return []any{cfg.balancers}
case namefn(BlockRebalanceOnPoll):
return []any{cfg.blockRebalanceOnPoll}
case namefn(ConsumerGroup):
return []any{cfg.group}
case namefn(DisableAutoCommit):
return []any{cfg.autocommitDisable}
case namefn(GreedyAutoCommit):
return []any{cfg.autocommitGreedy}
case namefn(GroupProtocol):
return []any{cfg.protocol}
case namefn(HeartbeatInterval):
return []any{cfg.heartbeatInterval}
case namefn(InstanceID):
if cfg.instanceID != nil {
return []any{*cfg.instanceID, true}
}
return []any{"", false}
case namefn(OnOffsetsFetched):
return []any{cfg.onFetched}
case namefn(OnPartitionsAssigned):
return []any{cfg.onAssigned}
case namefn(OnPartitionsLost):
return []any{cfg.onLost}
case namefn(OnPartitionsRevoked):
return []any{cfg.onRevoked}
case namefn(RebalanceTimeout):
return []any{cfg.rebalanceTimeout}
case namefn(RequireStableFetchOffsets):
return []any{cfg.requireStable}
case namefn(SessionTimeout):
return []any{cfg.sessionTimeout}
default:
return nil
}
}

// NewClient returns a new Kafka client with the given options or an error if
// the options are invalid. Connections to brokers are lazily created only when
// requests are written to them.
Expand Down
13 changes: 7 additions & 6 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ func (consumerOpt) consumerOpt() {}
func (groupOpt) groupOpt() {}

// A cfg can be written to while initializing a client, and after that it is
// only ever read from. Some areas of initializing may follow options, but all
// initializing is done before NewClient returns.
// (mostly) only ever read from. Some areas can continue to be modified --
// particularly reconfiguring what to consume from -- but most areas are
// static.
type cfg struct {
/////////////////////
// GENERAL SECTION //
Expand Down Expand Up @@ -132,8 +133,8 @@ type cfg struct {

maxWait int32
minBytes int32
maxBytes int32
maxPartBytes int32
maxBytes lazyI32
maxPartBytes lazyI32
resetOffset Offset
isolationLevel int8
keepControl bool
Expand Down Expand Up @@ -1131,7 +1132,7 @@ func FetchMaxWait(wait time.Duration) ConsumerOpt {
// recommended to set this option so that decompression does not eat all of
// your RAM.
func FetchMaxBytes(b int32) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.maxBytes = b }}
return consumerOpt{func(cfg *cfg) { cfg.maxBytes = lazyI32(b) }}
}

// FetchMinBytes sets the minimum amount of bytes a broker will try to send
Expand All @@ -1153,7 +1154,7 @@ func FetchMinBytes(b int32) ConsumerOpt {
//
// This corresponds to the Java max.partition.fetch.bytes setting.
func FetchMaxPartitionBytes(b int32) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.maxPartBytes = b }}
return consumerOpt{func(cfg *cfg) { cfg.maxPartBytes = lazyI32(b) }}
}

// MaxConcurrentFetches sets the maximum number of fetch requests to allow in
Expand Down
8 changes: 8 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,14 @@ func (cl *Client) AllowRebalance() {
cl.consumer.allowRebalance()
}

// UpdateFetchMaxBytes updates the max bytes that a fetch request will ask for
// and the max partition bytes that a fetch request will ask for each
// partition.
func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32) {
cl.cfg.maxBytes.store(maxBytes)
cl.cfg.maxPartBytes.store(maxPartBytes)
}

// PauseFetchTopics sets the client to no longer fetch the given topics and
// returns all currently paused topics. Paused topics persist until resumed.
// You can call this function with no topics to simply receive the list of
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ func (s *source) createReq() *fetchRequest {
req := &fetchRequest{
maxWait: s.cl.cfg.maxWait,
minBytes: s.cl.cfg.minBytes,
maxBytes: s.cl.cfg.maxBytes,
maxPartBytes: s.cl.cfg.maxPartBytes,
maxBytes: s.cl.cfg.maxBytes.load(),
maxPartBytes: s.cl.cfg.maxPartBytes.load(),
rack: s.cl.cfg.rack,
isolationLevel: s.cl.cfg.isolationLevel,
preferLagFn: s.cl.cfg.preferLagFn,
Expand Down
2 changes: 1 addition & 1 deletion plugin/klogr/klogr.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (l *Logger) Level() kgo.LogLevel {

// Log using the underlying logr.Logger. If kgo.LogLevelError is set, keyvals
// will be type checked for an error, and the first one found will be used.
func (l *Logger) Log(level kgo.LogLevel, msg string, keyvals ...interface{}) {
func (l *Logger) Log(level kgo.LogLevel, msg string, keyvals ...any) {
switch level {
case kgo.LogLevelNone:
case kgo.LogLevelError:
Expand Down
5 changes: 3 additions & 2 deletions plugin/klogr/klogr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ func BenchmarkLogNone(b *testing.B) {

var (
msg = "message"
keyvals = []interface{}{
keyvals = []any{
"bool", true,
"string", "str",
"int", 42,
"float", 3.14,
"struct", struct{ A, B int }{13, 37},
"struct",
struct{ A, B int }{13, 37},
"err", fmt.Errorf("error"),
}
)
Expand Down

0 comments on commit 24afab3

Please sign in to comment.