Skip to content

Commit

Permalink
Merge branch 'main' into fix/vcenter-tls
Browse files Browse the repository at this point in the history
  • Loading branch information
dehaansa authored Nov 21, 2024
2 parents 1bc7f34 + b6b8679 commit b7a14fd
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 45 deletions.
27 changes: 27 additions & 0 deletions .chloggen/dd-config-api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/datadog

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Refactor the API that provides metrics translator"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36474]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: "This is API change only and does not affect end users"

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
25 changes: 23 additions & 2 deletions exporter/datadogexporter/metrics_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
otlpmetrics "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
Expand All @@ -29,9 +30,20 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics/sketches"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub"
datadogconfig "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/datadog/config"
)

var metricRemappingDisableddFeatureGate = featuregate.GlobalRegistry().MustRegister(
"exporter.datadogexporter.metricremappingdisabled",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled the Datadog Exporter remaps OpenTelemetry semantic conventions to Datadog semantic conventions. This feature gate is only for internal use."),
featuregate.WithRegisterReferenceURL("https://docs.datadoghq.com/opentelemetry/schema_semantics/metrics_mapping/"),
)

// isMetricRemappingDisabled returns true if the datadogexporter should generate Datadog-compliant metrics from OpenTelemetry metrics
func isMetricRemappingDisabled() bool {
return metricRemappingDisableddFeatureGate.IsEnabled()
}

