Skip to content

Commit

Permalink
kgo: add HookClientClosed and HookProduceRecordPartitioned
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Mar 13, 2023
1 parent 24afab3 commit 461d2ef
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 8 deletions.
7 changes: 7 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,12 @@ func (cl *Client) CloseAllowingRebalance() {
// notification of revoked partitions. If you want to automatically allow
// rebalancing, use CloseAllowingRebalance.
func (cl *Client) Close() {
defer cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookClientClosed); ok {
h.OnClientClosed(cl)
}
})

c := &cl.consumer
if c.g != nil {
cl.LeaveGroup()
Expand All @@ -934,6 +940,7 @@ func (cl *Client) Close() {
c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in Close")
c.mu.Unlock()
}

// After the above, consumers cannot consume anymore. LeaveGroup
// internally assigns noTopicsPartitions, which uses noConsumerSession,
// which prevents loopFetch from starting. Assigning also waits for the
Expand Down
34 changes: 32 additions & 2 deletions pkg/kgo/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ type HookNewClient interface {
OnNewClient(*Client)
}

// HookClientClosed is called in Close or CloseAfterRebalance after a client
// has been closed. This hook can be used to perform final cleanup work.
type HookClientClosed interface {
// OnClientClosed is passed the client that has been closed, after
// all client-internal close cleanup has happened.
OnClientClosed(*Client)
}

//////////////////
// BROKER HOOKS //
//////////////////
Expand Down Expand Up @@ -316,13 +324,33 @@ type HookProduceRecordBuffered interface {
OnProduceRecordBuffered(*Record)
}

// HookProduceRecordPartitioned is called when a record is partitioned and
// internally ready to be flushed.
//
// This hook can be used to create metrics of buffered records per partition,
// and then you can correlate that to partition leaders and determine which
// brokers are having problems.
//
// Note that this hook will slow down high-volume producing and it is
// recommended to only use this temporarily or if you are ok with the
// performance hit.
type HookProduceRecordPartitioned interface {
// OnProduceRecordPartitioned is passed a record that has been
// partitioned and the current broker leader for the partition
// (note that the leader may change if the partition is moved).
//
// This hook is called once a record is queued to be flushed. The
// record's Partition and Timestamp fields are safe to read.
OnProduceRecordPartitioned(*Record, int32)
}

// HookProduceRecordUnbuffered is called just before a record's promise is
// finished; this is effectively a mirror of a record promise.
//
// As an example, if using HookProduceRecordBuffered for a gauge of how many
// record bytes are buffered, this hook can be used to decrement the gauge.
//
// Note that this hook may slow down high-volume producing a bit.
// Note that this hook will slow down high-volume producing a bit.
type HookProduceRecordUnbuffered interface {
// OnProduceRecordUnbuffered is passed a record that is just about to
// have its produce promise called, as well as the error that the
Expand All @@ -339,7 +367,7 @@ type HookProduceRecordUnbuffered interface {
// records buffered, use the client's BufferedFetchRecords method, as it is
// faster.
//
// Note that this hook may slow down high-volume consuming a bit.
// Note that this hook will slow down high-volume consuming a bit.
type HookFetchRecordBuffered interface {
// OnFetchRecordBuffered is passed a record that is now buffered, ready
// to be polled.
Expand Down Expand Up @@ -371,6 +399,7 @@ type HookFetchRecordUnbuffered interface {
func implementsAnyHook(h Hook) bool {
switch h.(type) {
case HookNewClient,
HookClientClosed,
HookBrokerConnect,
HookBrokerDisconnect,
HookBrokerWrite,
Expand All @@ -381,6 +410,7 @@ func implementsAnyHook(h Hook) bool {
HookProduceBatchWritten,
HookFetchBatchRead,
HookProduceRecordBuffered,
HookProduceRecordPartitioned,
HookProduceRecordUnbuffered,
HookFetchRecordBuffered,
HookFetchRecordUnbuffered:
Expand Down
18 changes: 12 additions & 6 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type producer struct {
// Hooks exist behind a pointer because likely they are not used.
// We only take up one byte vs. 6.
hooks *struct {
buffered []HookProduceRecordBuffered
unbuffered []HookProduceRecordUnbuffered
buffered []HookProduceRecordBuffered
partitioned []HookProduceRecordPartitioned
unbuffered []HookProduceRecordUnbuffered
}

hasHookBatchWritten bool
Expand Down Expand Up @@ -108,8 +109,9 @@ func (p *producer) init(cl *Client) {
inithooks := func() {
if p.hooks == nil {
p.hooks = &struct {
buffered []HookProduceRecordBuffered
unbuffered []HookProduceRecordUnbuffered
buffered []HookProduceRecordBuffered
partitioned []HookProduceRecordPartitioned
unbuffered []HookProduceRecordUnbuffered
}{}
}
}
Expand All @@ -119,6 +121,10 @@ func (p *producer) init(cl *Client) {
inithooks()
p.hooks.buffered = append(p.hooks.buffered, h)
}
if h, ok := h.(HookProduceRecordPartitioned); ok {
inithooks()
p.hooks.partitioned = append(p.hooks.partitioned, h)
}
if h, ok := h.(HookProduceRecordUnbuffered); ok {
inithooks()
p.hooks.unbuffered = append(p.hooks.unbuffered, h)
Expand Down Expand Up @@ -375,7 +381,7 @@ func (cl *Client) produce(
}

p := &cl.producer
if p.hooks != nil {
if p.hooks != nil && len(p.hooks.buffered) > 0 {
for _, h := range p.hooks.buffered {
h.OnProduceRecordBuffered(r)
}
Expand Down Expand Up @@ -466,7 +472,7 @@ start:
func (cl *Client) finishRecordPromise(pr promisedRec, err error) {
p := &cl.producer

if p.hooks != nil {
if p.hooks != nil && len(p.hooks.unbuffered) > 0 {
for _, h := range p.hooks.unbuffered {
h.OnProduceRecordUnbuffered(pr.Record, err)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,7 @@ func (recBuf *recBuf) bufferRecord(pr promisedRec, abortOnNewBatch bool) bool {
pr.Timestamp = time.Now()
}
pr.Timestamp = pr.Timestamp.Truncate(time.Millisecond)
pr.Partition = recBuf.partition // set now, for the hook below

if recBuf.purged {
recBuf.cl.producer.promiseRecord(pr, errPurged)
Expand Down Expand Up @@ -1201,6 +1202,13 @@ func (recBuf *recBuf) bufferRecord(pr promisedRec, abortOnNewBatch bool) bool {
}

recBuf.buffered.Add(1)

if recBuf.cl.producer.hooks != nil && len(recBuf.cl.producer.hooks.partitioned) > 0 {
for _, h := range recBuf.cl.producer.hooks.partitioned {
h.OnProduceRecordPartitioned(pr.Record, recBuf.sink.nodeID)
}
}

return true
}

Expand Down

0 comments on commit 461d2ef

Please sign in to comment.