From 3b07e2d759304fb45fd2b40f3676a0b223d34733 Mon Sep 17 00:00:00 2001
From: dkuldeep22
Date: Mon, 2 May 2022 19:15:05 +0530
Subject: [PATCH] feat: allow other fluentd metrics apart from retry_count,
 buffer_queue_length, and buffer_total_queued_size

Currently, telegraf creates only three metrics in the Gather function and
ignores the rest of the metrics exposed by fluentd.

Fluentd as td-agent exposes the following metrics:
buffer_queue_length, buffer_total_queued_size, retry_count, emit_records,
emit_size, emit_count, write_count, rollback_count, slow_flush_count,
flush_time_count, buffer_stage_length, buffer_stage_byte_size,
buffer_queue_byte_size, buffer_available_buffer_space_ratios

Apart from the first three, these metrics are currently ignored by telegraf.
---
 plugins/inputs/fluentd/README.md       | 17 +++++-
 plugins/inputs/fluentd/fluentd.go      | 83 +++++++++++++++++++++++---
 plugins/inputs/fluentd/fluentd_test.go | 83 +++++++++++++++++++++++++-
 3 files changed, 170 insertions(+), 13 deletions(-)

diff --git a/plugins/inputs/fluentd/README.md b/plugins/inputs/fluentd/README.md
index a7947ab2a7397..0686af82a8afb 100644
--- a/plugins/inputs/fluentd/README.md
+++ b/plugins/inputs/fluentd/README.md
@@ -40,9 +40,20 @@ example configuration with `@id` parameter for http plugin:
 Fields may vary depending on the plugin type
 
 - fluentd
-  - retry_count (float, unit)
-  - buffer_queue_length (float, unit)
+  - retry_count (float, unit)
+  - buffer_queue_length (float, unit)
   - buffer_total_queued_size (float, unit)
+  - rollback_count (float, unit)
+  - flush_time_count (float, unit)
+  - slow_flush_count (float, unit)
+  - emit_count (float, unit)
+  - emit_records (float, unit)
+  - emit_size (float, unit)
+  - write_count (float, unit)
+  - buffer_stage_length (float, unit)
+  - buffer_queue_byte_size (float, unit)
+  - buffer_stage_byte_size (float, unit)
+  - buffer_available_buffer_space_ratios (float, unit)
 
 ## Tags
 
@@ -61,5 +72,5 @@ $ telegraf --config fluentd.conf --input-filter fluentd --test
 > fluentd,plugin_id=object:820190,plugin_category=input,plugin_type=monitor_agent,host=T440s retry_count=0,buffer_total_queued_size=0,buffer_queue_length=0 1492006105000000000
 > fluentd,plugin_id=object:c5e054,plugin_category=output,plugin_type=stdout,host=T440s buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000
 > fluentd,plugin_type=s3,host=T440s,plugin_id=object:bd7a90,plugin_category=output buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000
-
+> fluentd,plugin_id=output_td,plugin_category=output,plugin_type=tdlog,host=T440s buffer_available_buffer_space_ratios=100,buffer_queue_byte_size=0,buffer_queue_length=0,buffer_stage_byte_size=0,buffer_stage_length=0,buffer_total_queued_size=0,emit_count=0,emit_records=0,flush_time_count=0,retry_count=0,rollback_count=0,slow_flush_count=0,write_count=0 1651474085000000000
 ```
diff --git a/plugins/inputs/fluentd/fluentd.go b/plugins/inputs/fluentd/fluentd.go
index af1c912d1061c..b90a177593bd5 100644
--- a/plugins/inputs/fluentd/fluentd.go
+++ b/plugins/inputs/fluentd/fluentd.go
@@ -26,12 +26,23 @@ type endpointInfo struct {
 }
 
 type pluginData struct {
-	PluginID              string   `json:"plugin_id"`
-	PluginType            string   `json:"type"`
-	PluginCategory        string   `json:"plugin_category"`
-	RetryCount            *float64 `json:"retry_count"`
-	BufferQueueLength     *float64 `json:"buffer_queue_length"`
-	BufferTotalQueuedSize *float64 `json:"buffer_total_queued_size"`
+	PluginID               string   `json:"plugin_id"`
+	PluginType             string   `json:"type"`
+	PluginCategory         string   `json:"plugin_category"`
+	RetryCount             *float64 `json:"retry_count"`
+	BufferQueueLength      *float64 `json:"buffer_queue_length"`
+	BufferTotalQueuedSize  *float64 `json:"buffer_total_queued_size"`
+	RollbackCount          *float64 `json:"rollback_count"`
+	EmitRecords            *float64 `json:"emit_records"`
+	EmitSize               *float64 `json:"emit_size"`
+	EmitCount              *float64 `json:"emit_count"`
+	WriteCount             *float64 `json:"write_count"`
+	SlowFlushCount         *float64 `json:"slow_flush_count"`
+	FlushTimeCount         *float64 `json:"flush_time_count"`
+	BufferStageLength      *float64 `json:"buffer_stage_length"`
+	BufferStageByteSize    *float64 `json:"buffer_stage_byte_size"`
+	BufferQueueByteSize    *float64 `json:"buffer_queue_byte_size"`
+	AvailBufferSpaceRatios *float64 `json:"buffer_available_buffer_space_ratios"`
 }
 
 // parse JSON from fluentd Endpoint
@@ -121,6 +132,7 @@ func (h *Fluentd) Gather(acc telegraf.Accumulator) error {
 		if p.BufferQueueLength != nil {
 			tmpFields["buffer_queue_length"] = *p.BufferQueueLength
 		}
+
 		if p.RetryCount != nil {
 			tmpFields["retry_count"] = *p.RetryCount
 		}
@@ -129,7 +141,64 @@ func (h *Fluentd) Gather(acc telegraf.Accumulator) error {
 			tmpFields["buffer_total_queued_size"] = *p.BufferTotalQueuedSize
 		}
 
-		if !((p.BufferQueueLength == nil) && (p.RetryCount == nil) && (p.BufferTotalQueuedSize == nil)) {
+		if p.RollbackCount != nil {
+			tmpFields["rollback_count"] = *p.RollbackCount
+		}
+
+		if p.EmitRecords != nil {
+			tmpFields["emit_records"] = *p.EmitRecords
+		}
+
+		if p.EmitCount != nil {
+			tmpFields["emit_count"] = *p.EmitCount
+		}
+
+		if p.EmitSize != nil {
+			tmpFields["emit_size"] = *p.EmitSize
+		}
+
+		if p.WriteCount != nil {
+			tmpFields["write_count"] = *p.WriteCount
+		}
+
+		if p.SlowFlushCount != nil {
+			tmpFields["slow_flush_count"] = *p.SlowFlushCount
+		}
+
+		if p.FlushTimeCount != nil {
+			tmpFields["flush_time_count"] = *p.FlushTimeCount
+		}
+
+		if p.BufferStageLength != nil {
+			tmpFields["buffer_stage_length"] = *p.BufferStageLength
+		}
+
+		if p.BufferStageByteSize != nil {
+			tmpFields["buffer_stage_byte_size"] = *p.BufferStageByteSize
+		}
+
+		if p.BufferQueueByteSize != nil {
+			tmpFields["buffer_queue_byte_size"] = *p.BufferQueueByteSize
+		}
+
+		if p.AvailBufferSpaceRatios != nil {
+			tmpFields["buffer_available_buffer_space_ratios"] = *p.AvailBufferSpaceRatios
+		}
+
+		if !((p.BufferQueueLength == nil) &&
+			(p.RetryCount == nil) &&
+			(p.BufferTotalQueuedSize == nil) &&
+			(p.EmitCount == nil) &&
+			(p.EmitRecords == nil) &&
+			(p.EmitSize == nil) &&
+			(p.WriteCount == nil) &&
+			(p.FlushTimeCount == nil) &&
+			(p.SlowFlushCount == nil) &&
+			(p.RollbackCount == nil) &&
+			(p.BufferStageLength == nil) &&
+			(p.BufferStageByteSize == nil) &&
+			(p.BufferQueueByteSize == nil) &&
+			(p.AvailBufferSpaceRatios == nil)) {
 			acc.AddFields(measurement, tmpFields, tmpTags)
 		}
 	}
diff --git a/plugins/inputs/fluentd/fluentd_test.go b/plugins/inputs/fluentd/fluentd_test.go
index a822c763f1402..444c1869f0c8a 100644
--- a/plugins/inputs/fluentd/fluentd_test.go
+++ b/plugins/inputs/fluentd/fluentd_test.go
@@ -88,8 +88,53 @@ const sampleJSON = `
 			},
 			"output_plugin": true,
 			"buffer_queue_length": 0,
+			"retry_count": 0,
+			"buffer_total_queued_size": 0
+		},
+		{
+			"plugin_id": "object:output_td_1",
+			"plugin_category": "output",
+			"type": "tdlog",
+			"config": {
+				"@type": "tdlog",
+				"@id": "output_td",
+				"apikey": "xxxxxx",
+				"auto_create_table": ""
+			},
+			"output_plugin": true,
+			"buffer_queue_length": 0,
 			"buffer_total_queued_size": 0,
-			"retry_count": 0
+			"retry_count": 0,
+			"emit_records": 0,
+			"emit_size": 0,
+			"emit_count": 0,
+			"write_count": 0,
"rollback_count": 0, + "slow_flush_count": 0, + "flush_time_count": 0, + "buffer_stage_length": 0, + "buffer_stage_byte_size": 0, + "buffer_queue_byte_size": 0, + "buffer_available_buffer_space_ratios": 0 + }, + { + "plugin_id": "object:output_td_2", + "plugin_category": "output", + "type": "tdlog", + "config": { + "@type": "tdlog", + "@id": "output_td", + "apikey": "xxxxxx", + "auto_create_table": "" + }, + "output_plugin": true, + "buffer_queue_length": 0, + "buffer_total_queued_size": 0, + "retry_count": 0, + "rollback_count": 0, + "emit_records": 0, + "slow_flush_count": 0, + "buffer_available_buffer_space_ratios": 0 } ] } @@ -101,8 +146,10 @@ var ( // {"object:f48698", "dummy", "input", nil, nil, nil}, // {"object:e27138", "dummy", "input", nil, nil, nil}, // {"object:d74060", "monitor_agent", "input", nil, nil, nil}, - {"object:11a5e2c", "stdout", "output", &zero, nil, nil}, - {"object:11237ec", "s3", "output", &zero, &zero, &zero}, + {"object:11a5e2c", "stdout", "output", &zero, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil}, + {"object:11237ec", "s3", "output", &zero, &zero, &zero, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil}, + {"object:output_td_1", "tdlog", "output", &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero}, + {"object:output_td_2", "tdlog", "output", &zero, &zero, &zero, &zero, &zero, nil, nil, nil, &zero, nil, nil, nil, nil, &zero}, } fluentdTest = &Fluentd{ Endpoint: "http://localhost:8081", @@ -111,6 +158,7 @@ var ( func Test_parse(t *testing.T) { t.Log("Testing parser function") + t.Logf("JSON (%s) ", sampleJSON) _, err := parse([]byte(sampleJSON)) if err != nil { @@ -159,4 +207,33 @@ func Test_Gather(t *testing.T) { require.Equal(t, *expectedOutput[1].RetryCount, acc.Metrics[1].Fields["retry_count"]) require.Equal(t, *expectedOutput[1].BufferQueueLength, acc.Metrics[1].Fields["buffer_queue_length"]) require.Equal(t, *expectedOutput[1].BufferTotalQueuedSize, acc.Metrics[1].Fields["buffer_total_queued_size"]) + + require.Equal(t, expectedOutput[2].PluginID, acc.Metrics[2].Tags["plugin_id"]) + require.Equal(t, expectedOutput[2].PluginType, acc.Metrics[2].Tags["plugin_type"]) + require.Equal(t, expectedOutput[2].PluginCategory, acc.Metrics[2].Tags["plugin_category"]) + require.Equal(t, *expectedOutput[2].RetryCount, acc.Metrics[2].Fields["retry_count"]) + require.Equal(t, *expectedOutput[2].BufferQueueLength, acc.Metrics[2].Fields["buffer_queue_length"]) + require.Equal(t, *expectedOutput[2].BufferTotalQueuedSize, acc.Metrics[2].Fields["buffer_total_queued_size"]) + require.Equal(t, *expectedOutput[2].EmitRecords, acc.Metrics[2].Fields["emit_records"]) + require.Equal(t, *expectedOutput[2].EmitSize, acc.Metrics[2].Fields["emit_size"]) + require.Equal(t, *expectedOutput[2].EmitCount, acc.Metrics[2].Fields["emit_count"]) + require.Equal(t, *expectedOutput[2].RollbackCount, acc.Metrics[2].Fields["rollback_count"]) + require.Equal(t, *expectedOutput[2].SlowFlushCount, acc.Metrics[2].Fields["slow_flush_count"]) + require.Equal(t, *expectedOutput[2].WriteCount, acc.Metrics[2].Fields["write_count"]) + require.Equal(t, *expectedOutput[2].FlushTimeCount, acc.Metrics[2].Fields["flush_time_count"]) + require.Equal(t, *expectedOutput[2].BufferStageLength, acc.Metrics[2].Fields["buffer_stage_length"]) + require.Equal(t, *expectedOutput[2].BufferStageByteSize, acc.Metrics[2].Fields["buffer_stage_byte_size"]) + require.Equal(t, *expectedOutput[2].BufferQueueByteSize, 
+	require.Equal(t, *expectedOutput[2].AvailBufferSpaceRatios, acc.Metrics[2].Fields["buffer_available_buffer_space_ratios"])
+
+	require.Equal(t, expectedOutput[3].PluginID, acc.Metrics[3].Tags["plugin_id"])
+	require.Equal(t, expectedOutput[3].PluginType, acc.Metrics[3].Tags["plugin_type"])
+	require.Equal(t, expectedOutput[3].PluginCategory, acc.Metrics[3].Tags["plugin_category"])
+	require.Equal(t, *expectedOutput[3].RetryCount, acc.Metrics[3].Fields["retry_count"])
+	require.Equal(t, *expectedOutput[3].BufferQueueLength, acc.Metrics[3].Fields["buffer_queue_length"])
+	require.Equal(t, *expectedOutput[3].BufferTotalQueuedSize, acc.Metrics[3].Fields["buffer_total_queued_size"])
+	require.Equal(t, *expectedOutput[3].EmitRecords, acc.Metrics[3].Fields["emit_records"])
+	require.Equal(t, *expectedOutput[3].RollbackCount, acc.Metrics[3].Fields["rollback_count"])
+	require.Equal(t, *expectedOutput[3].SlowFlushCount, acc.Metrics[3].Fields["slow_flush_count"])
+	require.Equal(t, *expectedOutput[3].AvailBufferSpaceRatios, acc.Metrics[3].Fields["buffer_available_buffer_space_ratios"])
 }
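A note on the guard added at the end of Gather: tmpFields only receives a key when the corresponding pointer in pluginData is non-nil, so the 14-clause condition reduces to "emit the metric only when at least one field was collected". Below is a minimal, self-contained Go sketch of that equivalent check; the shouldEmit helper is hypothetical and not part of this patch.

```go
package main

import "fmt"

// shouldEmit mirrors the intent of the 14-clause nil check in Gather:
// tmpFields only gains entries when the matching pointer field is non-nil,
// so a plugin is emitted only when at least one metric was collected.
func shouldEmit(tmpFields map[string]interface{}) bool {
	return len(tmpFields) > 0
}

func main() {
	fmt.Println(shouldEmit(map[string]interface{}{}))                   // false: every pointer was nil
	fmt.Println(shouldEmit(map[string]interface{}{"retry_count": 0.0})) // true: at least one field was set
}
```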