type metricsExporter struct {
params exporter.Settings
cfg *Config
Expand Down Expand Up @@ -61,7 +73,16 @@ func newMetricsExporter(
metadataReporter *inframetadata.Reporter,
statsOut chan []byte,
) (*metricsExporter, error) {
tr, err := datadogconfig.TranslatorFromConfig(params.TelemetrySettings, cfg.Metrics, attrsTranslator, sourceProvider, statsOut)
options := cfg.Metrics.ToTranslatorOpts()
options = append(options, otlpmetrics.WithFallbackSourceProvider(sourceProvider))
options = append(options, otlpmetrics.WithStatsOut(statsOut))
if isMetricRemappingDisabled() {
params.TelemetrySettings.Logger.Warn("Metric remapping is disabled in the Datadog exporter. OpenTelemetry metrics must be mapped to Datadog semantics before metrics are exported to Datadog (ex: via a processor).")
} else {
options = append(options, otlpmetrics.WithRemapping())
}

tr, err := otlpmetrics.NewTranslator(params.TelemetrySettings, attrsTranslator, options...)
if err != nil {
return nil, err
}
Expand Down
51 changes: 44 additions & 7 deletions exporter/elasticsearchexporter/integrationtest/datareceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
Expand All @@ -32,12 +33,17 @@ import (
)

const (
// TestLogsIndex is used by the mock ES data receiver to indentify log events.
// TestLogsIndex is used by the mock ES data receiver to identify log events.
// Exporter LogsIndex configuration must be configured with TestLogsIndex for
// the data receiver to work properly
TestLogsIndex = "logs-test-idx"

// TestTracesIndex is used by the mock ES data receiver to indentify trace
// TestMetricsIndex is used by the mock ES data receiver to identify metric events.
// Exporter MetricsIndex configuration must be configured with TestMetricsIndex for
// the data receiver to work properly
TestMetricsIndex = "metrics-test-idx"

// TestTracesIndex is used by the mock ES data receiver to identify trace
// events. Exporter TracesIndex configuration must be configured with
// TestTracesIndex for the data receiver to work properly
TestTracesIndex = "traces-test-idx"
Expand Down Expand Up @@ -79,11 +85,12 @@ func withBatcherEnabled(enabled bool) dataReceiverOption {
}
}

func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
func (es *esDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error {
factory := receiver.NewFactory(
component.MustNewType("mockelasticsearch"),
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment),
receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelDevelopment),
receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment),
)
esURL, err := url.Parse(es.endpoint)
Expand All @@ -101,13 +108,18 @@ func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consu
if err != nil {
return fmt.Errorf("failed to create logs receiver: %w", err)
}
metricsReceiver, err := factory.CreateMetrics(context.Background(), set, cfg, mc)
if err != nil {
return fmt.Errorf("failed to create metrics receiver: %w", err)
}
tracesReceiver, err := factory.CreateTraces(context.Background(), set, cfg, tc)
if err != nil {
return fmt.Errorf("failed to create traces receiver: %w", err)
}

// Since we use SharedComponent both receivers should be same
require.Same(es.t, logsReceiver, tracesReceiver)
require.Same(es.t, logsReceiver, metricsReceiver)
es.receiver = logsReceiver

return es.receiver.Start(context.Background(), componenttest.NewNopHost())
Expand All @@ -126,15 +138,22 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
elasticsearch:
endpoints: [%s]
logs_index: %s
logs_dynamic_index:
enabled: false
metrics_index: %s
metrics_dynamic_index:
enabled: false
traces_index: %s
traces_dynamic_index:
enabled: false
sending_queue:
enabled: true
retry:
enabled: true
initial_interval: 100ms
max_interval: 1s
max_requests: 10000`,
es.endpoint, TestLogsIndex, TestTracesIndex,
es.endpoint, TestLogsIndex, TestMetricsIndex, TestTracesIndex,
)

if es.batcherEnabled == nil {
Expand Down Expand Up @@ -189,6 +208,19 @@ func createLogsReceiver(
return receiver, nil
}

func createMetricsReceiver(
_ context.Context,
params receiver.Settings,
rawCfg component.Config,
next consumer.Metrics,
) (receiver.Metrics, error) {
receiver := receivers.GetOrAdd(rawCfg, func() component.Component {
return newMockESReceiver(params, rawCfg.(*config))
})
receiver.Unwrap().(*mockESReceiver).metricsConsumer = next
return receiver, nil
}

func createTracesReceiver(
_ context.Context,
params receiver.Settings,
Expand All @@ -206,8 +238,9 @@ type mockESReceiver struct {
params receiver.Settings
config *config

tracesConsumer consumer.Traces
logsConsumer consumer.Logs
tracesConsumer consumer.Traces
logsConsumer consumer.Logs
metricsConsumer consumer.Metrics

server *http.Server
}
Expand All @@ -231,10 +264,12 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error

// Ideally bulk request items should be converted to the corresponding event record
// however, since we only assert count for now there is no need to do the actual
// translation. Instead we use a pre-initialized empty logs and traces model to
// translation. Instead we use a pre-initialized empty models to
// reduce allocation impact on tests and benchmarks.
emptyLogs := plog.NewLogs()
emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
emptyMetrics := pmetric.NewMetrics()
emptyMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
emptyTrace := ptrace.NewTraces()
emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()

Expand All @@ -260,6 +295,8 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error
switch item.Index {
case TestLogsIndex:
consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs)
case TestMetricsIndex:
consumeErr = es.metricsConsumer.ConsumeMetrics(context.Background(), emptyMetrics)
case TestTracesIndex:
consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func BenchmarkExporter(b *testing.B) {
for _, eventType := range []string{"logs", "traces"} {
for _, eventType := range []string{"logs", "metrics", "traces"} {
for _, mappingMode := range []string{"none", "ecs", "raw"} {
for _, tc := range []struct {
name string
Expand All @@ -41,6 +41,8 @@ func BenchmarkExporter(b *testing.B) {
switch eventType {
case "logs":
benchmarkLogs(b, tc.batchSize, mappingMode)
case "metrics":
benchmarkMetrics(b, tc.batchSize, mappingMode)
case "traces":
benchmarkTraces(b, tc.batchSize, mappingMode)
}
Expand Down Expand Up @@ -79,6 +81,35 @@ func benchmarkLogs(b *testing.B, batchSize int, mappingMode string) {
require.NoError(b, exporter.Shutdown(ctx))
}

func benchmarkMetrics(b *testing.B, batchSize int, mappingMode string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

exporterSettings := exportertest.NewNopSettings()
exporterSettings.TelemetrySettings.Logger = zaptest.NewLogger(b, zaptest.Level(zap.WarnLevel))
runnerCfg := prepareBenchmark(b, batchSize, mappingMode)
exporter, err := runnerCfg.factory.CreateMetrics(
ctx, exporterSettings, runnerCfg.esCfg,
)
require.NoError(b, err)
require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost()))

b.ReportAllocs()
b.ResetTimer()
b.StopTimer()
for i := 0; i < b.N; i++ {
metrics, _ := runnerCfg.provider.GenerateMetrics()
b.StartTimer()
require.NoError(b, exporter.ConsumeMetrics(ctx, metrics))
b.StopTimer()
}
b.ReportMetric(
float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(),
"events/s",
)
require.NoError(b, exporter.Shutdown(ctx))
}

func benchmarkTraces(b *testing.B, batchSize int, mappingMode string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -134,7 +165,11 @@ func prepareBenchmark(
cfg.esCfg.Mapping.Mode = mappingMode
cfg.esCfg.Endpoints = []string{receiver.endpoint}
cfg.esCfg.LogsIndex = TestLogsIndex
cfg.esCfg.LogsDynamicIndex.Enabled = false
cfg.esCfg.MetricsIndex = TestMetricsIndex
cfg.esCfg.MetricsDynamicIndex.Enabled = false
cfg.esCfg.TracesIndex = TestTracesIndex
cfg.esCfg.TracesDynamicIndex.Enabled = false
cfg.esCfg.Flush.Interval = 10 * time.Millisecond
cfg.esCfg.NumWorkers = 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func TestExporter(t *testing.T) {
for _, eventType := range []string{"logs", "traces"} {
for _, eventType := range []string{"logs", "metrics", "traces"} {
for _, tc := range []struct {
name string

Expand Down Expand Up @@ -68,6 +68,8 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool
switch eventType {
case "logs":
sender = testbed.NewOTLPLogsDataSender(host, port)
case "metrics":
sender = testbed.NewOTLPMetricDataSender(host, port)
case "traces":
sender = testbed.NewOTLPTraceDataSender(host, port)
default:
Expand Down
30 changes: 3 additions & 27 deletions pkg/datadog/config/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@ import (
"encoding"
"fmt"

"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
otlpmetrics "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/featuregate"
)

// MetricsConfig defines the metrics exporter specific configuration options
Expand Down Expand Up @@ -212,29 +208,10 @@ type MetricsExporterConfig struct {
InstrumentationScopeMetadataAsTags bool `mapstructure:"instrumentation_scope_metadata_as_tags"`
}

var metricRemappingDisableddFeatureGate = featuregate.GlobalRegistry().MustRegister(
"exporter.datadogexporter.metricremappingdisabled",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled the Datadog Exporter remaps OpenTelemetry semantic conventions to Datadog semantic conventions. This feature gate is only for internal use."),
featuregate.WithRegisterReferenceURL("https://docs.datadoghq.com/opentelemetry/schema_semantics/metrics_mapping/"),
)

// isMetricRemappingDisabled returns true if the datadogexporter should generate Datadog-compliant metrics from OpenTelemetry metrics
func isMetricRemappingDisabled() bool {
return metricRemappingDisableddFeatureGate.IsEnabled()
}

// TranslatorFromConfig creates a new metrics translator from the exporter
func TranslatorFromConfig(set component.TelemetrySettings, mcfg MetricsConfig, attrsTranslator *attributes.Translator, sourceProvider source.Provider, statsOut chan []byte) (*otlpmetrics.Translator, error) {
// ToTranslatorOpts returns a list of metrics translator options from the metrics config
func (mcfg MetricsConfig) ToTranslatorOpts() []otlpmetrics.TranslatorOption {
options := []otlpmetrics.TranslatorOption{
otlpmetrics.WithDeltaTTL(mcfg.DeltaTTL),
otlpmetrics.WithFallbackSourceProvider(sourceProvider),
}

if isMetricRemappingDisabled() {
set.Logger.Warn("Metric remapping is disabled in the Datadog exporter. OpenTelemetry metrics must be mapped to Datadog semantics before metrics are exported to Datadog (ex: via a processor).")
} else {
options = append(options, otlpmetrics.WithRemapping())
}

if mcfg.HistConfig.SendAggregations {
Expand Down Expand Up @@ -262,6 +239,5 @@ func TranslatorFromConfig(set component.TelemetrySettings, mcfg MetricsConfig, a
options = append(options, otlpmetrics.WithInitialCumulMonoValueMode(
otlpmetrics.InitialCumulMonoValueMode(mcfg.SumConfig.InitialCumulativeMonotonicMode)))

options = append(options, otlpmetrics.WithStatsOut(statsOut))
return otlpmetrics.NewTranslator(set, attrsTranslator, options...)
return options
}
4 changes: 1 addition & 3 deletions pkg/datadog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.22.0

require (
github.com/DataDog/datadog-agent/pkg/util/hostname/validate v0.59.0
github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.21.0
github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.21.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.114.0
Expand All @@ -17,14 +16,14 @@ require (
go.opentelemetry.io/collector/config/configtls v1.20.0
go.opentelemetry.io/collector/confmap v1.20.0
go.opentelemetry.io/collector/exporter v0.114.0
go.opentelemetry.io/collector/featuregate v1.20.0
go.uber.org/zap v1.27.0
)

require (
github.com/DataDog/datadog-agent/pkg/proto v0.52.0-devel // indirect
github.com/DataDog/datadog-agent/pkg/util/log v0.59.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/scrubber v0.59.0 // indirect
github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.21.0 // indirect
github.com/DataDog/opentelemetry-mapping-go/pkg/quantile v0.21.0 // indirect
github.com/DataDog/sketches-go v1.4.4 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
Expand All @@ -40,7 +39,6 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
Expand Down
Loading

0 comments on commit b7a14fd

Please sign in to comment.