From b7377546e0b02af5daaad25f739ccfa51dbc601e Mon Sep 17 00:00:00 2001
From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
Date: Sat, 6 May 2023 00:35:31 +0800
Subject: [PATCH] add a new metric named `batchQueueTime` to track the batch
 time cost (#1103)

---
 writer.go | 55 ++++++++++++++++++++++++++++++-------------------------
 1 file changed, 30 insertions(+), 25 deletions(-)

diff --git a/writer.go b/writer.go
index ce69ab30c..f5d6fc2c5 100644
--- a/writer.go
+++ b/writer.go
@@ -27,29 +27,29 @@ import (
 // by the function and test if it an instance of kafka.WriteErrors in order to
 // identify which messages have succeeded or failed, for example:
 //
-//	// Construct a synchronous writer (the default mode).
-//	w := &kafka.Writer{
-//		Addr:         Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
-//		Topic:        "topic-A",
-//		RequiredAcks: kafka.RequireAll,
-//	}
+//	// Construct a synchronous writer (the default mode).
+//	w := &kafka.Writer{
+//		Addr:         Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
+//		Topic:        "topic-A",
+//		RequiredAcks: kafka.RequireAll,
+//	}
 //
-//	...
+//	...
 //
-//	// Passing a context can prevent the operation from blocking indefinitely.
-//	switch err := w.WriteMessages(ctx, msgs...).(type) {
-//	case nil:
-//	case kafka.WriteErrors:
-//		for i := range msgs {
-//			if err[i] != nil {
-//				// handle the error writing msgs[i]
-//				...
+//	// Passing a context can prevent the operation from blocking indefinitely.
+//	switch err := w.WriteMessages(ctx, msgs...).(type) {
+//	case nil:
+//	case kafka.WriteErrors:
+//		for i := range msgs {
+//			if err[i] != nil {
+//				// handle the error writing msgs[i]
+//				...
+//			}
+//		}
+//	default:
+//		// handle other errors
+//		...
 //	}
-//	default:
-//		// handle other errors
-//		...
-//	}
 //
 // In asynchronous mode, the program may configure a completion handler on the
 // writer to receive notifications of messages being written to kafka:
@@ -348,12 +348,13 @@ type WriterStats struct {
 	Bytes    int64 `metric:"kafka.writer.message.bytes" type:"counter"`
 	Errors   int64 `metric:"kafka.writer.error.count" type:"counter"`
 
-	BatchTime  DurationStats `metric:"kafka.writer.batch.seconds"`
-	WriteTime  DurationStats `metric:"kafka.writer.write.seconds"`
-	WaitTime   DurationStats `metric:"kafka.writer.wait.seconds"`
-	Retries    int64         `metric:"kafka.writer.retries.count" type:"counter"`
-	BatchSize  SummaryStats  `metric:"kafka.writer.batch.size"`
-	BatchBytes SummaryStats  `metric:"kafka.writer.batch.bytes"`
+	BatchTime      DurationStats `metric:"kafka.writer.batch.seconds"`
+	BatchQueueTime DurationStats `metric:"kafka.writer.batch.queue.seconds"`
+	WriteTime      DurationStats `metric:"kafka.writer.write.seconds"`
+	WaitTime       DurationStats `metric:"kafka.writer.wait.seconds"`
+	Retries        int64         `metric:"kafka.writer.retries.count" type:"counter"`
+	BatchSize      SummaryStats  `metric:"kafka.writer.batch.size"`
+	BatchBytes     SummaryStats  `metric:"kafka.writer.batch.bytes"`
 
 	MaxAttempts     int64         `metric:"kafka.writer.attempts.max" type:"gauge"`
 	WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min" type:"gauge"`
@@ -398,6 +399,7 @@ type writerStats struct {
 	errors         counter
 	dialTime       summary
 	batchTime      summary
+	batchQueueTime summary
 	writeTime      summary
 	waitTime       summary
 	retries        counter
@@ -880,6 +882,7 @@ func (w *Writer) Stats() WriterStats {
 		Errors:          stats.errors.snapshot(),
 		DialTime:        stats.dialTime.snapshotDuration(),
 		BatchTime:       stats.batchTime.snapshotDuration(),
+		BatchQueueTime:  stats.batchQueueTime.snapshotDuration(),
 		WriteTime:       stats.writeTime.snapshotDuration(),
 		WaitTime:        stats.waitTime.snapshotDuration(),
 		Retries:         stats.retries.snapshot(),
@@ -1088,6 +1091,8 @@ func (ptw *partitionWriter) awaitBatch(batch *writeBatch) {
 		// having it leak until it expires.
 		batch.timer.Stop()
 	}
+	stats := ptw.w.stats()
+	stats.batchQueueTime.observe(int64(time.Since(batch.time)))
 }
 
 func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
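
Below is a minimal usage sketch, not part of the patch itself, showing how a caller could read the new BatchQueueTime stat once this change is applied. The broker address localhost:9092 and topic "topic-A" are the placeholders already used in the doc comment above; apart from the BatchQueueTime field and the kafka.writer.batch.queue.seconds metric name taken from the diff, the rest of the program is assumed for illustration.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Writer configured as in the doc comment example; the address and topic
	// are placeholders for a local test setup.
	w := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"),
		Topic: "topic-A",
	}
	defer w.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Messages sit in a batch until BatchSize or BatchTimeout is reached;
	// that queueing delay is what the new metric measures.
	for i := 0; i < 10; i++ {
		if err := w.WriteMessages(ctx, kafka.Message{Value: []byte(fmt.Sprintf("msg-%d", i))}); err != nil {
			fmt.Println("write error:", err)
		}
	}

	// Stats() returns a snapshot since the previous call, so a metrics
	// exporter would typically call it on a fixed interval.
	stats := w.Stats()
	fmt.Printf("batch queue time: avg=%s min=%s max=%s\n",
		stats.BatchQueueTime.Avg, stats.BatchQueueTime.Min, stats.BatchQueueTime.Max)
}

Like the other duration stats, the new field is exported under a metric name (kafka.writer.batch.queue.seconds) that follows the existing kafka.writer.*.seconds naming convention used by BatchTime, WriteTime, and WaitTime.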