From c723d778d0142691809fef83d2eef853a0e96891 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A8le=20Oul=C3=A8s?= Date: Tue, 29 Oct 2024 17:33:32 +0100 Subject: [PATCH] Expose BulkIndexer total flushed bytes metric --- esutil/bulk_indexer.go | 57 ++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index b13077871f..64812726fa 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -86,14 +86,15 @@ type BulkIndexerConfig struct { // BulkIndexerStats represents the indexer statistics. type BulkIndexerStats struct { - NumAdded uint64 - NumFlushed uint64 - NumFailed uint64 - NumIndexed uint64 - NumCreated uint64 - NumUpdated uint64 - NumDeleted uint64 - NumRequests uint64 + NumAdded uint64 + NumFlushed uint64 + NumFailed uint64 + NumIndexed uint64 + NumCreated uint64 + NumUpdated uint64 + NumDeleted uint64 + NumRequests uint64 + FlushedBytes uint64 } // BulkIndexerItem represents an indexer item. @@ -266,14 +267,15 @@ type bulkIndexer struct { } type bulkIndexerStats struct { - numAdded uint64 - numFlushed uint64 - numFailed uint64 - numIndexed uint64 - numCreated uint64 - numUpdated uint64 - numDeleted uint64 - numRequests uint64 + numAdded uint64 + numFlushed uint64 + numFailed uint64 + numIndexed uint64 + numCreated uint64 + numUpdated uint64 + numDeleted uint64 + numRequests uint64 + flushedBytes uint64 } // NewBulkIndexer creates a new bulk indexer. @@ -354,14 +356,15 @@ func (bi *bulkIndexer) Close(ctx context.Context) error { // Stats returns indexer statistics. func (bi *bulkIndexer) Stats() BulkIndexerStats { return BulkIndexerStats{ - NumAdded: atomic.LoadUint64(&bi.stats.numAdded), - NumFlushed: atomic.LoadUint64(&bi.stats.numFlushed), - NumFailed: atomic.LoadUint64(&bi.stats.numFailed), - NumIndexed: atomic.LoadUint64(&bi.stats.numIndexed), - NumCreated: atomic.LoadUint64(&bi.stats.numCreated), - NumUpdated: atomic.LoadUint64(&bi.stats.numUpdated), - NumDeleted: atomic.LoadUint64(&bi.stats.numDeleted), - NumRequests: atomic.LoadUint64(&bi.stats.numRequests), + NumAdded: atomic.LoadUint64(&bi.stats.numAdded), + NumFlushed: atomic.LoadUint64(&bi.stats.numFlushed), + NumFailed: atomic.LoadUint64(&bi.stats.numFailed), + NumIndexed: atomic.LoadUint64(&bi.stats.numIndexed), + NumCreated: atomic.LoadUint64(&bi.stats.numCreated), + NumUpdated: atomic.LoadUint64(&bi.stats.numUpdated), + NumDeleted: atomic.LoadUint64(&bi.stats.numDeleted), + NumRequests: atomic.LoadUint64(&bi.stats.numRequests), + FlushedBytes: atomic.LoadUint64(&bi.stats.flushedBytes), } } @@ -508,7 +511,9 @@ func (w *worker) flushBuffer(ctx context.Context) error { defer func() { w.bi.config.OnFlushEnd(ctx) }() } - if w.buf.Len() < 1 { + bufLen := w.buf.Len() + + if bufLen < 1 { if w.bi.config.DebugLogger != nil { w.bi.config.DebugLogger.Printf("[worker-%03d] Flush: Buffer empty\n", w.id) } @@ -631,6 +636,8 @@ func (w *worker) flushBuffer(ctx context.Context) error { } } + atomic.AddUint64(&w.bi.stats.flushedBytes, uint64(bufLen)) + return err }