Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument obsreport.Scraper #19

Merged
merged 6 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions internal/obsreportconfig/obsreportconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,7 @@ func allViews() []*view.View {
views = append(views, receiverViews()...)

// Scraper views.
measures = []*stats.Int64Measure{
obsmetrics.ScraperScrapedMetricPoints,
obsmetrics.ScraperErroredMetricPoints,
}
tagKeys = []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
views = append(views, scraperViews()...)

// Exporter views.
measures = []*stats.Int64Measure{
Expand Down Expand Up @@ -136,6 +131,20 @@ func receiverViews() []*view.View {
return genViews(measures, tagKeys, view.Sum())
}

func scraperViews() []*view.View {
if featuregate.GetRegistry().IsEnabled(UseOtelForInternalMetricsfeatureGateID) {
return nil
}

measures := []*stats.Int64Measure{
obsmetrics.ScraperScrapedMetricPoints,
obsmetrics.ScraperErroredMetricPoints,
}
tagKeys := []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}

return genViews(measures, tagKeys, view.Sum())
}

func genViews(
measures []*stats.Int64Measure,
tagKeys []tag.Key,
Expand Down
76 changes: 71 additions & 5 deletions obsreport/obsreport_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,40 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/receiver/scrapererror"
)

var (
scraperName = "scraper"
scraperScope = scopeName + nameSep + scraperName
)

// Scraper is a helper to add observability to a component.Scraper.
type Scraper struct {
level configtelemetry.Level
receiverID config.ComponentID
scraper config.ComponentID
mutators []tag.Mutator
tracer trace.Tracer

logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue
scrapedMetricsPoints syncint64.Counter
erroredMetricsPoints syncint64.Counter
}

// ScraperSettings are settings for creating a Scraper.
Expand All @@ -48,15 +66,58 @@ type ScraperSettings struct {

// NewScraper creates a new Scraper.
func NewScraper(cfg ScraperSettings) *Scraper {
return &Scraper{
return newScraper(cfg, featuregate.GetRegistry())
}

func newScraper(cfg ScraperSettings, registry *featuregate.Registry) *Scraper {
scraper := &Scraper{
level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel,
receiverID: cfg.ReceiverID,
scraper: cfg.Scraper,
mutators: []tag.Mutator{
tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
tag.Upsert(obsmetrics.TagKeyScraper, cfg.Scraper.String(), tag.WithTTL(tag.TTLNoPropagation))},
tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.Scraper.String()),

logger: cfg.ReceiverCreateSettings.Logger,
useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()),
attribute.String(obsmetrics.ScraperKey, cfg.Scraper.String()),
},
}

scraper.createOtelMetrics(cfg)
return scraper
}

func (s *Scraper) createOtelMetrics(cfg ScraperSettings) {
meter := cfg.ReceiverCreateSettings.MeterProvider.Meter(scraperScope)
paivagustavo marked this conversation as resolved.
Show resolved Hide resolved
if !s.useOtelForMetrics {
return
}

var err error
handleError := func(metricName string, err error) {
if err != nil {
s.logger.Warn("failed to create otel instrument", zap.Error(err), zap.String("metric", metricName))
}
}

s.scrapedMetricsPoints, err = meter.SyncInt64().Counter(
obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey,
instrument.WithDescription("Number of metric points successfully scraped."),
instrument.WithUnit(unit.Dimensionless),
)
handleError(obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey, err)

s.erroredMetricsPoints, err = meter.SyncInt64().Counter(
obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey,
instrument.WithDescription("Number of metric points that were unable to be scraped."),
instrument.WithUnit(unit.Dimensionless),
)
handleError(obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey, err)

}

