Skip to content

Commit

Permalink
add a new metric named batchQueueTime to track the batch time cost (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored May 5, 2023
1 parent 185381d commit b737754
Showing 1 changed file with 30 additions and 25 deletions.
55 changes: 30 additions & 25 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,29 @@ import (
// by the function and test if it is an instance of kafka.WriteErrors in order to
// identify which messages have succeeded or failed, for example:
//
// // Construct a synchronous writer (the default mode).
// w := &kafka.Writer{
// Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
// Topic: "topic-A",
// RequiredAcks: kafka.RequireAll,
// }
// // Construct a synchronous writer (the default mode).
// w := &kafka.Writer{
// Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
// Topic: "topic-A",
// RequiredAcks: kafka.RequireAll,
// }
//
// ...
// ...
//
// // Passing a context can prevent the operation from blocking indefinitely.
// switch err := w.WriteMessages(ctx, msgs...).(type) {
// case nil:
// case kafka.WriteErrors:
// for i := range msgs {
// if err[i] != nil {
// // handle the error writing msgs[i]
// ...
// // Passing a context can prevent the operation from blocking indefinitely.
// switch err := w.WriteMessages(ctx, msgs...).(type) {
// case nil:
// case kafka.WriteErrors:
// for i := range msgs {
// if err[i] != nil {
// // handle the error writing msgs[i]
// ...
// }
// }
// default:
// // handle other errors
// ...
// }
// default:
// // handle other errors
// ...
// }
//
// In asynchronous mode, the program may configure a completion handler on the
// writer to receive notifications of messages being written to kafka:
Expand Down Expand Up @@ -348,12 +348,13 @@ type WriterStats struct {
Bytes int64 `metric:"kafka.writer.message.bytes" type:"counter"`
Errors int64 `metric:"kafka.writer.error.count" type:"counter"`

BatchTime DurationStats `metric:"kafka.writer.batch.seconds"`
WriteTime DurationStats `metric:"kafka.writer.write.seconds"`
WaitTime DurationStats `metric:"kafka.writer.wait.seconds"`
Retries int64 `metric:"kafka.writer.retries.count" type:"counter"`
BatchSize SummaryStats `metric:"kafka.writer.batch.size"`
BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"`
BatchTime DurationStats `metric:"kafka.writer.batch.seconds"`
BatchQueueTime DurationStats `metric:"kafka.writer.batch.queue.seconds"`
WriteTime DurationStats `metric:"kafka.writer.write.seconds"`
WaitTime DurationStats `metric:"kafka.writer.wait.seconds"`
Retries int64 `metric:"kafka.writer.retries.count" type:"counter"`
BatchSize SummaryStats `metric:"kafka.writer.batch.size"`
BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"`

MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"`
WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min" type:"gauge"`
Expand Down Expand Up @@ -398,6 +399,7 @@ type writerStats struct {
errors counter
dialTime summary
batchTime summary
batchQueueTime summary
writeTime summary
waitTime summary
retries counter
Expand Down Expand Up @@ -880,6 +882,7 @@ func (w *Writer) Stats() WriterStats {
Errors: stats.errors.snapshot(),
DialTime: stats.dialTime.snapshotDuration(),
BatchTime: stats.batchTime.snapshotDuration(),
BatchQueueTime: stats.batchQueueTime.snapshotDuration(),
WriteTime: stats.writeTime.snapshotDuration(),
WaitTime: stats.waitTime.snapshotDuration(),
Retries: stats.retries.snapshot(),
Expand Down Expand Up @@ -1088,6 +1091,8 @@ func (ptw *partitionWriter) awaitBatch(batch *writeBatch) {
// having it leak until it expires.
batch.timer.Stop()
}
stats := ptw.w.stats()
stats.batchQueueTime.observe(int64(time.Since(batch.time)))
}

func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
Expand Down

0 comments on commit b737754

Please sign in to comment.