diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 04c458dd35bf..adcc1a901ef9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -176,6 +176,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Print errors that were being omitted in vSphere metricsets. {pull}12816[12816] - Fix redis key metricset dashboard references to index pattern. {pull}13303[13303] - Check if fields in DBInstance is nil in rds metricset. {pull}13294[13294] {issue}13037[13037] +- Fix silent failures in kafka and prometheus module. {pull}13353[13353] {issue}13252[13252] *Packetbeat* diff --git a/metricbeat/helper/prometheus/prometheus.go b/metricbeat/helper/prometheus/prometheus.go index 7638f58c3658..d7e3676d5e5e 100644 --- a/metricbeat/helper/prometheus/prometheus.go +++ b/metricbeat/helper/prometheus/prometheus.go @@ -22,6 +22,7 @@ import ( "io" "net/http" + "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" @@ -83,6 +84,7 @@ func (p *prometheus) GetFamilies() ([]*dto.MetricFamily, error) { if err == io.EOF { break } + return nil, errors.Wrap(err, "decoding of metric family failed") } else { families = append(families, mf) } diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index da3dfacb3c83..ccb5dd72d54b 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -26,6 +26,8 @@ import ( "strings" "time" + "github.com/pkg/errors" + "github.com/Shopify/sarama" "github.com/elastic/beats/libbeat/common" @@ -174,12 +176,12 @@ func (b *Broker) PartitionOffset( req.AddBlock(topic, partition, time, 1) resp, err := b.broker.GetAvailableOffsets(req) if err != nil { - return -1, err + return -1, errors.Wrap(err, "get available offsets failed") } block := resp.GetBlock(topic, partition) if len(block.Offsets) == 0 { - return -1, nil + return -1, errors.Wrap(block.Err, "block offsets is empty") } return block.Offsets[0], nil diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index 507df25fb875..4aa5c3a5721a 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -128,7 +128,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { msg := fmt.Errorf("Failed to query kafka partition (%v:%v) offsets: %v", topic.Name, partition.ID, err) - m.Logger().Error(msg) + m.Logger().Warn(msg) r.Error(msg) continue } @@ -194,12 +194,12 @@ func queryOffsetRange( ) (int64, int64, bool, error) { oldest, err := b.PartitionOffset(replicaID, topic, partition, sarama.OffsetOldest) if err != nil { - return -1, -1, false, err + return -1, -1, false, errors.Wrap(err, "failed to get oldest offset") } newest, err := b.PartitionOffset(replicaID, topic, partition, sarama.OffsetNewest) if err != nil { - return -1, -1, false, err + return -1, -1, false, errors.Wrap(err, "failed to get newest offset") } okOld := oldest != -1