Skip to content

Commit

Permalink
Use Proto bytes as attribute (#216)
Browse files Browse the repository at this point in the history
* Use json encoded data

* Send stats payload to channel

* Fix lint issues

* Add changelog

* Using bytes proto

* PR feedback use stats payload on NewTranslator

* Update .chloggen/dinesh.gurumurthy_OTEL-1305.yaml

Co-authored-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>

* update gotidy

* Update .chloggen/dinesh.gurumurthy_OTEL-1305.yaml

Co-authored-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>

* PR feedback - revert un-needed changes

* Add Old code back

* PR feedback

* Update change type

* Run gotidy

* Apply suggestions from code review

Co-authored-by: Yang Song <songy23@users.noreply.github.com>

* Address PR feedback

* Update pkg/otlp/metrics/statspayload_test.go

Co-authored-by: Yang Song <songy23@users.noreply.github.com>

---------

Co-authored-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>
Co-authored-by: Yang Song <songy23@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 15, 2023
1 parent 0928442 commit 01206f4
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 14 deletions.
12 changes: 12 additions & 0 deletions .chloggen/dinesh.gurumurthy_OTEL-1305.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement
# The name of the component (e.g. pkg/quantile)
component: pkg/otlp/metrics
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: StatsPayload can now be sent on out channel provided.
# The PR related to this change
issues: [216]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ linters-settings:
- github.com/lightstep/go-expohisto
- github.com/patrickmn/go-cache
- github.com/stretchr/testify
- google.golang.org/protobuf/proto
- golang.org/x/exp
- gopkg.in/yaml.v3
- go.uber.org
Expand Down
11 changes: 10 additions & 1 deletion pkg/otlp/metrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type translatorConfig struct {
deltaTTL int64

fallbackSourceProvider source.Provider
// statsOut is the channel where the translator will send its APM statsPayload bytes
statsOut chan<- []byte
}

// TranslatorOption is a translator creation option.
Expand Down Expand Up @@ -123,7 +125,6 @@ const (
// The default mode is HistogramModeOff.
func WithHistogramMode(mode HistogramMode) TranslatorOption {
return func(t *translatorConfig) error {

switch mode {
case HistogramModeNoBuckets, HistogramModeCounters, HistogramModeDistributions:
t.HistMode = mode
Expand Down Expand Up @@ -171,6 +172,14 @@ func WithNumberMode(mode NumberMode) TranslatorOption {
}
}

// WithStatsOut sets the channel where the translator will send its APM statsPayload bytes
func WithStatsOut(statsOut chan<- []byte) TranslatorOption {
return func(t *translatorConfig) error {
t.statsOut = statsOut
return nil
}
}

// InitialCumulMonoValueMode defines what the exporter should do with the initial value
// of a cumulative monotonic sum when under the 'cumulative_to_delta' mode.
// It is not used when the mode is 'raw_value'.
Expand Down
2 changes: 1 addition & 1 deletion pkg/otlp/metrics/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.21.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
google.golang.org/protobuf v1.31.0
)

require (
Expand Down Expand Up @@ -52,7 +53,6 @@ require (
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

Expand Down
16 changes: 13 additions & 3 deletions pkg/otlp/metrics/metrics_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ import (
"strings"
"time"

"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
otelmetric "go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"golang.org/x/exp/slices"

"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics/internal/instrumentationlibrary"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics/internal/instrumentationscope"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"
)

const (
Expand Down Expand Up @@ -716,6 +716,16 @@ func (t *Translator) MapMetrics(ctx context.Context, md pmetric.Metrics, consume
newMetrics := pmetric.NewMetricSlice()
for k := 0; k < metricsArray.Len(); k++ {
md := metricsArray.At(k)
if md.Name() == keyStatsPayload && md.Type() == pmetric.MetricTypeSum {

// these metrics are an APM Stats payload; consume it as such
for l := 0; l < md.Sum().DataPoints().Len(); l++ {
if payload, ok := md.Sum().DataPoints().At(l).Attributes().Get(keyStatsPayload); ok && t.cfg.statsOut != nil && payload.Type() == pcommon.ValueTypeBytes {
t.cfg.statsOut <- payload.Bytes().AsRaw()
}
}
continue
}
if v, ok := runtimeMetricsMappings[md.Name()]; ok {
metadata.Languages = extractLanguageTag(md.Name(), metadata.Languages)
for _, mp := range v {
Expand Down
39 changes: 33 additions & 6 deletions pkg/otlp/metrics/metrics_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ import (
"testing"
"time"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile/summary"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile/summary"
"google.golang.org/protobuf/proto"
)

func TestIsCumulativeMonotonic(t *testing.T) {
Expand Down Expand Up @@ -98,12 +98,13 @@ func (t testProvider) Source(context.Context) (source.Source, error) {
}, nil
}

func newTranslator(t *testing.T, logger *zap.Logger) *Translator {
func newTranslatorWithStatsChannel(t *testing.T, logger *zap.Logger, ch chan []byte) *Translator {
options := []TranslatorOption{
WithFallbackSourceProvider(testProvider(fallbackHostname)),
WithHistogramMode(HistogramModeDistributions),
WithNumberMode(NumberModeCumulativeToDelta),
WithHistogramAggregations(),
WithStatsOut(ch),
}

set := componenttest.NewNopTelemetrySettings()
Expand All @@ -117,6 +118,10 @@ func newTranslator(t *testing.T, logger *zap.Logger) *Translator {
return tr
}

func newTranslator(t *testing.T, logger *zap.Logger) *Translator {
return newTranslatorWithStatsChannel(t, logger, nil)
}

type metric struct {
name string
typ DataType
Expand Down Expand Up @@ -921,6 +926,28 @@ func TestMapAPMStats(t *testing.T) {
require.Equal(t, consumer.apmstats, statsPayloads)
}

func TestMapAPMStatsWithBytes(t *testing.T) {
consumer := &mockFullConsumer{}
logger, err := zap.NewDevelopment()
require.NoError(t, err)
ch := make(chan []byte, 10)
tr := newTranslatorWithStatsChannel(t, logger, ch)
want := &pb.StatsPayload{
Stats: []*pb.ClientStatsPayload{statsPayloads[0], statsPayloads[1]},
}
md, err := tr.StatsToMetrics(want)
assert.NoError(t, err)

ctx := context.Background()
tr.MapMetrics(ctx, md, consumer)
got := &pb.StatsPayload{}

payload := <-ch
err = proto.Unmarshal(payload, got)
assert.NoError(t, err)
assert.True(t, proto.Equal(want, got))
}

func TestMapDoubleMonotonicReportDiffForFirstValue(t *testing.T) {
ctx := context.Background()
tr := newTranslator(t, zap.NewNop())
Expand Down
30 changes: 27 additions & 3 deletions pkg/otlp/metrics/statspayload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"strings"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/sketches-go/ddsketch"
"github.com/DataDog/sketches-go/ddsketch/mapping"
"github.com/DataDog/sketches-go/ddsketch/pb/sketchpb"
Expand All @@ -27,16 +29,17 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
)

// keyAPMStats specifies the key name of the resource attribute which identifies resource metrics
// as being an APM Stats Payload. The presence of the key results in them being treated and consumed
// differently by the Translator.
const keyAPMStats = "_dd.apm_stats"

// keyStatsPayload is the key for the stats payload in the attributes map.
// This is used as Metric name and Attribute key.
const keyStatsPayload = "dd.internal.stats.payload"

// This group of constants specifies the metric attribute keys used for APM Stats aggregation keys.
const (
statsKeyHostname = "dd.hostname"
Expand Down Expand Up @@ -473,3 +476,24 @@ func getInt(m pcommon.Map, k string) uint64 {
}
return uint64(v.Int())
}

// StatsToMetrics converts a StatsPayload to a pdata.Metrics
func (t *Translator) StatsToMetrics(sp *pb.StatsPayload) (pmetric.Metrics, error) {
bytes, err := proto.Marshal(sp)
if err != nil {
t.logger.Error("Failed to marshal stats payload", zap.Error(err))
return pmetric.NewMetrics(), err
}
mmx := pmetric.NewMetrics()
rmx := mmx.ResourceMetrics().AppendEmpty()
smx := rmx.ScopeMetrics().AppendEmpty()
mslice := smx.Metrics()
mx := mslice.AppendEmpty()
mx.SetName(keyStatsPayload)
sum := mx.SetEmptySum()
sum.SetIsMonotonic(false)
dp := sum.DataPoints().AppendEmpty()
byteSlice := dp.Attributes().PutEmptyBytes(keyStatsPayload)
byteSlice.Append(bytes...)
return mmx, nil
}
26 changes: 26 additions & 0 deletions pkg/otlp/metrics/statspayload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,32 @@ func TestConversion(t *testing.T) {
if found != len(want.Stats) {
t.Fatalf("Found %d/%d", found, len(want.Stats))
}
mx, err := trans.StatsToMetrics(want)
assert.NoError(t, err)
var results []*pb.StatsPayload
for i := 0; i < mx.ResourceMetrics().Len(); i++ {
rm := mx.ResourceMetrics().At(i)
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
sm := rm.ScopeMetrics().At(j)
for k := 0; k < sm.Metrics().Len(); k++ {
md := sm.Metrics().At(k)
// these metrics are an APM Stats payload; consume it as such
for l := 0; l < md.Sum().DataPoints().Len(); l++ {
if payload, ok := md.Sum().DataPoints().At(l).Attributes().Get(keyStatsPayload); ok {

stats := &pb.StatsPayload{}
err = proto.Unmarshal(payload.Bytes().AsRaw(), stats)
assert.NoError(t, err)
results = append(results, stats)
}
}
assert.NoError(t, err)
}
}
}

assert.Len(t, results, 1)
assert.True(t, proto.Equal(want, results[0]))
})
}

Expand Down

0 comments on commit 01206f4

Please sign in to comment.