From 604ba12d1ca6c60d8d5ec191ab9e466fb9590689 Mon Sep 17 00:00:00 2001 From: ruflin Date: Thu, 1 Dec 2016 08:37:42 +0100 Subject: [PATCH] Make error fields optional in partition event * Update data.json --- .../module/kafka/partition/_meta/data.json | 3 +- .../module/kafka/partition/partition.go | 34 ++++++++++++------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/metricbeat/module/kafka/partition/_meta/data.json b/metricbeat/module/kafka/partition/_meta/data.json index 44d98e32ccf..a41e118b9a5 100644 --- a/metricbeat/module/kafka/partition/_meta/data.json +++ b/metricbeat/module/kafka/partition/_meta/data.json @@ -15,14 +15,13 @@ "oldest": 0 }, "partition": { - "error": 0, "id": 0, "insync_replica": true, "leader": 0, "replica": 0 }, "topic": { - "name": "test-metricbeat-8760238589576171408" + "name": "test-metricbeat-70692474374989458" } } }, diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index 6cb29d6987a..c3b6bd3ba2c 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -130,9 +130,12 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { for _, topic := range response.Topics { evtTopic := common.MapStr{ "name": topic.Name, - "error": common.MapStr{ + } + + if topic.Err != 0 { + evtTopic["error"] = common.MapStr{ "code": topic.Err, - }, + } } for _, partition := range topic.Partitions { @@ -157,19 +160,24 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { continue } + partitionEvent := common.MapStr{ + "id": partition.ID, + "leader": partition.Leader, + "replica": id, + "insync_replica": hasID(id, partition.Isr), + } + + if partition.Err != 0 { + partitionEvent["error"] = common.MapStr{ + "code": partition.Err, + } + } + // create event event := common.MapStr{ - "topic": evtTopic, - "broker": evtBroker, - "partition": common.MapStr{ - "id": partition.ID, - "error": common.MapStr{ - "code": partition.Err, - }, - "leader": partition.Leader, - "replica": id, - "insync_replica": hasID(id, partition.Isr), - }, + "topic": evtTopic, + "broker": evtBroker, + "partition": partitionEvent, "offset": common.MapStr{ "newest": offNewest, "oldest": offOldest,