Skip to content

Commit

Permalink
Merge pull request #169 from stefanotorresi/feature/instrumented-coll…
Browse files Browse the repository at this point in the history
…ectors

Instrumented collectors
  • Loading branch information
stefanotorresi authored Jul 13, 2020
2 parents bcaaf82 + d5af7f4 commit 0af48bf
Show file tree
Hide file tree
Showing 18 changed files with 518 additions and 57 deletions.
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ REPOSITORY ?= clusterlabs/ha_cluster_exporter
# the Go archs we crosscompile to
ARCHS ?= amd64 arm64 ppc64le s390x

default: clean download mod-tidy fmt vet-check test build
default: clean download mod-tidy generate fmt vet-check test build

download:
go mod download
Expand Down Expand Up @@ -48,6 +48,9 @@ mod-tidy:
fmt-check:
.ci/go_lint.sh

generate:
go generate ./...

test: download
go test -v ./...

Expand Down Expand Up @@ -97,5 +100,5 @@ dashboards-obs-commit: dashboards-obs-workdir
cd build/obs/grafana-ha-cluster-dashboards; osc commit -m "Update from git rev $(REVISION)"

.PHONY: $(ARCHS) build build-all checks clean coverage dashboards-obs-commit dashboards-obs-workdir default download \
exporter-obs-changelog exporter-obs-commit exporter-obs-workdir fmt fmt-check install mod-tidy static-checks \
test vet-check
exporter-obs-changelog exporter-obs-commit exporter-obs-workdir fmt fmt-check generate install mod-tidy \
static-checks test vet-check
24 changes: 17 additions & 7 deletions collector/corosync/corosync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ import (
"github.com/ClusterLabs/ha_cluster_exporter/collector"
)

const subsystem = "corosync"

