From 25c61a51407c6067ae9c23a8f9f565cdfdaf67ca Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Sun, 24 Mar 2024 10:31:27 +0700 Subject: [PATCH] add partitioning by specific tags --- exporter/kafkaexporter/README.md | 6 +- exporter/kafkaexporter/config.go | 10 +- exporter/kafkaexporter/factory.go | 6 + exporter/kafkaexporter/kafka_exporter.go | 4 +- exporter/kafkaexporter/marshaler_test.go | 2 +- exporter/kafkaexporter/pdata_marshaler.go | 19 ++- exporter/kafkaexporter/testdata/config.yaml | 6 +- pkg/pdatautil/hash.go | 20 ++++ pkg/pdatautil/hash_test.go | 124 ++++++++++++++++++++ 9 files changed, 185 insertions(+), 12 deletions(-) diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index f9069307bc5f..730be96df811 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -40,7 +40,11 @@ The following settings can be optionally configured: - The following encodings are valid *only* for **logs**. - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. - `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default. -- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. +- `partition_metrics_by_resource_attributes` configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. + - `enabled`: (default = false) + - `attributes`: (default = []) the list of resource attributes to include in the partitioning key. If empty, all resource attributes are included. 
+ - `service.name` + - `service.instance.id` - `auth` - `plain_text` - `username`: The username to use. diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index baefca3ce23a..5cf7fa403cee 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -48,7 +48,7 @@ type Config struct { // trace ID as the message key by default. PartitionTracesByID bool `mapstructure:"partition_traces_by_id"` - PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"` + PartitionMetricsByResourceAttributes PartitionByResourceAttributes `mapstructure:"partition_metrics_by_resource_attributes"` // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. @@ -61,6 +61,14 @@ type Config struct { Authentication kafka.Authentication `mapstructure:"auth"` } +// PartitionByResourceAttributes defines configuration for partitioning by resource attributes. +type PartitionByResourceAttributes struct { + Enabled bool `mapstructure:"enabled"` + + // The list of resource attributes to use for partitioning, empty by default + Attributes []string `mapstructure:"attributes"` +} + // Metadata defines configuration for retrieving metadata from the broker. type Metadata struct { // Whether to maintain a full set of metadata for all topics, or just diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index e822b0a005f4..0909f6884abb 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -38,6 +38,8 @@ const ( defaultCompression = "none" // default from sarama.NewConfig() defaultFluxMaxMessages = 0 + // partitioning metrics by resource attributes is disabled by default + defaultPartitionMetricsByResourceAttributesEnabled = false ) // FactoryOption applies changes to kafkaExporterFactory. 
@@ -99,6 +101,10 @@ func createDefaultConfig() component.Config { // using an empty topic to track when it has not been set by user, default is based on traces or metrics. Topic: "", Encoding: defaultEncoding, + PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{ + Enabled: defaultPartitionMetricsByResourceAttributesEnabled, + Attributes: []string{}, + }, Metadata: Metadata{ Full: defaultMetadataFull, Retry: MetadataRetry{ diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index c6b82dcfc66a..0f26df3bc9e9 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -211,9 +211,9 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m if marshaler == nil { return nil, errUnrecognizedEncoding } - if config.PartitionMetricsByResourceAttributes { + if config.PartitionMetricsByResourceAttributes.Enabled { if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok { - keyableMarshaler.Key() + keyableMarshaler.Key(config.PartitionMetricsByResourceAttributes.Attributes) } } diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index 4b29b10252d8..fc1dc1aa5840 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -105,7 +105,7 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) { keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler) require.True(t, ok, "Must be a KeyableMetricsMarshaler") - keyableMarshaler.Key() + keyableMarshaler.Key([]string{}) msgs, err = keyableMarshaler.Marshal(metric, "KafkaTopicX") require.NoError(t, err, "Must have marshaled the data without error") diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 415546e6b205..e619fb1d6b39 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -46,18 +46,20 @@ func 
newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarsha // for metrics messages type KeyableMetricsMarshaler interface { MetricsMarshaler - Key() + Key(attributes []string) } type pdataMetricsMarshaler struct { - marshaler pmetric.Marshaler - encoding string - keyed bool + marshaler pmetric.Marshaler + encoding string + keyed bool + keyAttributes []string } // Key configures the pdataMetricsMarshaler to set the message key on the kafka messages -func (p *pdataMetricsMarshaler) Key() { +func (p *pdataMetricsMarshaler) Key(attributes []string) { p.keyed = true + p.keyAttributes = attributes } func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) { @@ -67,7 +69,12 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar for i := 0; i < metrics.Len(); i++ { resourceMetrics := metrics.At(i) - var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) + var hash [16]byte + if len(p.keyAttributes) > 0 { + hash = pdatautil.MapHashSelectedKeys(resourceMetrics.Resource().Attributes(), p.keyAttributes) + } else { + hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) + } newMetrics := pmetric.NewMetrics() resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty()) diff --git a/exporter/kafkaexporter/testdata/config.yaml b/exporter/kafkaexporter/testdata/config.yaml index 7c89bea74ade..a6cfadd4faf5 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ b/exporter/kafkaexporter/testdata/config.yaml @@ -13,7 +13,11 @@ kafka: required_acks: -1 # WaitForAll timeout: 10s partition_traces_by_id: true - partition_metrics_by_resource_attributes: true + partition_metrics_by_resource_attributes: + enabled: true + attributes: + - service.name + - service.instance.id auth: plain_text: username: jdoe diff --git a/pkg/pdatautil/hash.go b/pkg/pdatautil/hash.go index 650258445b4c..22ffb8880086 100644 --- a/pkg/pdatautil/hash.go +++ b/pkg/pdatautil/hash.go 
@@ -63,6 +63,26 @@ func MapHash(m pcommon.Map) [16]byte { return hw.hashSum128() } +// MapHashSelectedKeys return a hash for the provided map, using values of only provided keys. +// Order of hash calculation is determined by the order of keys. +func MapHashSelectedKeys(m pcommon.Map, keys []string) [16]byte { + if m.Len() == 0 || len(keys) == 0 { + return emptyHash + } + + hw := hashWriterPool.Get().(*hashWriter) + defer hashWriterPool.Put(hw) + hw.byteBuf = hw.byteBuf[:0] + + for _, k := range keys { + if v, ok := m.Get(k); ok { + hw.writeValueHash(v) + } + } + + return hw.hashSum128() +} + // ValueHash return a hash for the provided pcommon.Value. func ValueHash(v pcommon.Value) [16]byte { hw := hashWriterPool.Get().(*hashWriter) diff --git a/pkg/pdatautil/hash_test.go b/pkg/pdatautil/hash_test.go index a725676f37ac..453bdb4e22c8 100644 --- a/pkg/pdatautil/hash_test.go +++ b/pkg/pdatautil/hash_test.go @@ -150,6 +150,130 @@ func TestMapHash(t *testing.T) { } } +func TestMapHashSelectedKeys(t *testing.T) { + tests := []struct { + name string + maps []pcommon.Map + keys [][]string + equal bool + }{ + { + name: "empty_maps", + maps: []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}, + keys: [][]string{{}, {}}, + equal: true, + }, + { + name: "same_maps_different_order", + keys: [][]string{{"k1", "k2"}, {"k1", "k2"}}, + maps: func() []pcommon.Map { + m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} + m[0].PutStr("k1", "v1") + m[0].PutInt("k2", 1) + m[0].PutDouble("k3", 1) + m[0].PutBool("k4", true) + m[0].PutEmptyBytes("k5").FromRaw([]byte("abc")) + sl := m[0].PutEmptySlice("k6") + sl.AppendEmpty().SetStr("str") + sl.AppendEmpty().SetBool(true) + m0 := m[0].PutEmptyMap("k") + m0.PutInt("k1", 1) + m0.PutDouble("k2", 10) + + m1 := m[1].PutEmptyMap("k") + m1.PutDouble("k2", 10) + m1.PutInt("k1", 1) + m[1].PutEmptyBytes("k5").FromRaw([]byte("abc")) + m[1].PutBool("k4", true) + sl = m[1].PutEmptySlice("k6") + sl.AppendEmpty().SetStr("str") + 
sl.AppendEmpty().SetBool(true) + m[1].PutInt("k2", 1) + m[1].PutStr("k1", "v1") + m[1].PutDouble("k3", 1) + + return m + }(), + equal: true, + }, + { + name: "different_maps_having_same_keys", + keys: [][]string{{"k2", "k3"}, {"k2", "k3"}}, + maps: func() []pcommon.Map { + m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} + m[0].PutStr("k1", "v1") + m[0].PutInt("k2", 1) + m[0].PutDouble("k3", 1) + + m[1].PutInt("k2", 1) + m[1].PutDouble("k3", 1) + m[1].PutDouble("k4", 1) + + return m + }(), + equal: true, + }, + { + name: "same_maps_different_key_order", + keys: [][]string{{"k2", "k3"}, {"k3", "k2"}}, + maps: func() []pcommon.Map { + m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} + m[0].PutInt("k2", 1) + m[0].PutDouble("k3", 1) + + m[1].PutInt("k2", 1) + m[1].PutDouble("k3", 1) + + return m + }(), + equal: false, + }, + { + name: "same_maps_with_not_existing_keys", + keys: [][]string{{"k10", "k11"}, {"k10", "k11"}}, + maps: func() []pcommon.Map { + m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} + m[0].PutStr("k1", "v1") + + m[1].PutInt("k2", 1) + + return m + }(), + equal: true, + }, + { + name: "different_maps", + keys: [][]string{{"k2", "k3"}, {"k2", "k3"}}, + maps: func() []pcommon.Map { + m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} + m[0].PutInt("k2", 2) + m[0].PutDouble("k3", 2) + + m[1].PutInt("k2", 1) + m[1].PutDouble("k3", 1) + + return m + }(), + equal: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for i := 0; i < len(tt.maps); i++ { + for j := i + 1; j < len(tt.maps); j++ { + if tt.equal { + assert.Equal(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]), + "maps %d %v and %d %v must have the same hash, then calculated with keys %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) + } else { + assert.NotEqual(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]), + "maps %d %v and %d %v 
must have different hashes when calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) + } + } + }) + } +} + func TestValueHash(t *testing.T) { tests := []struct { name string