From 1380a2a33d7508ba38654cf404cc27a99241f69a Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Mon, 19 Feb 2024 15:47:52 +0700 Subject: [PATCH 01/15] Implement partitioning for OTLP metrics --- ...afka-exporter-key-by-metric-resources.yaml | 27 ++++++++ exporter/kafkaexporter/README.md | 1 + exporter/kafkaexporter/config.go | 2 + exporter/kafkaexporter/config_test.go | 23 ++++--- exporter/kafkaexporter/kafka_exporter.go | 6 ++ exporter/kafkaexporter/marshaler_test.go | 44 +++++++++++++ exporter/kafkaexporter/pdata_marshaler.go | 62 +++++++++++++++---- exporter/kafkaexporter/testdata/config.yaml | 1 + .../coreinternal/resourceutil/resourceutil.go | 32 ++++++++++ .../resourceutil/resourceutil_test.go | 47 ++++++++++++++ 10 files changed, 222 insertions(+), 23 deletions(-) create mode 100644 .chloggen/kafka-exporter-key-by-metric-resources.yaml create mode 100644 internal/coreinternal/resourceutil/resourceutil.go create mode 100644 internal/coreinternal/resourceutil/resourceutil_test.go diff --git a/.chloggen/kafka-exporter-key-by-metric-resources.yaml b/.chloggen/kafka-exporter-key-by-metric-resources.yaml new file mode 100644 index 000000000000..bd815330efbb --- /dev/null +++ b/.chloggen/kafka-exporter-key-by-metric-resources.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkaexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add an ability to publish kafka messages with message key based on metric resource attributes - it will allow partitioning metrics in Kafka. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. 
+issues: [29433, 30666] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user, api] diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index cda2445c3e34..f9069307bc5f 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -40,6 +40,7 @@ The following settings can be optionally configured: - The following encodings are valid *only* for **logs**. - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. - `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default. +- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. - `auth` - `plain_text` - `username`: The username to use. 
diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index daa387bfb91f..baefca3ce23a 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -48,6 +48,8 @@ type Config struct { // trace ID as the message key by default. PartitionTracesByID bool `mapstructure:"partition_traces_by_id"` + PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"` + // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. Metadata Metadata `mapstructure:"metadata"` diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index bb94729b125c..99e4ee39689f 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -55,11 +55,12 @@ func TestLoadConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - Topic: "spans", - Encoding: "otlp_proto", - PartitionTracesByID: true, - Brokers: []string{"foo:123", "bar:456"}, - ClientID: "test_client_id", + Topic: "spans", + Encoding: "otlp_proto", + PartitionTracesByID: true, + PartitionMetricsByResourceAttributes: true, + Brokers: []string{"foo:123", "bar:456"}, + ClientID: "test_client_id", Authentication: kafka.Authentication{ PlainText: &kafka.PlainTextConfig{ Username: "jdoe", @@ -109,11 +110,12 @@ func TestLoadConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - Topic: "spans", - Encoding: "otlp_proto", - PartitionTracesByID: true, - Brokers: []string{"foo:123", "bar:456"}, - ClientID: "test_client_id", + Topic: "spans", + Encoding: "otlp_proto", + PartitionTracesByID: true, + PartitionMetricsByResourceAttributes: true, + Brokers: []string{"foo:123", "bar:456"}, + ClientID: "test_client_id", Authentication: kafka.Authentication{ PlainText: &kafka.PlainTextConfig{ Username: "jdoe", @@ -165,6 +167,7 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", PartitionTracesByID: 
true, + PartitionMetricsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", ResolveCanonicalBootstrapServersOnly: true, diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 47417f5ee4c9..69b2360bb40e 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -171,6 +171,12 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m if marshaler == nil { return nil, errUnrecognizedEncoding } + if config.PartitionMetricsByResourceAttributes { + if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok { + keyableMarshaler.Key() + } + } + producer, err := newSaramaProducer(config) if err != nil { return nil, err diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index 0d0cfba637f5..69d77339480e 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -6,6 +6,7 @@ package kafkaexporter import ( "encoding/json" "fmt" + "go.opentelemetry.io/collector/pdata/pmetric" "testing" "time" @@ -71,6 +72,49 @@ func TestDefaultLogsMarshalers(t *testing.T) { } } +func TestOTLPMetricsJsonMarshaling(t *testing.T) { + now := time.Unix(1, 0) + + metric := pmetric.NewMetrics() + r := pcommon.NewResource() + r.Attributes().PutStr("service.name", "my_service_name") + r.Attributes().PutStr("service.instance.id", "kek_x_1") + r.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource()) + + rm := metric.ResourceMetrics().At(0) + rm.SetSchemaUrl(conventions.SchemaURL) + + sm := rm.ScopeMetrics().AppendEmpty() + pmetric.NewScopeMetrics() + m := sm.Metrics().AppendEmpty() + m.SetEmptyGauge() + m.Gauge().DataPoints().AppendEmpty().SetStartTimestamp(pcommon.NewTimestampFromTime(now)) + m.Gauge().DataPoints().At(0).Attributes().PutStr("gauage_attribute", "attr") + m.Gauge().DataPoints().At(0).SetDoubleValue(1.0) + + r1 := 
pcommon.NewResource() + r1.Attributes().PutStr("service.name", "my_service_name") + r1.Attributes().PutStr("service.instance.id", "kek_x_2") + r1.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource()) + + standardMarshaler := metricsMarshalers()["otlp_json"] + msgs, err := standardMarshaler.Marshal(metric, "KafkaTopicX") + require.NoError(t, err, "Must have marshaled the data without error") + require.Len(t, msgs, 1, "Expected number of messages in the message") + require.Equal(t, nil, msgs[0].Key) + + keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler) + require.True(t, ok, "Must be a KeyableMetricsMarshaler") + keyableMarshaler.Key() + + msgs, err = keyableMarshaler.Marshal(metric, "KafkaTopicX") + require.NoError(t, err, "Must have marshaled the data without error") + require.Len(t, msgs, 2, "Expected number of messages in the message") + + require.Equal(t, sarama.ByteEncoder("90e74a8334a89993bd3f6ad05f9ca02438032a78d4399fb6fecf6c94fcdb13ef"), msgs[0].Key) + require.Equal(t, sarama.ByteEncoder("55e1113a2eace57b91ef58911d811c28e936365f03ac068e8ce23090d9ea748f"), msgs[1].Key) +} + func TestOTLPTracesJsonMarshaling(t *testing.T) { t.Parallel() diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index d9e38dd52caf..13a3e371e46d 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -5,12 +5,12 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collect import ( "github.com/IBM/sarama" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/resourceutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" - - 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" ) type pdataLogsMarshaler struct { @@ -42,22 +42,58 @@ func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarsha } } +// KeyableMetricsMarshaler is an extension of the MetricsMarshaler interface intended to provide partition key capabilities +// for metrics messages +type KeyableMetricsMarshaler interface { + MetricsMarshaler + Key() +} + type pdataMetricsMarshaler struct { marshaler pmetric.Marshaler encoding string + keyed bool +} + +// Key configures the pdataMetricsMarshaler to set the message key on the kafka messages +func (p *pdataMetricsMarshaler) Key() { + p.keyed = true } func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) { - bts, err := p.marshaler.MarshalMetrics(ld) - if err != nil { - return nil, err - } - return []*sarama.ProducerMessage{ - { + var msgs []*sarama.ProducerMessage + if p.keyed { + metrics := ld.ResourceMetrics() + + for i := 0; i < metrics.Len(); i++ { + resourceMetrics := metrics.At(i) + hash := resourceutil.CalculateResourceAttributesHash(resourceMetrics.Resource()) + + newMetrics := pmetric.NewMetrics() + resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty()) + + bts, err := p.marshaler.MarshalMetrics(newMetrics) + if err != nil { + return nil, err + } + msgs = append(msgs, &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(bts), + Key: sarama.ByteEncoder(hash), + }) + } + } else { + bts, err := p.marshaler.MarshalMetrics(ld) + if err != nil { + return nil, err + } + msgs = append(msgs, &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(bts), - }, - }, nil + }) + } + + return msgs, nil } func (p pdataMetricsMarshaler) Encoding() string { @@ -65,13 +101,13 @@ func (p pdataMetricsMarshaler) Encoding() string { } func 
newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string) MetricsMarshaler { - return pdataMetricsMarshaler{ + return &pdataMetricsMarshaler{ marshaler: marshaler, encoding: encoding, } } -// KeyableTracesMarshaler is an extension of the TracesMarshaler interface inteded to provide partition key capabilities +// KeyableTracesMarshaler is an extension of the TracesMarshaler interface intended to provide partition key capabilities // for trace messages type KeyableTracesMarshaler interface { TracesMarshaler diff --git a/exporter/kafkaexporter/testdata/config.yaml b/exporter/kafkaexporter/testdata/config.yaml index 15624b521b10..7c89bea74ade 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ b/exporter/kafkaexporter/testdata/config.yaml @@ -13,6 +13,7 @@ kafka: required_acks: -1 # WaitForAll timeout: 10s partition_traces_by_id: true + partition_metrics_by_resource_attributes: true auth: plain_text: username: jdoe diff --git a/internal/coreinternal/resourceutil/resourceutil.go b/internal/coreinternal/resourceutil/resourceutil.go new file mode 100644 index 000000000000..b2077bd5114d --- /dev/null +++ b/internal/coreinternal/resourceutil/resourceutil.go @@ -0,0 +1,32 @@ +package resourceutil + +import ( + "crypto/sha256" + "encoding/hex" + "go.opentelemetry.io/collector/pdata/pcommon" + "sort" +) + +type keyValueLabelPair struct { + Key string + Value string +} + +func CalculateResourceAttributesHash(resourceMetrics pcommon.Resource) string { + var pairs []keyValueLabelPair + resourceMetrics.Attributes().Range(func(k string, v pcommon.Value) bool { + pairs = append(pairs, keyValueLabelPair{Key: k, Value: v.AsString()}) + return true + }) + + sort.SliceStable(pairs, func(i, j int) bool { + return pairs[i].Key < pairs[j].Key + }) + + h := sha256.New() + for _, pair := range pairs { + h.Write([]byte(pair.Key)) + h.Write([]byte(pair.Value)) + } + return hex.EncodeToString(h.Sum(nil)) +} diff --git 
a/internal/coreinternal/resourceutil/resourceutil_test.go b/internal/coreinternal/resourceutil/resourceutil_test.go new file mode 100644 index 000000000000..565285806030 --- /dev/null +++ b/internal/coreinternal/resourceutil/resourceutil_test.go @@ -0,0 +1,47 @@ +package resourceutil + +import ( + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "testing" +) + +func TestHashEmptyResource(t *testing.T) { + r := pcommon.NewResource() + + assert.EqualValues(t, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", CalculateResourceAttributesHash(r)) +} + +func TestHashSimpleResource(t *testing.T) { + r := pcommon.NewResource() + r.Attributes().PutStr("k1", "v1") + r.Attributes().PutStr("k2", "v2") + + assert.EqualValues(t, "3590bbad8f8a328dbbd5d01c35d8a5fab92c3588cf7e468e995c31d45a51cbef", CalculateResourceAttributesHash(r)) +} + +func TestHashReorderedAttributes(t *testing.T) { + r1 := pcommon.NewResource() + r1.Attributes().PutStr("k1", "v1") + r1.Attributes().PutStr("k2", "v2") + + r2 := pcommon.NewResource() + r2.Attributes().PutStr("k2", "v2") + r2.Attributes().PutStr("k1", "v1") + + assert.EqualValues(t, CalculateResourceAttributesHash(r1), CalculateResourceAttributesHash(r2)) +} + +func TestHashDifferentAttributeValues(t *testing.T) { + r := pcommon.NewResource() + r.Attributes().PutBool("k1", false) + r.Attributes().PutDouble("k2", 1.0) + r.Attributes().PutEmpty("k3") + r.Attributes().PutEmptyBytes("k4") + r.Attributes().PutEmptyMap("k5") + r.Attributes().PutEmptySlice("k6") + r.Attributes().PutInt("k7", 1) + r.Attributes().PutStr("k8", "v8") + + assert.EqualValues(t, "46852adab1751045942d67dace7c88665ec0e68b7f4b81a33bb05e5b954a8e57", CalculateResourceAttributesHash(r)) +} From 82c9e6479c7e3d0132f11bfef0113baf31148b09 Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Mon, 18 Mar 2024 13:47:27 +0700 Subject: [PATCH 02/15] 1 --- internal/coreinternal/resourceutil/resourceutil.go | 2 +- 1 file changed, 1 insertion(+), 
1 deletion(-) diff --git a/internal/coreinternal/resourceutil/resourceutil.go b/internal/coreinternal/resourceutil/resourceutil.go index b2077bd5114d..9d0e2e9d191f 100644 --- a/internal/coreinternal/resourceutil/resourceutil.go +++ b/internal/coreinternal/resourceutil/resourceutil.go @@ -1,4 +1,4 @@ -package resourceutil +package resourceutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/resourceutil" import ( "crypto/sha256" From deca15570fa5771759e9764f6150a792829d3b60 Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Wed, 20 Mar 2024 16:05:56 +0700 Subject: [PATCH 03/15] Use existing hash function --- exporter/kafkaexporter/marshaler_test.go | 6 +-- exporter/kafkaexporter/pdata_marshaler.go | 6 +-- .../coreinternal/resourceutil/resourceutil.go | 32 ------------- .../resourceutil/resourceutil_test.go | 47 ------------------- 4 files changed, 6 insertions(+), 85 deletions(-) delete mode 100644 internal/coreinternal/resourceutil/resourceutil.go delete mode 100644 internal/coreinternal/resourceutil/resourceutil_test.go diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index 69d77339480e..4b29b10252d8 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -93,8 +93,8 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) { m.Gauge().DataPoints().At(0).SetDoubleValue(1.0) r1 := pcommon.NewResource() - r1.Attributes().PutStr("service.name", "my_service_name") r1.Attributes().PutStr("service.instance.id", "kek_x_2") + r1.Attributes().PutStr("service.name", "my_service_name") r1.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource()) standardMarshaler := metricsMarshalers()["otlp_json"] @@ -111,8 +111,8 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) { require.NoError(t, err, "Must have marshaled the data without error") require.Len(t, msgs, 2, "Expected number of messages in the message") - require.Equal(t, 
sarama.ByteEncoder("90e74a8334a89993bd3f6ad05f9ca02438032a78d4399fb6fecf6c94fcdb13ef"), msgs[0].Key) - require.Equal(t, sarama.ByteEncoder("55e1113a2eace57b91ef58911d811c28e936365f03ac068e8ce23090d9ea748f"), msgs[1].Key) + require.Equal(t, sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16}, msgs[0].Key) + require.Equal(t, sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44}, msgs[1].Key) } func TestOTLPTracesJsonMarshaling(t *testing.T) { diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 13a3e371e46d..415546e6b205 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -5,9 +5,9 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collect import ( "github.com/IBM/sarama" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/resourceutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -67,7 +67,7 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar for i := 0; i < metrics.Len(); i++ { resourceMetrics := metrics.At(i) - hash := resourceutil.CalculateResourceAttributesHash(resourceMetrics.Resource()) + var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) newMetrics := pmetric.NewMetrics() resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty()) @@ -79,7 +79,7 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar msgs = append(msgs, &sarama.ProducerMessage{ Topic: topic, 
Value: sarama.ByteEncoder(bts), - Key: sarama.ByteEncoder(hash), + Key: sarama.ByteEncoder(hash[:]), }) } } else { diff --git a/internal/coreinternal/resourceutil/resourceutil.go b/internal/coreinternal/resourceutil/resourceutil.go deleted file mode 100644 index 9d0e2e9d191f..000000000000 --- a/internal/coreinternal/resourceutil/resourceutil.go +++ /dev/null @@ -1,32 +0,0 @@ -package resourceutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/resourceutil" - -import ( - "crypto/sha256" - "encoding/hex" - "go.opentelemetry.io/collector/pdata/pcommon" - "sort" -) - -type keyValueLabelPair struct { - Key string - Value string -} - -func CalculateResourceAttributesHash(resourceMetrics pcommon.Resource) string { - var pairs []keyValueLabelPair - resourceMetrics.Attributes().Range(func(k string, v pcommon.Value) bool { - pairs = append(pairs, keyValueLabelPair{Key: k, Value: v.AsString()}) - return true - }) - - sort.SliceStable(pairs, func(i, j int) bool { - return pairs[i].Key < pairs[j].Key - }) - - h := sha256.New() - for _, pair := range pairs { - h.Write([]byte(pair.Key)) - h.Write([]byte(pair.Value)) - } - return hex.EncodeToString(h.Sum(nil)) -} diff --git a/internal/coreinternal/resourceutil/resourceutil_test.go b/internal/coreinternal/resourceutil/resourceutil_test.go deleted file mode 100644 index 565285806030..000000000000 --- a/internal/coreinternal/resourceutil/resourceutil_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package resourceutil - -import ( - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/pdata/pcommon" - "testing" -) - -func TestHashEmptyResource(t *testing.T) { - r := pcommon.NewResource() - - assert.EqualValues(t, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", CalculateResourceAttributesHash(r)) -} - -func TestHashSimpleResource(t *testing.T) { - r := pcommon.NewResource() - r.Attributes().PutStr("k1", "v1") - r.Attributes().PutStr("k2", "v2") - - 
assert.EqualValues(t, "3590bbad8f8a328dbbd5d01c35d8a5fab92c3588cf7e468e995c31d45a51cbef", CalculateResourceAttributesHash(r)) -} - -func TestHashReorderedAttributes(t *testing.T) { - r1 := pcommon.NewResource() - r1.Attributes().PutStr("k1", "v1") - r1.Attributes().PutStr("k2", "v2") - - r2 := pcommon.NewResource() - r2.Attributes().PutStr("k2", "v2") - r2.Attributes().PutStr("k1", "v1") - - assert.EqualValues(t, CalculateResourceAttributesHash(r1), CalculateResourceAttributesHash(r2)) -} - -func TestHashDifferentAttributeValues(t *testing.T) { - r := pcommon.NewResource() - r.Attributes().PutBool("k1", false) - r.Attributes().PutDouble("k2", 1.0) - r.Attributes().PutEmpty("k3") - r.Attributes().PutEmptyBytes("k4") - r.Attributes().PutEmptyMap("k5") - r.Attributes().PutEmptySlice("k6") - r.Attributes().PutInt("k7", 1) - r.Attributes().PutStr("k8", "v8") - - assert.EqualValues(t, "46852adab1751045942d67dace7c88665ec0e68b7f4b81a33bb05e5b954a8e57", CalculateResourceAttributesHash(r)) -} From ccd1fefac8d0a69aed97930239a80837aa1d5639 Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Wed, 20 Mar 2024 16:08:22 +0700 Subject: [PATCH 04/15] Add github issue --- .chloggen/kafka-exporter-key-by-metric-resources.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/kafka-exporter-key-by-metric-resources.yaml b/.chloggen/kafka-exporter-key-by-metric-resources.yaml index bd815330efbb..21207a81524c 100644 --- a/.chloggen/kafka-exporter-key-by-metric-resources.yaml +++ b/.chloggen/kafka-exporter-key-by-metric-resources.yaml @@ -10,7 +10,7 @@ component: kafkaexporter note: add an ability to publish kafka messages with message key based on metric resource attributes - it will allow partitioning metrics in Kafka. # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. 
-issues: [29433, 30666] +issues: [29433, 30666, 31675] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. From 25c61a51407c6067ae9c23a8f9f565cdfdaf67ca Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Sun, 24 Mar 2024 10:31:27 +0700 Subject: [PATCH 05/15] add partitioning by specific tags --- exporter/kafkaexporter/README.md | 6 +- exporter/kafkaexporter/config.go | 10 +- exporter/kafkaexporter/factory.go | 6 + exporter/kafkaexporter/kafka_exporter.go | 4 +- exporter/kafkaexporter/marshaler_test.go | 2 +- exporter/kafkaexporter/pdata_marshaler.go | 19 ++- exporter/kafkaexporter/testdata/config.yaml | 6 +- pkg/pdatautil/hash.go | 20 ++++ pkg/pdatautil/hash_test.go | 124 ++++++++++++++++++++ 9 files changed, 185 insertions(+), 12 deletions(-) diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index f9069307bc5f..730be96df811 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -40,7 +40,11 @@ The following settings can be optionally configured: - The following encodings are valid *only* for **logs**. - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. - `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default. -- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. 
+- `partition_metrics_by_resource_attributes` configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. + - `enabled`: (default = false) + - `attributes`: (default = []) the list of resource attributes to include in the partitioning key. If empty, all resource attributes are included. + - `service.name` + - `service.instance.id` - `auth` - `plain_text` - `username`: The username to use. diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index baefca3ce23a..5cf7fa403cee 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -48,7 +48,7 @@ type Config struct { // trace ID as the message key by default. PartitionTracesByID bool `mapstructure:"partition_traces_by_id"` - PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"` + PartitionMetricsByResourceAttributes PartitionByResourceAttributes `mapstructure:"partition_metrics_by_resource_attributes"` // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. @@ -61,6 +61,14 @@ type Config struct { Authentication kafka.Authentication `mapstructure:"auth"` } +// PartitionByResourceAttributes defines configuration for partitioning by resource attributes. +type PartitionByResourceAttributes struct { + Enabled bool `mapstructure:"enabled"` + + // The list of resource attributes to use for partitioning, empty by default + Attributes []string `mapstructure:"attributes"` +} + // Metadata defines configuration for retrieving metadata from the broker. 
type Metadata struct { // Whether to maintain a full set of metadata for all topics, or just diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index e822b0a005f4..0909f6884abb 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -38,6 +38,8 @@ const ( defaultCompression = "none" // default from sarama.NewConfig() defaultFluxMaxMessages = 0 + // partitioning metrics by resource attributes is disabled by default + defaultPartitionMetricsByResourceAttributesEnabled = false ) // FactoryOption applies changes to kafkaExporterFactory. @@ -99,6 +101,10 @@ func createDefaultConfig() component.Config { // using an empty topic to track when it has not been set by user, default is based on traces or metrics. Topic: "", Encoding: defaultEncoding, + PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{ + Enabled: defaultPartitionMetricsByResourceAttributesEnabled, + Attributes: []string{}, + }, Metadata: Metadata{ Full: defaultMetadataFull, Retry: MetadataRetry{ diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index c6b82dcfc66a..0f26df3bc9e9 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -211,9 +211,9 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m if marshaler == nil { return nil, errUnrecognizedEncoding } - if config.PartitionMetricsByResourceAttributes { + if config.PartitionMetricsByResourceAttributes.Enabled { if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok { - keyableMarshaler.Key() + keyableMarshaler.Key(config.PartitionMetricsByResourceAttributes.Attributes) } } diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index 4b29b10252d8..fc1dc1aa5840 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -105,7 +105,7 @@ func 
TestOTLPMetricsJsonMarshaling(t *testing.T) { keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler) require.True(t, ok, "Must be a KeyableMetricsMarshaler") - keyableMarshaler.Key() + keyableMarshaler.Key([]string{}) msgs, err = keyableMarshaler.Marshal(metric, "KafkaTopicX") require.NoError(t, err, "Must have marshaled the data without error") diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 415546e6b205..e619fb1d6b39 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -46,18 +46,20 @@ func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarsha // for metrics messages type KeyableMetricsMarshaler interface { MetricsMarshaler - Key() + Key(attributes []string) } type pdataMetricsMarshaler struct { - marshaler pmetric.Marshaler - encoding string - keyed bool + marshaler pmetric.Marshaler + encoding string + keyed bool + keyAttributes []string } // Key configures the pdataMetricsMarshaler to set the message key on the kafka messages -func (p *pdataMetricsMarshaler) Key() { +func (p *pdataMetricsMarshaler) Key(attributes []string) { p.keyed = true + p.keyAttributes = attributes } func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) { @@ -67,7 +69,12 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar for i := 0; i < metrics.Len(); i++ { resourceMetrics := metrics.At(i) - var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) + var hash [16]byte + if len(p.keyAttributes) > 0 { + hash = pdatautil.MapHashSelectedKeys(resourceMetrics.Resource().Attributes(), p.keyAttributes) + } else { + hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) + } newMetrics := pmetric.NewMetrics() resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty()) diff --git a/exporter/kafkaexporter/testdata/config.yaml 
b/exporter/kafkaexporter/testdata/config.yaml index 7c89bea74ade..a6cfadd4faf5 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ b/exporter/kafkaexporter/testdata/config.yaml @@ -13,7 +13,11 @@ kafka: required_acks: -1 # WaitForAll timeout: 10s partition_traces_by_id: true - partition_metrics_by_resource_attributes: true + partition_metrics_by_resource_attributes: + enabled: true + attributes: + - service.name + - service.instance.id auth: plain_text: username: jdoe diff --git a/pkg/pdatautil/hash.go b/pkg/pdatautil/hash.go index 650258445b4c..22ffb8880086 100644 --- a/pkg/pdatautil/hash.go +++ b/pkg/pdatautil/hash.go @@ -63,6 +63,26 @@ func MapHash(m pcommon.Map) [16]byte { return hw.hashSum128() } +// MapHashSelectedKeys return a hash for the provided map, using values of only provided keys. +// Order of hash calculation is determined by the order of keys. +func MapHashSelectedKeys(m pcommon.Map, keys []string) [16]byte { + if m.Len() == 0 || len(keys) == 0 { + return emptyHash + } + + hw := hashWriterPool.Get().(*hashWriter) + defer hashWriterPool.Put(hw) + hw.byteBuf = hw.byteBuf[:0] + + for _, k := range keys { + if v, ok := m.Get(k); ok { + hw.writeValueHash(v) + } + } + + return hw.hashSum128() +} + // ValueHash return a hash for the provided pcommon.Value. 
func ValueHash(v pcommon.Value) [16]byte { hw := hashWriterPool.Get().(*hashWriter) diff --git a/pkg/pdatautil/hash_test.go b/pkg/pdatautil/hash_test.go index a725676f37ac..453bdb4e22c8 100644 --- a/pkg/pdatautil/hash_test.go +++ b/pkg/pdatautil/hash_test.go @@ -150,6 +150,130 @@ func TestMapHash(t *testing.T) { } } +func TestMapHashSelectedKeys(t *testing.T) { + tests := []struct { + name string + maps []pcommon.Map + keys [][]string + equal bool + }{ + { + name: "empty_maps", + maps: []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}, + keys: [][]string{{}, {}}, + equal: true, + }, + { + name: "same_maps_different_order", + keys: [][]string{{"k1", "k2"}, {"k1", "k2"}}, + maps: func() []pcommon.Map { + m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} + m[0].PutStr("k1", "v1") + m[0].PutInt("k2", 1) + m[0].PutDouble("k3", 1) + m[0].PutBool("k4", true) + m[0].PutEmptyBytes("k5").FromRaw([]byte("abc")) + sl := m[0].PutEmptySlice("k6") + sl.AppendEmpty().SetStr("str") + sl.AppendEmpty().SetBool(true) + m0 := m[0].PutEmptyMap("k") + m0.PutInt("k1", 1) + m0.PutDouble("k2", 10) + + m1 := m[1].PutEmptyMap("k") + m1.PutDouble("k2", 10) + m1.PutInt("k1", 1) + m[1].PutEmptyBytes("k5").FromRaw([]byte("abc")) + m[1].PutBool("k4", true) + sl = m[1].PutEmptySlice("k6") + sl.AppendEmpty().SetStr("str") + sl.AppendEmpty().SetBool(true) + m[1].PutInt("k2", 1) + m[1].PutStr("k1", "v1") + m[1].PutDouble("k3", 1) + + return m + }(), + equal: true, + }, + { + name: "different_maps_having_same_keys", + keys: [][]string{{"k2", "k3"}, {"k2", "k3"}}, + maps: func() []pcommon.Map { + m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} + m[0].PutStr("k1", "v1") + m[0].PutInt("k2", 1) + m[0].PutDouble("k3", 1) + + m[1].PutInt("k2", 1) + m[1].PutDouble("k3", 1) + m[1].PutDouble("k4", 1) + + return m + }(), + equal: true, + }, + { + name: "same_maps_different_key_order", + keys: [][]string{{"k2", "k3"}, {"k3", "k2"}}, + maps: func() []pcommon.Map { + m := []pcommon.Map{pcommon.NewMap(), 
pcommon.NewMap()} + m[0].PutInt("k2", 1) + m[0].PutDouble("k3", 1) + + m[1].PutInt("k2", 1) + m[1].PutDouble("k3", 1) + + return m + }(), + equal: false, + }, + { + name: "same_maps_with_not_existing_keys", + keys: [][]string{{"k10", "k11"}, {"k10", "k11"}}, + maps: func() []pcommon.Map { + m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} + m[0].PutStr("k1", "v1") + + m[1].PutInt("k2", 1) + + return m + }(), + equal: true, + }, + { + name: "different_maps", + keys: [][]string{{"k2", "k3"}, {"k2", "k3"}}, + maps: func() []pcommon.Map { + m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} + m[0].PutInt("k2", 2) + m[0].PutDouble("k3", 2) + + m[1].PutInt("k2", 1) + m[1].PutDouble("k3", 1) + + return m + }(), + equal: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for i := 0; i < len(tt.maps); i++ { + for j := i + 1; j < len(tt.maps); j++ { + if tt.equal { + assert.Equal(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]), + "maps %d %v and %d %v must have the same hash, then calculated with keys %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) + } else { + assert.NotEqual(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]), + "maps %d %v and %d %v must have the different hashes, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) + } + } + } + }) + } +} + func TestValueHash(t *testing.T) { tests := []struct { name string From 9b4bea7660f4d44017b4d6eae3efb69b48900d1d Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Sun, 24 Mar 2024 12:33:36 +0700 Subject: [PATCH 06/15] Add more tests --- exporter/kafkaexporter/config_test.go | 6 +- exporter/kafkaexporter/marshaler_test.go | 127 ++++++++++++++------ exporter/kafkaexporter/testdata/config.yaml | 4 +- 3 files changed, 92 insertions(+), 45 deletions(-) diff --git a/exporter/kafkaexporter/config_test.go 
b/exporter/kafkaexporter/config_test.go index 99e4ee39689f..c77bf6551a43 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -58,7 +58,7 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", PartitionTracesByID: true, - PartitionMetricsByResourceAttributes: true, + PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true}, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", Authentication: kafka.Authentication{ @@ -113,7 +113,7 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", PartitionTracesByID: true, - PartitionMetricsByResourceAttributes: true, + PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true}, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", Authentication: kafka.Authentication{ @@ -167,7 +167,7 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", PartitionTracesByID: true, - PartitionMetricsByResourceAttributes: true, + PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true}, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", ResolveCanonicalBootstrapServersOnly: true, diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index fc1dc1aa5840..a11506631706 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -73,46 +73,93 @@ func TestDefaultLogsMarshalers(t *testing.T) { } func TestOTLPMetricsJsonMarshaling(t *testing.T) { - now := time.Unix(1, 0) - - metric := pmetric.NewMetrics() - r := pcommon.NewResource() - r.Attributes().PutStr("service.name", "my_service_name") - r.Attributes().PutStr("service.instance.id", "kek_x_1") - r.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource()) - - rm := 
metric.ResourceMetrics().At(0) - rm.SetSchemaUrl(conventions.SchemaURL) - - sm := rm.ScopeMetrics().AppendEmpty() - pmetric.NewScopeMetrics() - m := sm.Metrics().AppendEmpty() - m.SetEmptyGauge() - m.Gauge().DataPoints().AppendEmpty().SetStartTimestamp(pcommon.NewTimestampFromTime(now)) - m.Gauge().DataPoints().At(0).Attributes().PutStr("gauage_attribute", "attr") - m.Gauge().DataPoints().At(0).SetDoubleValue(1.0) - - r1 := pcommon.NewResource() - r1.Attributes().PutStr("service.instance.id", "kek_x_2") - r1.Attributes().PutStr("service.name", "my_service_name") - r1.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource()) - - standardMarshaler := metricsMarshalers()["otlp_json"] - msgs, err := standardMarshaler.Marshal(metric, "KafkaTopicX") - require.NoError(t, err, "Must have marshaled the data without error") - require.Len(t, msgs, 1, "Expected number of messages in the message") - require.Equal(t, nil, msgs[0].Key) - - keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler) - require.True(t, ok, "Must be a KeyableMetricsMarshaler") - keyableMarshaler.Key([]string{}) - - msgs, err = keyableMarshaler.Marshal(metric, "KafkaTopicX") - require.NoError(t, err, "Must have marshaled the data without error") - require.Len(t, msgs, 2, "Expected number of messages in the message") - - require.Equal(t, sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16}, msgs[0].Key) - require.Equal(t, sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44}, msgs[1].Key) + tests := []struct { + name string + keyEnabled bool + attributes []string + messagePartitionKeys []sarama.Encoder + }{ + { + name: "partitioning_disabled", + keyEnabled: false, + attributes: []string{}, + messagePartitionKeys: []sarama.Encoder{nil}, + }, + { + name: "partitioning_disabled_keys_are_not_empty", + keyEnabled: false, + attributes: []string{"service.name"}, + 
messagePartitionKeys: []sarama.Encoder{nil}, + }, + { + name: "partitioning_enabled", + keyEnabled: true, + attributes: []string{}, + messagePartitionKeys: []sarama.Encoder{ + sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16}, + sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44}, + }, + }, + { + name: "partitioning_enabled_with_keys", + keyEnabled: true, + attributes: []string{"service.instance.id"}, + messagePartitionKeys: []sarama.Encoder{ + sarama.ByteEncoder{0xf9, 0x1e, 0x59, 0x41, 0xb5, 0x16, 0xfa, 0xdf, 0xc1, 0x79, 0xa3, 0x54, 0x68, 0x1d, 0xb6, 0xc8}, + sarama.ByteEncoder{0x47, 0xac, 0xe2, 0x30, 0xd, 0x72, 0xd1, 0x82, 0xa5, 0xd, 0xe3, 0xa4, 0x64, 0xd3, 0x6b, 0xb5}, + }, + }, + { + name: "partitioning_enabled_keys_do_not_exist", + keyEnabled: true, + attributes: []string{"non_existing_key"}, + messagePartitionKeys: []sarama.Encoder{ + sarama.ByteEncoder{0x99, 0xe9, 0xd8, 0x51, 0x37, 0xdb, 0x46, 0xef, 0xfe, 0x7c, 0x8e, 0x2d, 0x85, 0x35, 0xce, 0xeb}, + sarama.ByteEncoder{0x99, 0xe9, 0xd8, 0x51, 0x37, 0xdb, 0x46, 0xef, 0xfe, 0x7c, 0x8e, 0x2d, 0x85, 0x35, 0xce, 0xeb}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metric := pmetric.NewMetrics() + r := pcommon.NewResource() + r.Attributes().PutStr("service.name", "my_service_name") + r.Attributes().PutStr("service.instance.id", "kek_x_1") + r.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource()) + + rm := metric.ResourceMetrics().At(0) + rm.SetSchemaUrl(conventions.SchemaURL) + + sm := rm.ScopeMetrics().AppendEmpty() + pmetric.NewScopeMetrics() + m := sm.Metrics().AppendEmpty() + m.SetEmptyGauge() + m.Gauge().DataPoints().AppendEmpty().SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(1, 0))) + m.Gauge().DataPoints().At(0).Attributes().PutStr("gauage_attribute", "attr") + m.Gauge().DataPoints().At(0).SetDoubleValue(1.0) + + r1 := 
pcommon.NewResource() + r1.Attributes().PutStr("service.instance.id", "kek_x_2") + r1.Attributes().PutStr("service.name", "my_service_name") + r1.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource()) + + standardMarshaler := metricsMarshalers()["otlp_json"] + keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler) + require.True(t, ok, "Must be a KeyableMetricsMarshaler") + if tt.keyEnabled { + keyableMarshaler.Key(tt.attributes) + } + + msgs, err := standardMarshaler.Marshal(metric, "KafkaTopicX") + require.NoError(t, err, "Must have marshaled the data without error") + + require.Len(t, msgs, len(tt.messagePartitionKeys), "Number of messages must be %d, but was %d", len(tt.messagePartitionKeys), len(msgs)) + + for i := 0; i < len(tt.messagePartitionKeys); i++ { + require.Equal(t, tt.messagePartitionKeys[i], msgs[i].Key, "message %d has incorrect key", i) + } + }) + } } func TestOTLPTracesJsonMarshaling(t *testing.T) { diff --git a/exporter/kafkaexporter/testdata/config.yaml b/exporter/kafkaexporter/testdata/config.yaml index a6cfadd4faf5..3ceaa60a741b 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ b/exporter/kafkaexporter/testdata/config.yaml @@ -16,8 +16,8 @@ kafka: partition_metrics_by_resource_attributes: enabled: true attributes: - - service.name - - service.instance.id + - k1 + - k2 auth: plain_text: username: jdoe From 244ca0385afdcac9e5745356162b3d7e9f6d9c48 Mon Sep 17 00:00:00 2001 From: Evgeniy Zuikin Date: Tue, 26 Mar 2024 07:46:28 +0700 Subject: [PATCH 07/15] Update exporter/kafkaexporter/README.md Co-authored-by: Curtis Robert --- exporter/kafkaexporter/README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index 730be96df811..18bd214e1bff 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -43,8 +43,6 @@ The following settings can be optionally configured: - `partition_metrics_by_resource_attributes` configures 
the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. - `enabled`: (default = false) - `attributes`: (default = []) the list of resource attributes to include in the partitioning key. If empty, all resource attributes are included. - - `service.name` - - `service.instance.id` - `auth` - `plain_text` - `username`: The username to use. From b4e20e91b73cbd9dbca9911e4835be3fb3f7ffa3 Mon Sep 17 00:00:00 2001 From: Evgeniy Zuikin Date: Tue, 26 Mar 2024 07:54:39 +0700 Subject: [PATCH 08/15] Update pkg/pdatautil/hash_test.go Co-authored-by: Curtis Robert --- pkg/pdatautil/hash_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pdatautil/hash_test.go b/pkg/pdatautil/hash_test.go index 453bdb4e22c8..e15c74e7fa8b 100644 --- a/pkg/pdatautil/hash_test.go +++ b/pkg/pdatautil/hash_test.go @@ -266,7 +266,7 @@ func TestMapHashSelectedKeys(t *testing.T) { "maps %d %v and %d %v must have the same hash, then calculated with keys %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) } else { assert.NotEqual(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]), - "maps %d %v and %d %v must have the different hashes, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) + "maps %d %v and %d %v must have different hashes, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) } } } From 925ed8be0b05edf3baad2bb9e72c913fbd9cf00a Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Tue, 26 Mar 2024 07:55:11 +0700 Subject: [PATCH 09/15] add missed %v --- pkg/pdatautil/hash_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pdatautil/hash_test.go b/pkg/pdatautil/hash_test.go index e15c74e7fa8b..1cb655762a74 100644 --- a/pkg/pdatautil/hash_test.go +++ b/pkg/pdatautil/hash_test.go @@ -263,7 +263,7 @@ func 
TestMapHashSelectedKeys(t *testing.T) { for j := i + 1; j < len(tt.maps); j++ { if tt.equal { assert.Equal(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]), - "maps %d %v and %d %v must have the same hash, then calculated with keys %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) + "maps %d %v and %d %v must have the same hash, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) } else { assert.NotEqual(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]), "maps %d %v and %d %v must have different hashes, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) From b66537aab1f8dbecd25295bf630526edefd54bc4 Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Fri, 26 Apr 2024 18:43:02 +0700 Subject: [PATCH 10/15] rollback partitioning by specific resource attributes --- exporter/kafkaexporter/README.md | 4 +- exporter/kafkaexporter/config.go | 10 +- exporter/kafkaexporter/config_test.go | 6 +- exporter/kafkaexporter/factory.go | 9 +- exporter/kafkaexporter/kafka_exporter.go | 4 +- exporter/kafkaexporter/marshaler_test.go | 29 +---- exporter/kafkaexporter/pdata_marshaler.go | 19 +-- exporter/kafkaexporter/testdata/config.yaml | 6 +- pkg/pdatautil/hash.go | 20 ---- pkg/pdatautil/hash_test.go | 124 -------------------- 10 files changed, 18 insertions(+), 213 deletions(-) diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index a1b9f718d297..2ac969d72edf 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -36,9 +36,7 @@ The following settings can be optionally configured: - The following encodings are valid *only* for **logs**. - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. 
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default. -- `partition_metrics_by_resource_attributes` configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. - - `enabled`: (default = false) - - `attributes`: (default = []) the list of resource attributes to include in the partitioning key. If empty, all resource attributes are included. +- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. - `auth` - `plain_text` - `username`: The username to use. diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index 5cf7fa403cee..baefca3ce23a 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -48,7 +48,7 @@ type Config struct { // trace ID as the message key by default. PartitionTracesByID bool `mapstructure:"partition_traces_by_id"` - PartitionMetricsByResourceAttributes PartitionByResourceAttributes `mapstructure:"partition_metrics_by_resource_attributes"` + PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"` // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. @@ -61,14 +61,6 @@ type Config struct { Authentication kafka.Authentication `mapstructure:"auth"` } -// PartitionByResourceAttributes defines configuration for partitioning by resource attributes. 
-type PartitionByResourceAttributes struct { - Enabled bool `mapstructure:"enabled"` - - // The list of resource attributes to use for partitioning, empty by default - Attributes []string `mapstructure:"attributes"` -} - // Metadata defines configuration for retrieving metadata from the broker. type Metadata struct { // Whether to maintain a full set of metadata for all topics, or just diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index 9e4dba77def4..4b43f948ebe3 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -58,7 +58,7 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", PartitionTracesByID: true, - PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true}, + PartitionMetricsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", Authentication: kafka.Authentication{ @@ -113,7 +113,7 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", PartitionTracesByID: true, - PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true}, + PartitionMetricsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", Authentication: kafka.Authentication{ @@ -167,7 +167,7 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", PartitionTracesByID: true, - PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true}, + PartitionMetricsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", ResolveCanonicalBootstrapServersOnly: true, diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index 0909f6884abb..d990a17dab8d 100644 --- a/exporter/kafkaexporter/factory.go +++ 
b/exporter/kafkaexporter/factory.go @@ -99,12 +99,9 @@ func createDefaultConfig() component.Config { Brokers: []string{defaultBroker}, ClientID: defaultClientID, // using an empty topic to track when it has not been set by user, default is based on traces or metrics. - Topic: "", - Encoding: defaultEncoding, - PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{ - Enabled: defaultPartitionMetricsByResourceAttributesEnabled, - Attributes: []string{}, - }, + Topic: "", + Encoding: defaultEncoding, + PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled, Metadata: Metadata{ Full: defaultMetadataFull, Retry: MetadataRetry{ diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 0f26df3bc9e9..c6b82dcfc66a 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -211,9 +211,9 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m if marshaler == nil { return nil, errUnrecognizedEncoding } - if config.PartitionMetricsByResourceAttributes.Enabled { + if config.PartitionMetricsByResourceAttributes { if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok { - keyableMarshaler.Key(config.PartitionMetricsByResourceAttributes.Attributes) + keyableMarshaler.Key() } } diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index a11506631706..97b110fd945c 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -76,48 +76,21 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) { tests := []struct { name string keyEnabled bool - attributes []string messagePartitionKeys []sarama.Encoder }{ { name: "partitioning_disabled", keyEnabled: false, - attributes: []string{}, - messagePartitionKeys: []sarama.Encoder{nil}, - }, - { - name: "partitioning_disabled_keys_are_not_empty", - keyEnabled: false, - attributes: 
[]string{"service.name"}, messagePartitionKeys: []sarama.Encoder{nil}, }, { name: "partitioning_enabled", keyEnabled: true, - attributes: []string{}, messagePartitionKeys: []sarama.Encoder{ sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16}, sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44}, }, }, - { - name: "partitioning_enabled_with_keys", - keyEnabled: true, - attributes: []string{"service.instance.id"}, - messagePartitionKeys: []sarama.Encoder{ - sarama.ByteEncoder{0xf9, 0x1e, 0x59, 0x41, 0xb5, 0x16, 0xfa, 0xdf, 0xc1, 0x79, 0xa3, 0x54, 0x68, 0x1d, 0xb6, 0xc8}, - sarama.ByteEncoder{0x47, 0xac, 0xe2, 0x30, 0xd, 0x72, 0xd1, 0x82, 0xa5, 0xd, 0xe3, 0xa4, 0x64, 0xd3, 0x6b, 0xb5}, - }, - }, - { - name: "partitioning_enabled_keys_do_not_exist", - keyEnabled: true, - attributes: []string{"non_existing_key"}, - messagePartitionKeys: []sarama.Encoder{ - sarama.ByteEncoder{0x99, 0xe9, 0xd8, 0x51, 0x37, 0xdb, 0x46, 0xef, 0xfe, 0x7c, 0x8e, 0x2d, 0x85, 0x35, 0xce, 0xeb}, - sarama.ByteEncoder{0x99, 0xe9, 0xd8, 0x51, 0x37, 0xdb, 0x46, 0xef, 0xfe, 0x7c, 0x8e, 0x2d, 0x85, 0x35, 0xce, 0xeb}, - }, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -147,7 +120,7 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) { keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler) require.True(t, ok, "Must be a KeyableMetricsMarshaler") if tt.keyEnabled { - keyableMarshaler.Key(tt.attributes) + keyableMarshaler.Key() } msgs, err := standardMarshaler.Marshal(metric, "KafkaTopicX") diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index e619fb1d6b39..415546e6b205 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -46,20 +46,18 @@ func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarsha // for metrics messages 
type KeyableMetricsMarshaler interface { MetricsMarshaler - Key(attributes []string) + Key() } type pdataMetricsMarshaler struct { - marshaler pmetric.Marshaler - encoding string - keyed bool - keyAttributes []string + marshaler pmetric.Marshaler + encoding string + keyed bool } // Key configures the pdataMetricsMarshaler to set the message key on the kafka messages -func (p *pdataMetricsMarshaler) Key(attributes []string) { +func (p *pdataMetricsMarshaler) Key() { p.keyed = true - p.keyAttributes = attributes } func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) { @@ -69,12 +67,7 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar for i := 0; i < metrics.Len(); i++ { resourceMetrics := metrics.At(i) - var hash [16]byte - if len(p.keyAttributes) > 0 { - hash = pdatautil.MapHashSelectedKeys(resourceMetrics.Resource().Attributes(), p.keyAttributes) - } else { - hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) - } + var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) newMetrics := pmetric.NewMetrics() resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty()) diff --git a/exporter/kafkaexporter/testdata/config.yaml b/exporter/kafkaexporter/testdata/config.yaml index 3ceaa60a741b..7c89bea74ade 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ b/exporter/kafkaexporter/testdata/config.yaml @@ -13,11 +13,7 @@ kafka: required_acks: -1 # WaitForAll timeout: 10s partition_traces_by_id: true - partition_metrics_by_resource_attributes: - enabled: true - attributes: - - k1 - - k2 + partition_metrics_by_resource_attributes: true auth: plain_text: username: jdoe diff --git a/pkg/pdatautil/hash.go b/pkg/pdatautil/hash.go index 8484510c6a6e..6826de769b89 100644 --- a/pkg/pdatautil/hash.go +++ b/pkg/pdatautil/hash.go @@ -63,26 +63,6 @@ func MapHash(m pcommon.Map) [16]byte { return hw.hashSum128() } -// MapHashSelectedKeys return a hash for the 
provided map, using values of only provided keys. -// Order of hash calculation is determined by the order of keys. -func MapHashSelectedKeys(m pcommon.Map, keys []string) [16]byte { - if m.Len() == 0 || len(keys) == 0 { - return emptyHash - } - - hw := hashWriterPool.Get().(*hashWriter) - defer hashWriterPool.Put(hw) - hw.byteBuf = hw.byteBuf[:0] - - for _, k := range keys { - if v, ok := m.Get(k); ok { - hw.writeValueHash(v) - } - } - - return hw.hashSum128() -} - // ValueHash return a hash for the provided pcommon.Value. func ValueHash(v pcommon.Value) [16]byte { hw := hashWriterPool.Get().(*hashWriter) diff --git a/pkg/pdatautil/hash_test.go b/pkg/pdatautil/hash_test.go index 1cb655762a74..a725676f37ac 100644 --- a/pkg/pdatautil/hash_test.go +++ b/pkg/pdatautil/hash_test.go @@ -150,130 +150,6 @@ func TestMapHash(t *testing.T) { } } -func TestMapHashSelectedKeys(t *testing.T) { - tests := []struct { - name string - maps []pcommon.Map - keys [][]string - equal bool - }{ - { - name: "empty_maps", - maps: []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}, - keys: [][]string{{}, {}}, - equal: true, - }, - { - name: "same_maps_different_order", - keys: [][]string{{"k1", "k2"}, {"k1", "k2"}}, - maps: func() []pcommon.Map { - m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} - m[0].PutStr("k1", "v1") - m[0].PutInt("k2", 1) - m[0].PutDouble("k3", 1) - m[0].PutBool("k4", true) - m[0].PutEmptyBytes("k5").FromRaw([]byte("abc")) - sl := m[0].PutEmptySlice("k6") - sl.AppendEmpty().SetStr("str") - sl.AppendEmpty().SetBool(true) - m0 := m[0].PutEmptyMap("k") - m0.PutInt("k1", 1) - m0.PutDouble("k2", 10) - - m1 := m[1].PutEmptyMap("k") - m1.PutDouble("k2", 10) - m1.PutInt("k1", 1) - m[1].PutEmptyBytes("k5").FromRaw([]byte("abc")) - m[1].PutBool("k4", true) - sl = m[1].PutEmptySlice("k6") - sl.AppendEmpty().SetStr("str") - sl.AppendEmpty().SetBool(true) - m[1].PutInt("k2", 1) - m[1].PutStr("k1", "v1") - m[1].PutDouble("k3", 1) - - return m - }(), - equal: true, - }, - { - 
name: "different_maps_having_same_keys", - keys: [][]string{{"k2", "k3"}, {"k2", "k3"}}, - maps: func() []pcommon.Map { - m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} - m[0].PutStr("k1", "v1") - m[0].PutInt("k2", 1) - m[0].PutDouble("k3", 1) - - m[1].PutInt("k2", 1) - m[1].PutDouble("k3", 1) - m[1].PutDouble("k4", 1) - - return m - }(), - equal: true, - }, - { - name: "same_maps_different_key_order", - keys: [][]string{{"k2", "k3"}, {"k3", "k2"}}, - maps: func() []pcommon.Map { - m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} - m[0].PutInt("k2", 1) - m[0].PutDouble("k3", 1) - - m[1].PutInt("k2", 1) - m[1].PutDouble("k3", 1) - - return m - }(), - equal: false, - }, - { - name: "same_maps_with_not_existing_keys", - keys: [][]string{{"k10", "k11"}, {"k10", "k11"}}, - maps: func() []pcommon.Map { - m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} - m[0].PutStr("k1", "v1") - - m[1].PutInt("k2", 1) - - return m - }(), - equal: true, - }, - { - name: "different_maps", - keys: [][]string{{"k2", "k3"}, {"k2", "k3"}}, - maps: func() []pcommon.Map { - m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} - m[0].PutInt("k2", 2) - m[0].PutDouble("k3", 2) - - m[1].PutInt("k2", 1) - m[1].PutDouble("k3", 1) - - return m - }(), - equal: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for i := 0; i < len(tt.maps); i++ { - for j := i + 1; j < len(tt.maps); j++ { - if tt.equal { - assert.Equal(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]), - "maps %d %v and %d %v must have the same hash, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) - } else { - assert.NotEqual(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]), - "maps %d %v and %d %v must have different hashes, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) - } - } 
- } - }) - } -} - func TestValueHash(t *testing.T) { tests := []struct { name string From 5c5502ddf3811bcf6f85f830e37f635d3eccf7e0 Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Fri, 26 Apr 2024 18:56:05 +0700 Subject: [PATCH 11/15] Bump go version to 1.21.9 to fix GO-2024-2687 --- exporter/kafkaexporter/go.mod | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index 35b6d602464f..fbaa815fa98d 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -1,6 +1,6 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter -go 1.21.0 +go 1.21.9 require ( github.com/IBM/sarama v1.43.1 @@ -10,6 +10,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.99.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.99.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.99.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.99.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.99.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.99.0 github.com/openzipkin/zipkin-go v0.4.2 From 62039278af984453274ceb2fb455b5f07ca5478c Mon Sep 17 00:00:00 2001 From: Evgeniy Zuikin Date: Fri, 26 Apr 2024 23:15:09 +0700 Subject: [PATCH 12/15] Update exporter/kafkaexporter/go.mod Co-authored-by: Curtis Robert --- exporter/kafkaexporter/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index fbaa815fa98d..1ec42f4660ff 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -1,6 +1,6 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter -go 1.21.9 +go 1.21.0 require ( github.com/IBM/sarama v1.43.1 From 
e56a50e3873e56a79711a1074693f9086bc7484f Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Mon, 29 Apr 2024 10:51:50 +0700 Subject: [PATCH 13/15] lint --- exporter/kafkaexporter/marshaler_test.go | 2 +- exporter/kafkaexporter/pdata_marshaler.go | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index 97b110fd945c..5f40379d75a2 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -6,7 +6,6 @@ package kafkaexporter import ( "encoding/json" "fmt" - "go.opentelemetry.io/collector/pdata/pmetric" "testing" "time" @@ -15,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" ) diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 415546e6b205..544bd579a1f4 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -5,12 +5,13 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collect import ( "github.com/IBM/sarama" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" ) type pdataLogsMarshaler 
struct { From 878460789bd4edcf1bfcc86ffe88629467222481 Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Mon, 29 Apr 2024 11:59:48 +0700 Subject: [PATCH 14/15] tidy --- receiver/kafkareceiver/go.mod | 1 + 1 file changed, 1 insertion(+) diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index fa4562a1811e..adcb79ddec9a 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -64,6 +64,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.99.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.99.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.0 // indirect From d161c6b726593a7ce0e0b4764e6d663c8d5e2237 Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Mon, 29 Apr 2024 13:45:18 +0700 Subject: [PATCH 15/15] do not mutate messages --- exporter/kafkaexporter/pdata_marshaler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 544bd579a1f4..3429cdd8316e 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -71,7 +71,7 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) newMetrics := pmetric.NewMetrics() - resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty()) + resourceMetrics.CopyTo(newMetrics.ResourceMetrics().AppendEmpty()) bts, err := p.marshaler.MarshalMetrics(newMetrics) if err != nil {