func NewCollector(cfgToolPath string, quorumToolPath string) (*corosyncCollector, error) {
err := collector.CheckExecutables(cfgToolPath, quorumToolPath)
if err != nil {
return nil, errors.Wrap(err, "could not initialize Corosync collector")
return nil, errors.Wrapf(err, "could not initialize '%s' collector", subsystem)
}

c := &corosyncCollector{
collector.NewDefaultCollector("corosync"),
collector.NewDefaultCollector(subsystem),
cfgToolPath,
quorumToolPath,
NewParser(),
Expand All @@ -35,27 +37,35 @@ type corosyncCollector struct {
collector.DefaultCollector
cfgToolPath string
quorumToolPath string
cfgToolParser Parser
parser Parser
}

func (c *corosyncCollector) Collect(ch chan<- prometheus.Metric) {
func (c *corosyncCollector) CollectWithError(ch chan<- prometheus.Metric) error {
log.Debugln("Collecting corosync metrics...")

// We suppress the exec errors because if any interface is faulty the tools will exit with code 1, but we still want to parse the output.
cfgToolOutput, _ := exec.Command(c.cfgToolPath, "-s").Output()
quorumToolOutput, _ := exec.Command(c.quorumToolPath).Output()

status, err := c.cfgToolParser.Parse(cfgToolOutput, quorumToolOutput)
status, err := c.parser.Parse(cfgToolOutput, quorumToolOutput)
if err != nil {
log.Warnf("Corosync Collector scrape failed: %s", err)
return
return errors.Wrap(err, "corosync parser error")
}

c.collectRings(status, ch)
c.collectRingErrors(status, ch)
c.collectQuorate(status, ch)
c.collectQuorumVotes(status, ch)
c.collectMemberVotes(status, ch)

return nil
}

// Collect runs CollectWithError on ch and, because this method has no
// return value, reports any resulting error as a warning in the log.
func (c *corosyncCollector) Collect(ch chan<- prometheus.Metric) {
	if err := c.CollectWithError(ch); err != nil {
		log.Warnf("'%s' collector scrape failed: %s", c.GetSubsystem(), err)
	}
}

func (c *corosyncCollector) collectQuorumVotes(status *Status, ch chan<- prometheus.Metric) {
Expand Down
8 changes: 8 additions & 0 deletions collector/default_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (

const NAMESPACE = "ha_cluster"

// SubsystemCollector is implemented by collectors that can report the name
// of the subsystem they belong to.
type SubsystemCollector interface {
// GetSubsystem returns the collector's subsystem name.
GetSubsystem() string
}

type DefaultCollector struct {
subsystem string
descriptors map[string]*prometheus.Desc
Expand Down Expand Up @@ -48,6 +52,10 @@ func (c *DefaultCollector) Describe(ch chan<- *prometheus.Desc) {
}
}

// GetSubsystem returns the subsystem name stored in the collector.
func (c *DefaultCollector) GetSubsystem() string {
	return c.subsystem
}

func (c *DefaultCollector) MakeGaugeMetric(name string, value float64, labelValues ...string) prometheus.Metric {
return c.makeMetric(name, value, prometheus.GaugeValue, labelValues...)
}
Expand Down
22 changes: 15 additions & 7 deletions collector/drbd/drbd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/ClusterLabs/ha_cluster_exporter/collector"
)

const subsystem = "drbd"

// drbdStatus is for parsing relevant data we want to convert to metrics
type drbdStatus struct {
Name string `json:"name"`
Expand Down Expand Up @@ -48,11 +50,11 @@ type drbdStatus struct {
func NewCollector(drbdSetupPath string, drbdSplitBrainPath string) (*drbdCollector, error) {
err := collector.CheckExecutables(drbdSetupPath)
if err != nil {
return nil, errors.Wrap(err, "could not initialize DRBD collector")
return nil, errors.Wrapf(err, "could not initialize '%s' collector", subsystem)
}

c := &drbdCollector{
collector.NewDefaultCollector("drbd"),
collector.NewDefaultCollector(subsystem),
drbdSetupPath,
drbdSplitBrainPath,
}
Expand Down Expand Up @@ -82,21 +84,19 @@ type drbdCollector struct {
drbdSplitBrainPath string
}

func (c *drbdCollector) Collect(ch chan<- prometheus.Metric) {
func (c *drbdCollector) CollectWithError(ch chan<- prometheus.Metric) error {
log.Debugln("Collecting DRBD metrics...")

c.recordDrbdSplitBrainMetric(ch)

drbdStatusRaw, err := exec.Command(c.drbdsetupPath, "status", "--json").Output()
if err != nil {
log.Warnf("DRBD Collector scrape failed: %s", err)
return
return errors.Wrap(err, "drbdsetup command failed")
}
// populate structs and parse relevant info we will expose via metrics
drbdDev, err := parseDrbdStatus(drbdStatusRaw)
if err != nil {
log.Warnf("DRBD Collector scrape failed: %s", err)
return
return errors.Wrap(err, "could not parse drbdsetup status output")
}

for _, resource := range drbdDev {
Expand Down Expand Up @@ -137,6 +137,14 @@ func (c *drbdCollector) Collect(ch chan<- prometheus.Metric) {
}
}

return nil
}

// Collect runs CollectWithError on ch and, because this method has no
// return value, reports any resulting error as a warning in the log.
func (c *drbdCollector) Collect(ch chan<- prometheus.Metric) {
	if err := c.CollectWithError(ch); err != nil {
		log.Warnf("'%s' collector scrape failed: %s", c.GetSubsystem(), err)
	}
}

func parseDrbdStatus(statusRaw []byte) ([]drbdStatus, error) {
Expand Down
72 changes: 72 additions & 0 deletions collector/instrumented_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package collector

import (
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"

"github.com/ClusterLabs/ha_cluster_exporter/internal/clock"
)

//go:generate mockgen -destination ../test/mock_collector/instrumented_collector.go github.com/ClusterLabs/ha_cluster_exporter/collector InstrumentableCollector

// InstrumentableCollector describes a collector that can return errors from
// collection cycles, unlike the plain prometheus.Collector, whose Collect
// method returns nothing.
type InstrumentableCollector interface {
prometheus.Collector
SubsystemCollector
// CollectWithError performs a collection cycle on ch and returns an error
// if that cycle failed.
CollectWithError(ch chan<- prometheus.Metric) error
}

// InstrumentedCollector wraps an InstrumentableCollector and, on every
// collection, additionally emits a scrape duration metric and a scrape
// success metric for the wrapped collector.
type InstrumentedCollector struct {
// collector is the wrapped collector whose CollectWithError call is timed.
collector InstrumentableCollector
// Clock supplies Now and Since for measuring the scrape duration; it is
// exported so that it can be replaced after construction.
Clock clock.Clock
// scrapeDurationDesc describes the scrape duration gauge.
scrapeDurationDesc *prometheus.Desc
// scrapeSuccessDesc describes the scrape success gauge.
scrapeSuccessDesc *prometheus.Desc
}

// NewInstrumentedCollector wraps collector in an InstrumentedCollector that
// uses the system clock. The two scrape metric descriptors carry a constant
// "collector" label whose value is the wrapped collector's subsystem name.
func NewInstrumentedCollector(collector InstrumentableCollector) *InstrumentedCollector {
	// scrapeDesc builds a descriptor in the "scrape" subsystem; the label
	// value is looked up on each call, once per descriptor.
	scrapeDesc := func(name string, help string) *prometheus.Desc {
		fqName := prometheus.BuildFQName(NAMESPACE, "scrape", name)
		constLabels := prometheus.Labels{
			"collector": collector.GetSubsystem(),
		}
		return prometheus.NewDesc(fqName, help, nil, constLabels)
	}

	return &InstrumentedCollector{
		collector:          collector,
		Clock:              &clock.SystemClock{},
		scrapeDurationDesc: scrapeDesc("duration_seconds", "Duration of a collector scrape."),
		scrapeSuccessDesc:  scrapeDesc("success", "Whether a collector succeeded."),
	}
}

// Collect delegates to the wrapped collector's CollectWithError, measuring
// how long it takes, then sends two extra gauges on ch: the elapsed time in
// seconds and a success flag (1 when no error was returned, 0 otherwise).
// A returned error is logged as a warning.
func (ic *InstrumentedCollector) Collect(ch chan<- prometheus.Metric) {
	start := ic.Clock.Now()
	scrapeErr := ic.collector.CollectWithError(ch)
	elapsed := ic.Clock.Since(start)

	success := 1.0
	if scrapeErr != nil {
		success = 0
		log.Warnf("'%s' collector scrape failed: %s", ic.collector.GetSubsystem(), scrapeErr)
	}

	// The instrumentation metrics are emitted whether or not the scrape failed.
	ch <- prometheus.MustNewConstMetric(ic.scrapeDurationDesc, prometheus.GaugeValue, elapsed.Seconds())
	ch <- prometheus.MustNewConstMetric(ic.scrapeSuccessDesc, prometheus.GaugeValue, success)
}

// Describe forwards to the wrapped collector's Describe and then sends the
// scrape duration and scrape success descriptors on ch, in that order.
func (ic *InstrumentedCollector) Describe(ch chan<- *prometheus.Desc) {
	ic.collector.Describe(ch)

	ownDescs := []*prometheus.Desc{ic.scrapeDurationDesc, ic.scrapeSuccessDesc}
	for _, desc := range ownDescs {
		ch <- desc
	}
}

// GetSubsystem returns the subsystem name reported by the wrapped collector.
func (ic *InstrumentedCollector) GetSubsystem() string {
	return ic.collector.GetSubsystem()
}
71 changes: 71 additions & 0 deletions collector/instrumented_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package collector

import (
"errors"
"io/ioutil"
"strings"
"testing"

"github.com/golang/mock/gomock"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sirupsen/logrus"
testlog "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"

"github.com/ClusterLabs/ha_cluster_exporter/internal/clock"
"github.com/ClusterLabs/ha_cluster_exporter/test/mock_collector"
)

// init redirects logrus output to a discarding writer for this test binary.
func init() {
	logrus.SetOutput(ioutil.Discard)
}

// TestInstrumentedCollector checks that wrapping a collector whose
// CollectWithError succeeds yields the scrape duration and scrape success
// metrics, labelled with the wrapped collector's subsystem.
func TestInstrumentedCollector(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	wrapped := mock_collector.NewMockInstrumentableCollector(mockCtrl)
	wrapped.EXPECT().GetSubsystem().Return("mock_collector").AnyTimes()
	wrapped.EXPECT().Describe(gomock.Any())
	wrapped.EXPECT().CollectWithError(gomock.Any())

	instrumented := NewInstrumentedCollector(wrapped)
	// NOTE(review): StoppedClock presumably reports a fixed elapsed time,
	// which is what makes the 1.234 duration below deterministic - confirm.
	instrumented.Clock = &clock.StoppedClock{}

	const expected = `# HELP ha_cluster_scrape_duration_seconds Duration of a collector scrape.
# TYPE ha_cluster_scrape_duration_seconds gauge
ha_cluster_scrape_duration_seconds{collector="mock_collector"} 1.234
# HELP ha_cluster_scrape_success Whether a collector succeeded.
# TYPE ha_cluster_scrape_success gauge
ha_cluster_scrape_success{collector="mock_collector"} 1
`

	compareErr := testutil.CollectAndCompare(instrumented, strings.NewReader(expected))
	assert.NoError(t, compareErr)
}

// TestInstrumentedCollectorScrapeFailure checks that when the wrapped
// collector's CollectWithError returns an error, the scrape success metric
// is 0 and exactly one log entry containing the error text is recorded.
func TestInstrumentedCollectorScrapeFailure(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	// Capture entries written to the global logger so they can be inspected.
	hook := testlog.NewGlobal()
	defer hook.Reset()

	wrapped := mock_collector.NewMockInstrumentableCollector(mockCtrl)
	wrapped.EXPECT().GetSubsystem().Return("mock_collector").AnyTimes()
	wrapped.EXPECT().Describe(gomock.Any())
	wrapped.EXPECT().CollectWithError(gomock.Any()).Return(errors.New("test error"))

	instrumented := NewInstrumentedCollector(wrapped)

	const expected = `# HELP ha_cluster_scrape_success Whether a collector succeeded.
# TYPE ha_cluster_scrape_success gauge
ha_cluster_scrape_success{collector="mock_collector"} 0
`

	// Only the success metric is compared; the duration metric is filtered out.
	compareErr := testutil.CollectAndCompare(instrumented, strings.NewReader(expected), "ha_cluster_scrape_success")
	assert.NoError(t, compareErr)

	assert.Len(t, hook.Entries, 1)
	assert.Contains(t, hook.LastEntry().Message, "test error")
}
25 changes: 17 additions & 8 deletions collector/pacemaker/pacemaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ import (
log "github.com/sirupsen/logrus"
)

const subsystem = "pacemaker"

func NewCollector(crmMonPath string, cibAdminPath string) (*pacemakerCollector, error) {
err := collector.CheckExecutables(crmMonPath, cibAdminPath)
if err != nil {
return nil, errors.Wrap(err, "could not initialize Pacemaker collector")
return nil, errors.Wrapf(err, "could not initialize '%s' collector", subsystem)
}

c := &pacemakerCollector{
collector.NewDefaultCollector("pacemaker"),
collector.NewDefaultCollector(subsystem),
crmmon.NewCrmMonParser(crmMonPath),
cib.NewCibAdminParser(cibAdminPath),
}
Expand All @@ -44,19 +46,17 @@ type pacemakerCollector struct {
cibParser cib.Parser
}

func (c *pacemakerCollector) Collect(ch chan<- prometheus.Metric) {
func (c *pacemakerCollector) CollectWithError(ch chan<- prometheus.Metric) error {
log.Debugln("Collecting pacemaker metrics...")

crmMon, err := c.crmMonParser.Parse()
if err != nil {
log.Warnf("Pacemaker Collector scrape failed: %s", err)
return
return errors.Wrap(err, "crm_mon parser error")
}

CIB, err := c.cibParser.Parse()
if err != nil {
log.Warnf("Pacemaker Collector scrape failed: %s", err)
return
return errors.Wrap(err, "cibadmin parser error")
}

c.recordStonithStatus(crmMon, ch)
Expand All @@ -69,7 +69,16 @@ func (c *pacemakerCollector) Collect(ch chan<- prometheus.Metric) {

err = c.recordCibLastChange(crmMon, ch)
if err != nil {
log.Warnf("Pacemaker Collector scrape failed: %s", err)
return errors.Wrap(err, "could not record CIB last change")
}

return nil
}

// Collect runs CollectWithError on ch and, because this method has no
// return value, reports any resulting error as a warning in the log.
func (c *pacemakerCollector) Collect(ch chan<- prometheus.Metric) {
	if err := c.CollectWithError(ch); err != nil {
		log.Warnf("'%s' collector scrape failed: %s", c.GetSubsystem(), err)
	}
}

Expand Down
Loading

0 comments on commit 0af48bf

Please sign in to comment.