// StartMetricsOp is called when a scrape operation is started. The
Expand Down Expand Up @@ -91,10 +152,15 @@ func (s *Scraper) EndMetricsOp(
span := trace.SpanFromContext(scraperCtx)

if s.level != configtelemetry.LevelNone {
stats.Record(
scraperCtx,
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
if s.useOtelForMetrics {
s.scrapedMetricsPoints.Add(scraperCtx, int64(numScrapedMetrics), s.otelAttrs...)
s.erroredMetricsPoints.Add(scraperCtx, int64(numErroredMetrics), s.otelAttrs...)
} else { // OC for metrics
stats.Record(
scraperCtx,
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
}
paivagustavo marked this conversation as resolved.
Show resolved Hide resolved
}

// end span according to errors
Expand Down
96 changes: 47 additions & 49 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,61 +214,59 @@ func TestReceiveMetricsOp(t *testing.T) {
}

func TestScrapeMetricsDataOp(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

params := []testParams{
{items: 23, err: partialErrFake},
{items: 29, err: errFake},
{items: 15, err: nil},
}
for i := range params {
scrp := NewScraper(ScraperSettings{
ReceiverID: receiver,
Scraper: scraper,
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
})
ctx := scrp.StartMetricsOp(parentCtx)
assert.NotNil(t, ctx)
scrp.EndMetricsOp(ctx, params[i].items, params[i].err)
}
params := []testParams{
{items: 23, err: partialErrFake},
{items: 29, err: errFake},
{items: 15, err: nil},
}
for i := range params {
scrp := NewScraper(ScraperSettings{
paivagustavo marked this conversation as resolved.
Show resolved Hide resolved
ReceiverID: receiver,
Scraper: scraper,
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
})
ctx := scrp.StartMetricsOp(parentCtx)
assert.NotNil(t, ctx)
scrp.EndMetricsOp(ctx, params[i].items, params[i].err)
}

spans := tt.SpanRecorder.Ended()
require.Equal(t, len(params), len(spans))
spans := tt.SpanRecorder.Ended()
require.Equal(t, len(params), len(spans))

var scrapedMetricPoints, erroredMetricPoints int
for i, span := range spans {
assert.Equal(t, "scraper/"+receiver.String()+"/"+scraper.String()+"/MetricsScraped", span.Name())
switch {
case params[i].err == nil:
scrapedMetricPoints += params[i].items
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)})
assert.Equal(t, codes.Unset, span.Status().Code)
case errors.Is(params[i].err, errFake):
erroredMetricPoints += params[i].items
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
assert.Equal(t, codes.Error, span.Status().Code)
assert.Equal(t, params[i].err.Error(), span.Status().Description)
var scrapedMetricPoints, erroredMetricPoints int
for i, span := range spans {
assert.Equal(t, "scraper/"+receiver.String()+"/"+scraper.String()+"/MetricsScraped", span.Name())
switch {
case params[i].err == nil:
scrapedMetricPoints += params[i].items
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)})
assert.Equal(t, codes.Unset, span.Status().Code)
case errors.Is(params[i].err, errFake):
erroredMetricPoints += params[i].items
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
assert.Equal(t, codes.Error, span.Status().Code)
assert.Equal(t, params[i].err.Error(), span.Status().Description)

case errors.Is(params[i].err, partialErrFake):
scrapedMetricPoints += params[i].items
erroredMetricPoints++
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)})
assert.Equal(t, codes.Error, span.Status().Code)
assert.Equal(t, params[i].err.Error(), span.Status().Description)
default:
t.Fatalf("unexpected err param: %v", params[i].err)
case errors.Is(params[i].err, partialErrFake):
scrapedMetricPoints += params[i].items
erroredMetricPoints++
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)})
assert.Equal(t, codes.Error, span.Status().Code)
assert.Equal(t, params[i].err.Error(), span.Status().Description)
default:
t.Fatalf("unexpected err param: %v", params[i].err)
}
}
}

require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, int64(scrapedMetricPoints), int64(erroredMetricPoints)))
require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, int64(scrapedMetricPoints), int64(erroredMetricPoints)))
})
}

func TestExportTraceDataOp(t *testing.T) {
Expand Down
7 changes: 2 additions & 5 deletions obsreport/obsreporttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,8 @@ func CheckReceiverMetrics(tts TestTelemetry, receiver config.ComponentID, protoc

// CheckScraperMetrics checks that for the current exported values for metrics scraper metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func CheckScraperMetrics(_ TestTelemetry, receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error {
scraperTags := tagsForScraperView(receiver, scraper)
paivagustavo marked this conversation as resolved.
Show resolved Hide resolved
return multierr.Combine(
checkValueForView(scraperTags, scrapedMetricPoints, "scraper/scraped_metric_points"),
checkValueForView(scraperTags, erroredMetricPoints, "scraper/errored_metric_points"))
func CheckScraperMetrics(tts TestTelemetry, receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error {
return tts.otelPrometheusChecker.checkScraperMetrics(receiver, scraper, scrapedMetricPoints, erroredMetricPoints)
}

// checkValueForView checks that for the current exported value in the view with the given name
Expand Down
21 changes: 21 additions & 0 deletions obsreport/obsreporttest/obsreporttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,31 @@ const (
)

var (
scraper = config.NewComponentID("fakeScraper")
receiver = config.NewComponentID("fakeReicever")
exporter = config.NewComponentID("fakeExporter")
)

func TestCheckScraperMetricsViews(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

s := obsreport.NewScraper(obsreport.ScraperSettings{
ReceiverID: receiver,
Scraper: scraper,
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
})
ctx := s.StartMetricsOp(context.Background())
require.NotNil(t, ctx)
s.EndMetricsOp(ctx, 7, nil)

assert.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 0))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 7))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 0))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 7))
}

func TestCheckReceiverTracesViews(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
Expand Down
13 changes: 13 additions & 0 deletions obsreport/obsreporttest/otelprometheuschecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ type prometheusChecker struct {
promHandler http.Handler
}

func (pc *prometheusChecker) checkScraperMetrics(receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error{
scraperAttrs := attributesForScraperMetrics(receiver, scraper)
return multierr.Combine(
pc.checkCounter("scraper_scraped_metric_points", scrapedMetricPoints, scraperAttrs),
pc.checkCounter("scraper_errored_metric_points", erroredMetricPoints, scraperAttrs))
}

func (pc *prometheusChecker) checkReceiverTraces(receiver config.ComponentID, protocol string, acceptedSpans, droppedSpans int64) error {
receiverAttrs := attributesForReceiverMetrics(receiver, protocol)
return multierr.Combine(
Expand Down Expand Up @@ -124,6 +131,12 @@ func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_cli
return parser.TextToMetricFamilies(rr.Body)
}

func attributesForScraperMetrics(receiver config.ComponentID, scraper config.ComponentID) []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String(receiverTag.Name(), receiver.String()),
attribute.String(scraperTag.Name(), scraper.String()),
}
}
// attributesForReceiverMetrics returns the attributes that are needed for the receiver metrics.
func attributesForReceiverMetrics(receiver config.ComponentID, transport string) []attribute.KeyValue {
return []attribute.KeyValue{
Expand Down