diff --git a/receiver/kafkareceiver/documentation.md b/receiver/kafkareceiver/documentation.md
new file mode 100644
index 000000000000..6fc7ddff888e
--- /dev/null
+++ b/receiver/kafkareceiver/documentation.md
@@ -0,0 +1,71 @@
+[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)
+
+# kafka
+
+## Internal Telemetry
+
+The following telemetry is emitted by this component.
+
+### kafka_receiver_current_offset
+
+Current message offset
+
+| Unit | Metric Type | Value Type |
+| ---- | ----------- | ---------- |
+| 1 | Gauge | Int |
+
+### kafka_receiver_messages
+
+Number of received messages
+
+| Unit | Metric Type | Value Type | Monotonic |
+| ---- | ----------- | ---------- | --------- |
+| 1 | Sum | Int | true |
+
+### kafka_receiver_offset_lag
+
+Current offset lag
+
+| Unit | Metric Type | Value Type |
+| ---- | ----------- | ---------- |
+| 1 | Gauge | Int |
+
+### kafka_receiver_partition_close
+
+Number of finished partitions
+
+| Unit | Metric Type | Value Type | Monotonic |
+| ---- | ----------- | ---------- | --------- |
+| 1 | Sum | Int | true |
+
+### kafka_receiver_partition_start
+
+Number of started partitions
+
+| Unit | Metric Type | Value Type | Monotonic |
+| ---- | ----------- | ---------- | --------- |
+| 1 | Sum | Int | true |
+
+### kafka_receiver_unmarshal_failed_log_records
+
+Number of log records failed to be unmarshaled
+
+| Unit | Metric Type | Value Type | Monotonic |
+| ---- | ----------- | ---------- | --------- |
+| 1 | Sum | Int | true |
+
+### kafka_receiver_unmarshal_failed_metric_points
+
+Number of metric points failed to be unmarshaled
+
+| Unit | Metric Type | Value Type | Monotonic |
+| ---- | ----------- | ---------- | --------- |
+| 1 | Sum | Int | true |
+
+### kafka_receiver_unmarshal_failed_spans
+
+Number of spans failed to be unmarshaled
+
+| Unit | Metric Type | Value Type | Monotonic |
+| ---- | ----------- | ---------- | --------- |
+| 1 | Sum | Int | true |
diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go
index 7a6a977a4096..777ba42b6739 100644
--- a/receiver/kafkareceiver/factory.go
+++ b/receiver/kafkareceiver/factory.go
@@ -9,7 +9,6 @@ import (
 	"strings"
 	"time"
 
-	"go.opencensus.io/stats/view"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/receiver"
@@ -75,8 +74,6 @@ func withLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption {
 
 // NewFactory creates Kafka receiver factory.
 func NewFactory(options ...FactoryOption) receiver.Factory {
-	_ = view.Register(metricViews()...)
-
 	f := &kafkaReceiverFactory{
 		tracesUnmarshalers:  map[string]TracesUnmarshaler{},
 		metricsUnmarshalers: map[string]MetricsUnmarshaler{},
diff --git a/receiver/kafkareceiver/generated_component_telemetry_test.go b/receiver/kafkareceiver/generated_component_telemetry_test.go
new file mode 100644
index 000000000000..ac441ec2611a
--- /dev/null
+++ b/receiver/kafkareceiver/generated_component_telemetry_test.go
@@ -0,0 +1,76 @@
+// Code generated by mdatagen. DO NOT EDIT.
+ +package kafkareceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +type componentTestTelemetry struct { + reader *sdkmetric.ManualReader + meterProvider *sdkmetric.MeterProvider +} + +func (tt *componentTestTelemetry) NewSettings() receiver.Settings { + settings := receivertest.NewNopSettings() + settings.MeterProvider = tt.meterProvider + settings.ID = component.NewID(component.MustNewType("kafka")) + + return settings +} + +func setupTestTelemetry() componentTestTelemetry { + reader := sdkmetric.NewManualReader() + return componentTestTelemetry{ + reader: reader, + meterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)), + } +} + +func (tt *componentTestTelemetry) assertMetrics(t *testing.T, expected []metricdata.Metrics) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := tt.getMetric(want.Name, md) + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), tt.len(md)) +} + +func (tt *componentTestTelemetry) getMetric(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func (tt *componentTestTelemetry) len(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} + +func (tt *componentTestTelemetry) Shutdown(ctx context.Context) error { + return tt.meterProvider.Shutdown(ctx) +} diff --git a/receiver/kafkareceiver/generated_package_test.go b/receiver/kafkareceiver/generated_package_test.go index 518d2734b105..78f0f8a86d11 100644 --- a/receiver/kafkareceiver/generated_package_test.go +++ b/receiver/kafkareceiver/generated_package_test.go @@ -9,5 +9,5 @@ import ( ) func TestMain(m *testing.M) { - goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + goleak.VerifyTestMain(m) } diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index a2698c502ee7..4bd74b8980dd 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -16,8 +16,8 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.103.0 github.com/openzipkin/zipkin-go v0.4.3 github.com/stretchr/testify v1.9.0 - go.opencensus.io v0.24.0 go.opentelemetry.io/collector/component v0.103.0 + go.opentelemetry.io/collector/config/configtelemetry v0.103.0 go.opentelemetry.io/collector/config/configtls v0.103.0 go.opentelemetry.io/collector/confmap v0.103.0 go.opentelemetry.io/collector/consumer v0.103.0 @@ -25,7 +25,9 @@ require ( go.opentelemetry.io/collector/pdata/testdata v0.103.0 go.opentelemetry.io/collector/receiver v0.103.0 go.opentelemetry.io/collector/semconv v0.103.0 + go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/metric v1.27.0 + go.opentelemetry.io/otel/sdk/metric v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 
go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 @@ -80,14 +82,11 @@ require ( go.opentelemetry.io/collector v0.103.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.10.0 // indirect go.opentelemetry.io/collector/config/configretry v0.103.0 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.103.0 // indirect go.opentelemetry.io/collector/exporter v0.103.0 // indirect go.opentelemetry.io/collector/extension v0.103.0 // indirect go.opentelemetry.io/collector/featuregate v1.10.0 // indirect - go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.27.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.24.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect diff --git a/receiver/kafkareceiver/go.sum b/receiver/kafkareceiver/go.sum index de488febc4c6..f67ad9d42f6c 100644 --- a/receiver/kafkareceiver/go.sum +++ b/receiver/kafkareceiver/go.sum @@ -1,5 +1,3 @@ -cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw= github.com/IBM/sarama v1.43.2/go.mod h1:Kyo4WkF24Z+1nz7xeVUFWIuKVV8RS3wM8mkvPKMdXFQ= github.com/apache/thrift v0.20.0 h1:631+KvYbsBZxmuJjYwhezVsrfc/TbqtZV4QcxOX1fOI= @@ -10,11 +8,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -25,10 +20,6 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= -github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= -github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fortytw2/leaktest v1.3.0 
h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= @@ -42,32 +33,11 @@ github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsM github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= -github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= -github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= @@ -134,7 +104,6 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= -github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= @@ -166,8 +135,6 @@ github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gi github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= -go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/collector v0.103.0 h1:mssWo1y31p1F/SRsSBnVUX6YocgawCqM1blpE+hkWog= go.opentelemetry.io/collector v0.103.0/go.mod h1:mgqdTFB7QCYiOeEdJSSEktovPqy+2fw4oTKJzyeSB0U= go.opentelemetry.io/collector/component v0.103.0 h1:j52YAsp8EmqYUotVUwhovkqFZGuxArEkk65V4TI46NE= @@ -223,41 +190,28 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= -golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= -golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= -golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= -golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net 
v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= -golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -279,10 +233,6 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -291,29 +241,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors 
v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= -google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5 h1:Q2RxlXqh1cgzzUgV261vBO2jI5R/3DD1J2pM0nI4NhU= google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= -google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= -google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -326,5 +257,3 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/receiver/kafkareceiver/header_extraction_test.go b/receiver/kafkareceiver/header_extraction_test.go index 57b2e94ff6ef..01e84538ea15 100644 --- a/receiver/kafkareceiver/header_extraction_test.go +++ 
b/receiver/kafkareceiver/header_extraction_test.go @@ -19,6 +19,8 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata" ) func TestHeaderExtractionTraces(t *testing.T) { @@ -26,13 +28,16 @@ func TestHeaderExtractionTraces(t *testing.T) { ReceiverCreateSettings: receivertest.NewNopSettings(), }) require.NoError(t, err) + telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings().TelemetrySettings) + require.NoError(t, err) nextConsumer := &consumertest.TracesSink{} c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zaptest.NewLogger(t), - ready: make(chan bool), - nextConsumer: nextConsumer, - obsrecv: obsrecv, + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zaptest.NewLogger(t), + ready: make(chan bool), + nextConsumer: nextConsumer, + obsrecv: obsrecv, + telemetryBuilder: telemetryBuilder, } headers := []string{"headerKey1", "headerKey2"} c.headerExtractor = &headerExtractor{ @@ -88,15 +93,18 @@ func TestHeaderExtractionLogs(t *testing.T) { ReceiverCreateSettings: receivertest.NewNopSettings(), }) require.NoError(t, err) + telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings().TelemetrySettings) + require.NoError(t, err) nextConsumer := &consumertest.LogsSink{} unmarshaler := newTextLogsUnmarshaler() unmarshaler, err = unmarshaler.WithEnc("utf-8") c := logsConsumerGroupHandler{ - unmarshaler: unmarshaler, - logger: zaptest.NewLogger(t), - ready: make(chan bool), - nextConsumer: nextConsumer, - obsrecv: obsrecv, + unmarshaler: unmarshaler, + logger: zaptest.NewLogger(t), + ready: make(chan bool), + nextConsumer: nextConsumer, + obsrecv: obsrecv, + telemetryBuilder: telemetryBuilder, } headers := []string{"headerKey1", "headerKey2"} c.headerExtractor = &headerExtractor{ @@ -147,13 +155,16 @@ func TestHeaderExtractionMetrics(t *testing.T) { ReceiverCreateSettings: receivertest.NewNopSettings(), }) require.NoError(t, err) + telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings().TelemetrySettings) + require.NoError(t, err) nextConsumer := &consumertest.MetricsSink{} c := metricsConsumerGroupHandler{ - unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), - logger: zaptest.NewLogger(t), - ready: make(chan bool), - nextConsumer: nextConsumer, - obsrecv: obsrecv, + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + logger: zaptest.NewLogger(t), + ready: make(chan bool), + nextConsumer: nextConsumer, + obsrecv: obsrecv, + telemetryBuilder: telemetryBuilder, } headers := []string{"headerKey1", "headerKey2"} c.headerExtractor = &headerExtractor{ diff --git a/receiver/kafkareceiver/internal/metadata/generated_telemetry.go b/receiver/kafkareceiver/internal/metadata/generated_telemetry.go index d60cbac9b5bc..81cc959523d2 100644 --- a/receiver/kafkareceiver/internal/metadata/generated_telemetry.go +++ b/receiver/kafkareceiver/internal/metadata/generated_telemetry.go @@ -3,9 +3,14 @@ package metadata import ( - "go.opentelemetry.io/collector/component" + "errors" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + 
"go.opentelemetry.io/collector/config/configtelemetry" ) func Meter(settings component.TelemetrySettings) metric.Meter { @@ -15,3 +20,92 @@ func Meter(settings component.TelemetrySettings) metric.Meter { func Tracer(settings component.TelemetrySettings) trace.Tracer { return settings.TracerProvider.Tracer("otelcol/kafkareceiver") } + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + KafkaReceiverCurrentOffset metric.Int64Gauge + KafkaReceiverMessages metric.Int64Counter + KafkaReceiverOffsetLag metric.Int64Gauge + KafkaReceiverPartitionClose metric.Int64Counter + KafkaReceiverPartitionStart metric.Int64Counter + KafkaReceiverUnmarshalFailedLogRecords metric.Int64Counter + KafkaReceiverUnmarshalFailedMetricPoints metric.Int64Counter + KafkaReceiverUnmarshalFailedSpans metric.Int64Counter + level configtelemetry.Level +} + +// telemetryBuilderOption applies changes to default builder. +type telemetryBuilderOption func(*TelemetryBuilder) + +// WithLevel sets the current telemetry level for the component. +func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.level = lvl + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{level: configtelemetry.LevelBasic} + for _, op := range options { + op(&builder) + } + var err, errs error + if builder.level >= configtelemetry.LevelBasic { + builder.meter = Meter(settings) + } else { + builder.meter = noop.Meter{} + } + builder.KafkaReceiverCurrentOffset, err = builder.meter.Int64Gauge( + "kafka_receiver_current_offset", + metric.WithDescription("Current message offset"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.KafkaReceiverMessages, err = builder.meter.Int64Counter( + "kafka_receiver_messages", + metric.WithDescription("Number of received messages"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.KafkaReceiverOffsetLag, err = builder.meter.Int64Gauge( + "kafka_receiver_offset_lag", + metric.WithDescription("Current offset lag"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.KafkaReceiverPartitionClose, err = builder.meter.Int64Counter( + "kafka_receiver_partition_close", + metric.WithDescription("Number of finished partitions"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.KafkaReceiverPartitionStart, err = builder.meter.Int64Counter( + "kafka_receiver_partition_start", + metric.WithDescription("Number of started partitions"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.KafkaReceiverUnmarshalFailedLogRecords, err = builder.meter.Int64Counter( + "kafka_receiver_unmarshal_failed_log_records", + metric.WithDescription("Number of log records failed to be unmarshaled"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.KafkaReceiverUnmarshalFailedMetricPoints, err = builder.meter.Int64Counter( + "kafka_receiver_unmarshal_failed_metric_points", + metric.WithDescription("Number of metric points failed to be unmarshaled"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.KafkaReceiverUnmarshalFailedSpans, err = builder.meter.Int64Counter( + "kafka_receiver_unmarshal_failed_spans", + 
metric.WithDescription("Number of spans failed to be unmarshaled"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/receiver/kafkareceiver/internal/metadata/generated_telemetry_test.go b/receiver/kafkareceiver/internal/metadata/generated_telemetry_test.go index eacf03726481..7ba4e89d2dae 100644 --- a/receiver/kafkareceiver/internal/metadata/generated_telemetry_test.go +++ b/receiver/kafkareceiver/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 29f2eaf403de..bb6da26c79c1 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -11,19 +11,23 @@ import ( "sync" "github.com/IBM/sarama" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata" ) const ( transport = "kafka" + // TODO: update the following attributes to reflect semconv + attrInstanceName = "name" + attrPartition = "partition" ) var errInvalidInitialOffset = fmt.Errorf("invalid initial offset") @@ -37,7 +41,8 @@ type kafkaTracesConsumer struct { cancelConsumeLoop context.CancelFunc unmarshaler TracesUnmarshaler - settings receiver.Settings + settings receiver.Settings + telemetryBuilder *metadata.TelemetryBuilder autocommitEnabled bool messageMarking MessageMarking @@ -54,7 +59,8 @@ type kafkaMetricsConsumer struct { cancelConsumeLoop context.CancelFunc unmarshaler MetricsUnmarshaler - settings receiver.Settings + settings receiver.Settings + telemetryBuilder *metadata.TelemetryBuilder autocommitEnabled bool messageMarking MessageMarking @@ -71,7 +77,8 @@ type kafkaLogsConsumer struct { cancelConsumeLoop context.CancelFunc unmarshaler LogsUnmarshaler - settings receiver.Settings + settings receiver.Settings + telemetryBuilder *metadata.TelemetryBuilder autocommitEnabled bool messageMarking MessageMarking @@ -88,6 +95,11 @@ func newTracesReceiver(config Config, set receiver.Settings, unmarshaler TracesU return nil, errUnrecognizedEncoding } + telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + return &kafkaTracesConsumer{ config: config, topics: []string{config.Topic}, @@ -98,6 +110,7 @@ func newTracesReceiver(config Config, set receiver.Settings, unmarshaler TracesU messageMarking: config.MessageMarking, headerExtraction: config.HeaderExtraction.ExtractHeaders, headers: config.HeaderExtraction.Headers, + telemetryBuilder: telemetryBuilder, }, nil } @@ -153,6 +166,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, _ component.Host) error { autocommitEnabled: c.autocommitEnabled, messageMarking: 
c.messageMarking, headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: c.telemetryBuilder, } if c.headerExtraction { consumerGroup.headerExtractor = &headerExtractor{ @@ -201,6 +215,11 @@ func newMetricsReceiver(config Config, set receiver.Settings, unmarshaler Metric return nil, errUnrecognizedEncoding } + telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + return &kafkaMetricsConsumer{ config: config, topics: []string{config.Topic}, @@ -211,6 +230,7 @@ func newMetricsReceiver(config Config, set receiver.Settings, unmarshaler Metric messageMarking: config.MessageMarking, headerExtraction: config.HeaderExtraction.ExtractHeaders, headers: config.HeaderExtraction.Headers, + telemetryBuilder: telemetryBuilder, }, nil } @@ -240,6 +260,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, _ component.Host) error autocommitEnabled: c.autocommitEnabled, messageMarking: c.messageMarking, headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: c.telemetryBuilder, } if c.headerExtraction { metricsConsumerGroup.headerExtractor = &headerExtractor{ @@ -288,6 +309,11 @@ func newLogsReceiver(config Config, set receiver.Settings, unmarshaler LogsUnmar return nil, errUnrecognizedEncoding } + telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + return &kafkaLogsConsumer{ config: config, topics: []string{config.Topic}, @@ -298,6 +324,7 @@ func newLogsReceiver(config Config, set receiver.Settings, unmarshaler LogsUnmar messageMarking: config.MessageMarking, headerExtraction: config.HeaderExtraction.ExtractHeaders, headers: config.HeaderExtraction.Headers, + telemetryBuilder: telemetryBuilder, }, nil } @@ -327,6 +354,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, _ component.Host) error { autocommitEnabled: c.autocommitEnabled, messageMarking: c.messageMarking, headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: c.telemetryBuilder, } if c.headerExtraction { logsConsumerGroup.headerExtractor = &headerExtractor{ @@ -379,7 +407,8 @@ type tracesConsumerGroupHandler struct { logger *zap.Logger - obsrecv *receiverhelper.ObsReport + obsrecv *receiverhelper.ObsReport + telemetryBuilder *metadata.TelemetryBuilder autocommitEnabled bool messageMarking MessageMarking @@ -395,7 +424,8 @@ type metricsConsumerGroupHandler struct { logger *zap.Logger - obsrecv *receiverhelper.ObsReport + obsrecv *receiverhelper.ObsReport + telemetryBuilder *metadata.TelemetryBuilder autocommitEnabled bool messageMarking MessageMarking @@ -411,7 +441,8 @@ type logsConsumerGroupHandler struct { logger *zap.Logger - obsrecv *receiverhelper.ObsReport + obsrecv *receiverhelper.ObsReport + telemetryBuilder *metadata.TelemetryBuilder autocommitEnabled bool messageMarking MessageMarking @@ -426,14 +457,12 @@ func (c *tracesConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) c.readyCloser.Do(func() { close(c.ready) }) - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.Name())} - _ = stats.RecordWithTags(session.Context(), statsTags, statPartitionStart.M(1)) + c.telemetryBuilder.KafkaReceiverPartitionStart.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name()))) return nil } func (c *tracesConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.Name())} - _ = stats.RecordWithTags(session.Context(), statsTags, statPartitionClose.M(1)) + 
c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name()))) return nil } @@ -457,22 +486,18 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe } ctx := c.obsrecv.StartTracesOp(session.Context()) - statsTags := []tag.Mutator{ - tag.Upsert(tagInstanceName, c.id.String()), - tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))), - } - _ = stats.RecordWithTags(ctx, statsTags, - statMessageCount.M(1), - statMessageOffset.M(message.Offset), - statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1)) + attrs := attribute.NewSet( + attribute.String(attrInstanceName, c.id.String()), + attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))), + ) + c.telemetryBuilder.KafkaReceiverMessages.Add(ctx, 1, metric.WithAttributeSet(attrs)) + c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, message.Offset, metric.WithAttributeSet(attrs)) + c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs)) traces, err := c.unmarshaler.Unmarshal(message.Value) if err != nil { c.logger.Error("failed to unmarshal message", zap.Error(err)) - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}, - statUnmarshalFailedSpans.M(1)) + c.telemetryBuilder.KafkaReceiverUnmarshalFailedSpans.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String()))) if c.messageMarking.After && c.messageMarking.OnError { session.MarkMessage(message, "") } @@ -509,14 +534,12 @@ func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) c.readyCloser.Do(func() { close(c.ready) }) - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.Name())} - _ = stats.RecordWithTags(session.Context(), statsTags, statPartitionStart.M(1)) + c.telemetryBuilder.KafkaReceiverPartitionStart.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name()))) return nil } func (c *metricsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.Name())} - _ = stats.RecordWithTags(session.Context(), statsTags, statPartitionClose.M(1)) + c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name()))) return nil } @@ -540,22 +563,18 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS } ctx := c.obsrecv.StartMetricsOp(session.Context()) - statsTags := []tag.Mutator{ - tag.Upsert(tagInstanceName, c.id.String()), - tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))), - } - _ = stats.RecordWithTags(ctx, statsTags, - statMessageCount.M(1), - statMessageOffset.M(message.Offset), - statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1)) + attrs := attribute.NewSet( + attribute.String(attrInstanceName, c.id.String()), + attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))), + ) + c.telemetryBuilder.KafkaReceiverMessages.Add(ctx, 1, metric.WithAttributeSet(attrs)) + c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, message.Offset, metric.WithAttributeSet(attrs)) + c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs)) metrics, err := c.unmarshaler.Unmarshal(message.Value) if err != nil { 
c.logger.Error("failed to unmarshal message", zap.Error(err)) - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}, - statUnmarshalFailedMetricPoints.M(1)) + c.telemetryBuilder.KafkaReceiverUnmarshalFailedMetricPoints.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String()))) if c.messageMarking.After && c.messageMarking.OnError { session.MarkMessage(message, "") } @@ -592,18 +611,12 @@ func (c *logsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) er c.readyCloser.Do(func() { close(c.ready) }) - _ = stats.RecordWithTags( - session.Context(), - []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}, - statPartitionStart.M(1)) + c.telemetryBuilder.KafkaReceiverPartitionStart.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String()))) return nil } func (c *logsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { - _ = stats.RecordWithTags( - session.Context(), - []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}, - statPartitionClose.M(1)) + c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String()))) return nil } @@ -627,24 +640,18 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess } ctx := c.obsrecv.StartLogsOp(session.Context()) - statsTags := []tag.Mutator{ - tag.Upsert(tagInstanceName, c.id.String()), - tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))), - } - _ = stats.RecordWithTags( - ctx, - statsTags, - statMessageCount.M(1), - statMessageOffset.M(message.Offset), - statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1)) + attrs := attribute.NewSet( + attribute.String(attrInstanceName, c.id.String()), + attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))), + ) + c.telemetryBuilder.KafkaReceiverMessages.Add(ctx, 1, metric.WithAttributeSet(attrs)) + c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, message.Offset, metric.WithAttributeSet(attrs)) + c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs)) logs, err := c.unmarshaler.Unmarshal(message.Value) if err != nil { c.logger.Error("failed to unmarshal message", zap.Error(err)) - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}, - statUnmarshalFailedLogRecords.M(1)) + c.telemetryBuilder.KafkaReceiverUnmarshalFailedLogRecords.Add(ctx, 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String()))) if c.messageMarking.After && c.messageMarking.OnError { session.MarkMessage(message, "") } diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 64a69ddeb78f..87c237d559ad 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -13,7 +13,6 @@ import ( "github.com/IBM/sarama" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/consumertest" @@ -23,6 +22,8 @@ import ( "go.opentelemetry.io/collector/pdata/testdata" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/attribute" + 
"go.opentelemetry.io/otel/sdk/metric/metricdata" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -30,6 +31,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/textutils" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata" ) func TestNewTracesReceiver_version_err(t *testing.T) { @@ -92,9 +94,10 @@ func TestNewTracesReceiver_initial_offset_err(t *testing.T) { func TestTracesReceiverStart(t *testing.T) { c := kafkaTracesConsumer{ - nextConsumer: consumertest.NewNop(), - settings: receivertest.NewNopSettings(), - consumerGroup: &testConsumerGroup{}, + nextConsumer: consumertest.NewNop(), + settings: receivertest.NewNopSettings(), + consumerGroup: &testConsumerGroup{}, + telemetryBuilder: nopTelemetryBuilder(t), } require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost())) @@ -102,16 +105,20 @@ func TestTracesReceiverStart(t *testing.T) { } func TestTracesReceiverStartConsume(t *testing.T) { + telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings().TelemetrySettings) + require.NoError(t, err) c := kafkaTracesConsumer{ - nextConsumer: consumertest.NewNop(), - settings: receivertest.NewNopSettings(), - consumerGroup: &testConsumerGroup{}, + nextConsumer: consumertest.NewNop(), + settings: receivertest.NewNopSettings(), + consumerGroup: &testConsumerGroup{}, + telemetryBuilder: telemetryBuilder, } ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err := c.consumeLoop(ctx, &tracesConsumerGroupHandler{ - ready: make(chan bool), + err = c.consumeLoop(ctx, &tracesConsumerGroupHandler{ + ready: make(chan bool), + telemetryBuilder: telemetryBuilder, }) assert.EqualError(t, err, context.Canceled.Error()) } @@ -124,9 +131,10 @@ func TestTracesReceiver_error(t *testing.T) { expectedErr := errors.New("handler error") c := kafkaTracesConsumer{ - nextConsumer: consumertest.NewNop(), - settings: settings, - consumerGroup: &testConsumerGroup{err: expectedErr}, + nextConsumer: consumertest.NewNop(), + settings: settings, + consumerGroup: &testConsumerGroup{err: expectedErr}, + telemetryBuilder: nopTelemetryBuilder(t), } require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost())) @@ -137,38 +145,30 @@ func TestTracesReceiver_error(t *testing.T) { } func TestTracesConsumerGroupHandler(t *testing.T) { - view.Unregister(metricViews()...) - views := metricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) 
+ tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: telemetryBuilder, } testSession := testConsumerGroupSession{ctx: context.Background()} require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 0) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 1) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -187,20 +187,20 @@ func TestTracesConsumerGroupHandler(t *testing.T) { } func TestTracesConsumerGroupHandler_session_done(t *testing.T) { - view.Unregister(metricViews()...) - views := metricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) 
+ tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: telemetryBuilder, } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -208,18 +208,10 @@ func TestTracesConsumerGroupHandler_session_done(t *testing.T) { require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 0) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 1) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -241,13 +233,17 @@ func TestTracesConsumerGroupHandler_session_done(t *testing.T) { func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: telemetryBuilder, } wg := sync.WaitGroup{} @@ -263,6 +259,73 @@ func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) { groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")} close(groupClaim.messageChan) wg.Wait() + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "kafka_receiver_offset_lag", + Unit: "1", + Description: "Current offset lag", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 3, + Attributes: attribute.NewSet( + attribute.String("name", ""), + attribute.String("partition", "5"), + ), + }, + }, + }, + }, + { + Name: "kafka_receiver_current_offset", + Unit: "1", + Description: "Current message offset", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 0, + Attributes: attribute.NewSet( + 
attribute.String("name", ""), + attribute.String("partition", "5"), + ), + }, + }, + }, + }, + { + Name: "kafka_receiver_messages", + Unit: "1", + Description: "Number of received messages", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("name", ""), + attribute.String("partition", "5"), + ), + }, + }, + }, + }, + { + Name: "kafka_receiver_unmarshal_failed_spans", + Unit: "1", + Description: "Number of spans failed to be unmarshaled", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String("name", "")), + }, + }, + }, + }, + }) } func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) { @@ -270,12 +333,13 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(consumerError), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: nopTelemetryBuilder(t), } wg := sync.WaitGroup{} @@ -358,16 +422,20 @@ func TestNewMetricsReceiver_initial_offset_err(t *testing.T) { } func TestMetricsReceiverStartConsume(t *testing.T) { + telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings().TelemetrySettings) + require.NoError(t, err) c := kafkaMetricsConsumer{ - nextConsumer: consumertest.NewNop(), - settings: receivertest.NewNopSettings(), - consumerGroup: &testConsumerGroup{}, + nextConsumer: consumertest.NewNop(), + settings: receivertest.NewNopSettings(), + consumerGroup: &testConsumerGroup{}, + telemetryBuilder: telemetryBuilder, } ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err := c.consumeLoop(ctx, &logsConsumerGroupHandler{ - ready: make(chan bool), + err = c.consumeLoop(ctx, &logsConsumerGroupHandler{ + ready: make(chan bool), + telemetryBuilder: telemetryBuilder, }) assert.EqualError(t, err, context.Canceled.Error()) } @@ -380,9 +448,10 @@ func TestMetricsReceiver_error(t *testing.T) { expectedErr := errors.New("handler error") c := kafkaMetricsConsumer{ - nextConsumer: consumertest.NewNop(), - settings: settings, - consumerGroup: &testConsumerGroup{err: expectedErr}, + nextConsumer: consumertest.NewNop(), + settings: settings, + consumerGroup: &testConsumerGroup{err: expectedErr}, + telemetryBuilder: nopTelemetryBuilder(t), } require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost())) @@ -393,38 +462,30 @@ func TestMetricsReceiver_error(t *testing.T) { } func TestMetricsConsumerGroupHandler(t *testing.T) { - view.Unregister(metricViews()...) - views := metricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) 
+ tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) c := metricsConsumerGroupHandler{ - unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: telemetryBuilder, } testSession := testConsumerGroupSession{ctx: context.Background()} require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 0) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 1) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -443,20 +504,20 @@ func TestMetricsConsumerGroupHandler(t *testing.T) { } func TestMetricsConsumerGroupHandler_session_done(t *testing.T) { - view.Unregister(metricViews()...) - views := metricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) 
+ tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) c := metricsConsumerGroupHandler{ - unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: telemetryBuilder, } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -464,18 +525,10 @@ func TestMetricsConsumerGroupHandler_session_done(t *testing.T) { require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 0) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 1) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -496,13 +549,17 @@ func TestMetricsConsumerGroupHandler_session_done(t *testing.T) { func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) c := metricsConsumerGroupHandler{ - unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: telemetryBuilder, } wg := sync.WaitGroup{} @@ -518,6 +575,73 @@ func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) { groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")} close(groupClaim.messageChan) wg.Wait() + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "kafka_receiver_offset_lag", + Unit: "1", + Description: "Current offset lag", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 3, + Attributes: attribute.NewSet( + attribute.String("name", ""), + attribute.String("partition", "5"), + ), + }, + }, + }, + }, + { + Name: "kafka_receiver_current_offset", + Unit: "1", + Description: "Current message offset", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 0, + Attributes: 
attribute.NewSet( + attribute.String("name", ""), + attribute.String("partition", "5"), + ), + }, + }, + }, + }, + { + Name: "kafka_receiver_messages", + Unit: "1", + Description: "Number of received messages", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("name", ""), + attribute.String("partition", "5"), + ), + }, + }, + }, + }, + { + Name: "kafka_receiver_unmarshal_failed_metric_points", + Unit: "1", + Description: "Number of metric points failed to be unmarshaled", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String("name", "")), + }, + }, + }, + }, + }) } func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) { @@ -525,12 +649,13 @@ func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) c := metricsConsumerGroupHandler{ - unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(consumerError), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: nopTelemetryBuilder(t), } wg := sync.WaitGroup{} @@ -614,9 +739,10 @@ func TestNewLogsReceiver_initial_offset_err(t *testing.T) { func TestLogsReceiverStart(t *testing.T) { c := kafkaLogsConsumer{ - nextConsumer: consumertest.NewNop(), - settings: receivertest.NewNopSettings(), - consumerGroup: &testConsumerGroup{}, + nextConsumer: consumertest.NewNop(), + settings: receivertest.NewNopSettings(), + consumerGroup: &testConsumerGroup{}, + telemetryBuilder: nopTelemetryBuilder(t), } require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost())) @@ -624,16 +750,20 @@ func TestLogsReceiverStart(t *testing.T) { } func TestLogsReceiverStartConsume(t *testing.T) { + telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings().TelemetrySettings) + require.NoError(t, err) c := kafkaLogsConsumer{ - nextConsumer: consumertest.NewNop(), - settings: receivertest.NewNopSettings(), - consumerGroup: &testConsumerGroup{}, + nextConsumer: consumertest.NewNop(), + settings: receivertest.NewNopSettings(), + consumerGroup: &testConsumerGroup{}, + telemetryBuilder: telemetryBuilder, } ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err := c.consumeLoop(ctx, &logsConsumerGroupHandler{ - ready: make(chan bool), + err = c.consumeLoop(ctx, &logsConsumerGroupHandler{ + ready: make(chan bool), + telemetryBuilder: telemetryBuilder, }) assert.EqualError(t, err, context.Canceled.Error()) } @@ -646,10 +776,11 @@ func TestLogsReceiver_error(t *testing.T) { expectedErr := errors.New("handler error") c := kafkaLogsConsumer{ - nextConsumer: consumertest.NewNop(), - settings: settings, - consumerGroup: &testConsumerGroup{err: expectedErr}, - config: 
*createDefaultConfig().(*Config), + nextConsumer: consumertest.NewNop(), + settings: settings, + consumerGroup: &testConsumerGroup{err: expectedErr}, + config: *createDefaultConfig().(*Config), + telemetryBuilder: nopTelemetryBuilder(t), } require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost())) @@ -660,38 +791,30 @@ func TestLogsReceiver_error(t *testing.T) { } func TestLogsConsumerGroupHandler(t *testing.T) { - view.Unregister(metricViews()...) - views := metricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) + tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: telemetryBuilder, } testSession := testConsumerGroupSession{ctx: context.Background()} require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 0) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 1) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -710,20 +833,20 @@ func TestLogsConsumerGroupHandler(t *testing.T) { } func TestLogsConsumerGroupHandler_session_done(t *testing.T) { - view.Unregister(metricViews()...) - views := metricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) 
+ tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: telemetryBuilder, } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -731,18 +854,10 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) { require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 0) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) + assertInternalTelemetry(t, tel, 1, 1) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -763,13 +878,17 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) { func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + tel := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewSettings().TelemetrySettings) + require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: telemetryBuilder, } wg := sync.WaitGroup{} @@ -785,6 +904,73 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) { groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")} close(groupClaim.messageChan) wg.Wait() + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "kafka_receiver_offset_lag", + Unit: "1", + Description: "Current offset lag", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 3, + Attributes: attribute.NewSet( + attribute.String("name", ""), + attribute.String("partition", "5"), + ), + }, + }, + }, + }, + { + Name: "kafka_receiver_current_offset", + Unit: "1", + Description: "Current message offset", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 0, + Attributes: attribute.NewSet( + attribute.String("name", ""), + 
attribute.String("partition", "5"), + ), + }, + }, + }, + }, + { + Name: "kafka_receiver_messages", + Unit: "1", + Description: "Number of received messages", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("name", ""), + attribute.String("partition", "5"), + ), + }, + }, + }, + }, + { + Name: "kafka_receiver_unmarshal_failed_log_records", + Unit: "1", + Description: "Number of log records failed to be unmarshaled", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet(attribute.String("name", "")), + }, + }, + }, + }, + }) } func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { @@ -792,12 +978,13 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(consumerError), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: nopTelemetryBuilder(t), } wg := sync.WaitGroup{} @@ -863,12 +1050,13 @@ func TestLogsConsumerGroupHandler_unmarshal_text(t *testing.T) { require.NoError(t, err) sink := &consumertest.LogsSink{} c := logsConsumerGroupHandler{ - unmarshaler: unmarshaler, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: sink, - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, + unmarshaler: unmarshaler, + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: sink, + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: nopTelemetryBuilder(t), } wg := sync.WaitGroup{} @@ -1072,3 +1260,48 @@ func (t *testConsumerGroup) Resume(_ map[string][]int32) { func (t *testConsumerGroup) ResumeAll() { panic("implement me") } + +func assertInternalTelemetry(t *testing.T, tel componentTestTelemetry, partitionStart, partitionClose int64) { + var wantMetrics []metricdata.Metrics + if partitionStart > 0 { + wantMetrics = append(wantMetrics, metricdata.Metrics{ + Name: "kafka_receiver_partition_start", + Unit: "1", + Description: "Number of started partitions", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: partitionStart, + Attributes: attribute.NewSet(attribute.String("name", "")), + }, + }, + }, + }) + } + if partitionClose > 0 { + wantMetrics = append(wantMetrics, metricdata.Metrics{ + Name: "kafka_receiver_partition_close", + Unit: "1", + Description: "Number of finished partitions", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: partitionClose, + Attributes: attribute.NewSet(attribute.String("name", "")), + }, + }, + }, + }) + } + tel.assertMetrics(t, wantMetrics) +} + +func 
nopTelemetryBuilder(t *testing.T) *metadata.TelemetryBuilder { + telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings().TelemetrySettings) + require.NoError(t, err) + return telemetryBuilder +} diff --git a/receiver/kafkareceiver/metadata.yaml b/receiver/kafkareceiver/metadata.yaml index 8ccc15c28141..cc9e7dc3b64d 100644 --- a/receiver/kafkareceiver/metadata.yaml +++ b/receiver/kafkareceiver/metadata.yaml @@ -14,8 +14,60 @@ status: # TODO: Update the receiver to pass the tests tests: skip_lifecycle: true - goleak: - ignore: - top: - # See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. - - "go.opencensus.io/stats/view.(*worker).start" + +telemetry: + metrics: + kafka_receiver_messages: + enabled: true + description: Number of received messages + unit: "1" + sum: + value_type: int + monotonic: true + kafka_receiver_current_offset: + enabled: true + description: Current message offset + unit: "1" + gauge: + value_type: int + kafka_receiver_offset_lag: + enabled: true + description: Current offset lag + unit: "1" + gauge: + value_type: int + kafka_receiver_partition_start: + enabled: true + description: Number of started partitions + unit: "1" + sum: + value_type: int + monotonic: true + kafka_receiver_partition_close: + enabled: true + description: Number of finished partitions + unit: "1" + sum: + value_type: int + monotonic: true + kafka_receiver_unmarshal_failed_metric_points: + enabled: true + description: Number of metric points failed to be unmarshaled + unit: "1" + sum: + value_type: int + monotonic: true + kafka_receiver_unmarshal_failed_log_records: + enabled: true + description: Number of log records failed to be unmarshaled + unit: "1" + sum: + value_type: int + monotonic: true + kafka_receiver_unmarshal_failed_spans: + enabled: true + description: Number of spans failed to be unmarshaled + unit: "1" + sum: + value_type: int + monotonic: true diff --git a/receiver/kafkareceiver/metrics.go b/receiver/kafkareceiver/metrics.go deleted file mode 100644 index c1dc6d3e7c3f..000000000000 --- a/receiver/kafkareceiver/metrics.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" - -import ( - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" -) - -var ( - tagInstanceName, _ = tag.NewKey("name") - tagPartition, _ = tag.NewKey("partition") - - statMessageCount = stats.Int64("kafka_receiver_messages", "Number of received messages", stats.UnitDimensionless) - statMessageOffset = stats.Int64("kafka_receiver_current_offset", "Current message offset", stats.UnitDimensionless) - statMessageOffsetLag = stats.Int64("kafka_receiver_offset_lag", "Current offset lag", stats.UnitDimensionless) - - statPartitionStart = stats.Int64("kafka_receiver_partition_start", "Number of started partitions", stats.UnitDimensionless) - statPartitionClose = stats.Int64("kafka_receiver_partition_close", "Number of finished partitions", stats.UnitDimensionless) - - statUnmarshalFailedMetricPoints = stats.Int64("kafka_receiver_unmarshal_failed_metric_points", "Number of metric points failed to be unmarshaled", stats.UnitDimensionless) - statUnmarshalFailedLogRecords = stats.Int64("kafka_receiver_unmarshal_failed_log_records", "Number of log records failed to be unmarshaled", stats.UnitDimensionless) - statUnmarshalFailedSpans = 
stats.Int64("kafka_receiver_unmarshal_failed_spans", "Number of spans failed to be unmarshaled", stats.UnitDimensionless) -) - -// metricViews return metric views for Kafka receiver. -func metricViews() []*view.View { - partitionAgnosticTagKeys := []tag.Key{tagInstanceName} - partitionSpecificTagKeys := []tag.Key{tagInstanceName, tagPartition} - - countMessages := &view.View{ - Name: statMessageCount.Name(), - Measure: statMessageCount, - Description: statMessageCount.Description(), - TagKeys: partitionSpecificTagKeys, - Aggregation: view.Sum(), - } - - lastValueOffset := &view.View{ - Name: statMessageOffset.Name(), - Measure: statMessageOffset, - Description: statMessageOffset.Description(), - TagKeys: partitionSpecificTagKeys, - Aggregation: view.LastValue(), - } - - lastValueOffsetLag := &view.View{ - Name: statMessageOffsetLag.Name(), - Measure: statMessageOffsetLag, - Description: statMessageOffsetLag.Description(), - TagKeys: partitionSpecificTagKeys, - Aggregation: view.LastValue(), - } - - countPartitionStart := &view.View{ - Name: statPartitionStart.Name(), - Measure: statPartitionStart, - Description: statPartitionStart.Description(), - TagKeys: partitionAgnosticTagKeys, - Aggregation: view.Sum(), - } - - countPartitionClose := &view.View{ - Name: statPartitionClose.Name(), - Measure: statPartitionClose, - Description: statPartitionClose.Description(), - TagKeys: partitionAgnosticTagKeys, - Aggregation: view.Sum(), - } - - countUnmarshalFailedMetricPoints := &view.View{ - Name: statUnmarshalFailedMetricPoints.Name(), - Measure: statUnmarshalFailedMetricPoints, - Description: statUnmarshalFailedMetricPoints.Description(), - TagKeys: partitionAgnosticTagKeys, - Aggregation: view.Sum(), - } - - countUnmarshalFailedLogRecords := &view.View{ - Name: statUnmarshalFailedLogRecords.Name(), - Measure: statUnmarshalFailedLogRecords, - Description: statUnmarshalFailedLogRecords.Description(), - TagKeys: partitionAgnosticTagKeys, - Aggregation: view.Sum(), - } - - countUnmarshalFailedSpans := &view.View{ - Name: statUnmarshalFailedSpans.Name(), - Measure: statUnmarshalFailedSpans, - Description: statUnmarshalFailedSpans.Description(), - TagKeys: partitionAgnosticTagKeys, - Aggregation: view.Sum(), - } - - return []*view.View{ - countMessages, - lastValueOffset, - lastValueOffsetLag, - countPartitionStart, - countPartitionClose, - countUnmarshalFailedMetricPoints, - countUnmarshalFailedLogRecords, - countUnmarshalFailedSpans, - } -} diff --git a/receiver/kafkareceiver/metrics_test.go b/receiver/kafkareceiver/metrics_test.go deleted file mode 100644 index 27e533aa771a..000000000000 --- a/receiver/kafkareceiver/metrics_test.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package kafkareceiver - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -type expectedView struct { - name string - tagCount int -} - -func TestMetrics(t *testing.T) { - metricViews := metricViews() - viewNames := []expectedView{ - {name: "kafka_receiver_messages", tagCount: 2}, - {name: "kafka_receiver_current_offset", tagCount: 2}, - {name: "kafka_receiver_offset_lag", tagCount: 2}, - {name: "kafka_receiver_partition_start", tagCount: 1}, - {name: "kafka_receiver_partition_close", tagCount: 1}, - {name: "kafka_receiver_unmarshal_failed_metric_points", tagCount: 1}, - {name: "kafka_receiver_unmarshal_failed_log_records", tagCount: 1}, - {name: "kafka_receiver_unmarshal_failed_spans", tagCount: 1}, - } - - for i, expectedView := range 
viewNames { - assert.Equal(t, expectedView.name, metricViews[i].Name) - assert.Equal(t, expectedView.tagCount, len(metricViews[i].TagKeys)) - } -}
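
The deleted OpenCensus views and the new metadata.yaml entries describe the same eight instruments; what changes is that the receiver now records them through the mdatagen-generated metadata.TelemetryBuilder (constructed in the tests above via NewTelemetryBuilder) instead of calling stats.Record against registered views. As a rough illustration of the recording side, here is a minimal sketch that is not taken from this diff: the CamelCase field names (KafkaReceiverMessages, KafkaReceiverUnmarshalFailedSpans), the internal/metadata import path, and the helper function itself are assumptions based on mdatagen's usual generated output, while the attribute keys "name" and "partition" are the ones asserted in the tests above.

// Illustrative sketch only -- field names and import path are assumed, not taken from this diff.
package kafkareceiver

import (
	"context"
	"strconv"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata"
)

// recordMessage is a hypothetical helper showing how a consumer-group handler
// could bump the per-message instruments via the generated TelemetryBuilder.
func recordMessage(ctx context.Context, tb *metadata.TelemetryBuilder, name string, partition int32, unmarshalErr error) {
	perPartition := metric.WithAttributes(
		attribute.String("name", name),
		attribute.String("partition", strconv.Itoa(int(partition))),
	)
	// kafka_receiver_messages: monotonic sum, keyed by name and partition.
	tb.KafkaReceiverMessages.Add(ctx, 1, perPartition)
	if unmarshalErr != nil {
		// kafka_receiver_unmarshal_failed_spans: monotonic sum, keyed by name only.
		tb.KafkaReceiverUnmarshalFailedSpans.Add(ctx, 1,
			metric.WithAttributes(attribute.String("name", name)))
	}
}

In a test, the same instruments can then be read back with the helpers added in this change: build the TelemetryBuilder from setupTestTelemetry().NewSettings().TelemetrySettings and compare the collected data points with assertMetrics or assertInternalTelemetry, as the updated handler tests above do.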