add partitioning by specific tags
SHaaD94 committed Mar 24, 2024
1 parent e536f98 commit 25c61a5
Showing 9 changed files with 185 additions and 12 deletions.
6 changes: 5 additions & 1 deletion exporter/kafkaexporter/README.md
@@ -40,7 +40,11 @@ The following settings can be optionally configured:
- The following encodings are valid *only* for **logs**.
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to Kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters, since Jaeger exporters include the trace ID as the message key by default.
- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
- `partition_metrics_by_resource_attributes`: configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to Kafka.
- `enabled`: (default = false)
- `attributes`: (default = []) the list of resource attributes to include in the partitioning key. If empty, all resource attributes are included.
- `service.name`
- `service.instance.id`
- `auth`
- `plain_text`
- `username`: The username to use.
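Taken together, a minimal exporter configuration exercising the new option might look like the following sketch (broker address, topic, and attribute names are illustrative, not part of this commit):

```yaml
exporters:
  kafka:
    brokers: ["localhost:9092"]
    topic: otlp_metrics
    encoding: otlp_proto
    partition_metrics_by_resource_attributes:
      enabled: true
      attributes:
        - service.name
        - service.instance.id
```

With this configuration, every metric batch that shares the same values for the two listed attributes hashes to the same message key, and therefore lands on the same Kafka partition.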
10 changes: 9 additions & 1 deletion exporter/kafkaexporter/config.go
@@ -48,7 +48,7 @@ type Config struct {
// trace ID as the message key by default.
PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`
PartitionMetricsByResourceAttributes PartitionByResourceAttributes `mapstructure:"partition_metrics_by_resource_attributes"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
@@ -61,6 +61,14 @@ type Config struct {
Authentication kafka.Authentication `mapstructure:"auth"`
}

// PartitionByResourceAttributes defines configuration for partitioning by resource attributes.
type PartitionByResourceAttributes struct {
Enabled bool `mapstructure:"enabled"`

// The list of resource attributes to use for partitioning; empty by default.
Attributes []string `mapstructure:"attributes"`
}

// Metadata defines configuration for retrieving metadata from the broker.
type Metadata struct {
// Whether to maintain a full set of metadata for all topics, or just
6 changes: 6 additions & 0 deletions exporter/kafkaexporter/factory.go
@@ -38,6 +38,8 @@ const (
defaultCompression = "none"
// default from sarama.NewConfig()
defaultFluxMaxMessages = 0
// partitioning metrics by resource attributes is disabled by default
defaultPartitionMetricsByResourceAttributesEnabled = false
)

// FactoryOption applies changes to kafkaExporterFactory.
@@ -99,6 +101,10 @@ func createDefaultConfig() component.Config {
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
Topic: "",
Encoding: defaultEncoding,
PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{
Enabled: defaultPartitionMetricsByResourceAttributesEnabled,
Attributes: []string{},
},
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
4 changes: 2 additions & 2 deletions exporter/kafkaexporter/kafka_exporter.go
@@ -211,9 +211,9 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
if config.PartitionMetricsByResourceAttributes {
if config.PartitionMetricsByResourceAttributes.Enabled {
if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok {
keyableMarshaler.Key()
keyableMarshaler.Key(config.PartitionMetricsByResourceAttributes.Attributes)
}
}

2 changes: 1 addition & 1 deletion exporter/kafkaexporter/marshaler_test.go
@@ -105,7 +105,7 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) {

keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler)
require.True(t, ok, "Must be a KeyableMetricsMarshaler")
keyableMarshaler.Key()
keyableMarshaler.Key([]string{})

msgs, err = keyableMarshaler.Marshal(metric, "KafkaTopicX")
require.NoError(t, err, "Must have marshaled the data without error")
19 changes: 13 additions & 6 deletions exporter/kafkaexporter/pdata_marshaler.go
@@ -46,18 +46,20 @@ func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarsha
// for metrics messages
type KeyableMetricsMarshaler interface {
MetricsMarshaler
Key()
Key(attributes []string)
}

type pdataMetricsMarshaler struct {
marshaler pmetric.Marshaler
encoding string
keyed bool
marshaler pmetric.Marshaler
encoding string
keyed bool
keyAttributes []string
}

// Key configures the pdataMetricsMarshaler to set the message key on the kafka messages
func (p *pdataMetricsMarshaler) Key() {
func (p *pdataMetricsMarshaler) Key(attributes []string) {
p.keyed = true
p.keyAttributes = attributes
}

func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) {
@@ -67,7 +69,12 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar

for i := 0; i < metrics.Len(); i++ {
resourceMetrics := metrics.At(i)
var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes())
var hash [16]byte
if len(p.keyAttributes) > 0 {
hash = pdatautil.MapHashSelectedKeys(resourceMetrics.Resource().Attributes(), p.keyAttributes)
} else {
hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes())
}

newMetrics := pmetric.NewMetrics()
resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty())
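For context, the 16-byte hash computed above becomes the Kafka message key. A minimal sketch of that step, assuming the sarama client (`github.com/IBM/sarama`) this exporter depends on; `buildKeyedMessage` is a hypothetical helper, not part of this commit:

```go
package kafkaexporter

import "github.com/IBM/sarama"

// buildKeyedMessage (hypothetical) attaches a resource-attribute hash as the
// message key, so all metrics from the same resource land on one partition.
func buildKeyedMessage(topic string, payload []byte, hash [16]byte) *sarama.ProducerMessage {
	return &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(payload),
		Key:   sarama.ByteEncoder(hash[:]),
	}
}
```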
6 changes: 5 additions & 1 deletion exporter/kafkaexporter/testdata/config.yaml
@@ -13,7 +13,11 @@ kafka:
required_acks: -1 # WaitForAll
timeout: 10s
partition_traces_by_id: true
partition_metrics_by_resource_attributes: true
partition_metrics_by_resource_attributes:
enabled: true
attributes:
- service.name
- service.instance.id
auth:
plain_text:
username: jdoe
20 changes: 20 additions & 0 deletions pkg/pdatautil/hash.go
@@ -63,6 +63,26 @@ func MapHash(m pcommon.Map) [16]byte {
return hw.hashSum128()
}

// MapHashSelectedKeys returns a hash for the provided map, using the values of only the provided keys.
// Order of hash calculation is determined by the order of keys.
func MapHashSelectedKeys(m pcommon.Map, keys []string) [16]byte {
if m.Len() == 0 || len(keys) == 0 {
return emptyHash
}

hw := hashWriterPool.Get().(*hashWriter)
defer hashWriterPool.Put(hw)
hw.byteBuf = hw.byteBuf[:0]

for _, k := range keys {
if v, ok := m.Get(k); ok {
hw.writeValueHash(v)
}
}

return hw.hashSum128()
}

// ValueHash return a hash for the provided pcommon.Value.
func ValueHash(v pcommon.Value) [16]byte {
hw := hashWriterPool.Get().(*hashWriter)
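As a quick illustration of the semantics documented above (attribute names and values are made up): hashing follows the order of the key list, and keys absent from the map are skipped rather than causing an error.

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
	m := pcommon.NewMap()
	m.PutStr("service.name", "checkout")
	m.PutStr("service.instance.id", "instance-1")
	m.PutStr("host.name", "node-a") // never selected below, so never hashed

	h1 := pdatautil.MapHashSelectedKeys(m, []string{"service.name", "service.instance.id"})
	h2 := pdatautil.MapHashSelectedKeys(m, []string{"service.instance.id", "service.name"})
	h3 := pdatautil.MapHashSelectedKeys(m, []string{"service.name", "missing.key", "service.instance.id"})

	fmt.Println(h1 == h2) // false: the order of the key list changes the hash
	fmt.Println(h1 == h3) // true: missing keys contribute nothing
}
```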
124 changes: 124 additions & 0 deletions pkg/pdatautil/hash_test.go
@@ -150,6 +150,130 @@ func TestMapHash(t *testing.T) {
}
}

func TestMapHashSelectedKeys(t *testing.T) {
tests := []struct {
name string
maps []pcommon.Map
keys [][]string
equal bool
}{
{
name: "empty_maps",
maps: []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()},
keys: [][]string{{}, {}},
equal: true,
},
{
name: "same_maps_different_order",
keys: [][]string{{"k1", "k2"}, {"k1", "k2"}},
maps: func() []pcommon.Map {
m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}
m[0].PutStr("k1", "v1")
m[0].PutInt("k2", 1)
m[0].PutDouble("k3", 1)
m[0].PutBool("k4", true)
m[0].PutEmptyBytes("k5").FromRaw([]byte("abc"))
sl := m[0].PutEmptySlice("k6")
sl.AppendEmpty().SetStr("str")
sl.AppendEmpty().SetBool(true)
m0 := m[0].PutEmptyMap("k")
m0.PutInt("k1", 1)
m0.PutDouble("k2", 10)

m1 := m[1].PutEmptyMap("k")
m1.PutDouble("k2", 10)
m1.PutInt("k1", 1)
m[1].PutEmptyBytes("k5").FromRaw([]byte("abc"))
m[1].PutBool("k4", true)
sl = m[1].PutEmptySlice("k6")
sl.AppendEmpty().SetStr("str")
sl.AppendEmpty().SetBool(true)
m[1].PutInt("k2", 1)
m[1].PutStr("k1", "v1")
m[1].PutDouble("k3", 1)

return m
}(),
equal: true,
},
{
name: "different_maps_having_same_keys",
keys: [][]string{{"k2", "k3"}, {"k2", "k3"}},
maps: func() []pcommon.Map {
m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}
m[0].PutStr("k1", "v1")
m[0].PutInt("k2", 1)
m[0].PutDouble("k3", 1)

m[1].PutInt("k2", 1)
m[1].PutDouble("k3", 1)
m[1].PutDouble("k4", 1)

return m
}(),
equal: true,
},
{
name: "same_maps_different_key_order",
keys: [][]string{{"k2", "k3"}, {"k3", "k2"}},
maps: func() []pcommon.Map {
m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}
m[0].PutInt("k2", 1)
m[0].PutDouble("k3", 1)

m[1].PutInt("k2", 1)
m[1].PutDouble("k3", 1)

return m
}(),
equal: false,
},
{
name: "same_maps_with_not_existing_keys",
keys: [][]string{{"k10", "k11"}, {"k10", "k11"}},
maps: func() []pcommon.Map {
m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}
m[0].PutStr("k1", "v1")

m[1].PutInt("k2", 1)

return m
}(),
equal: true,
},
{
name: "different_maps",
keys: [][]string{{"k2", "k3"}, {"k2", "k3"}},
maps: func() []pcommon.Map {
m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}
m[0].PutInt("k2", 2)
m[0].PutDouble("k3", 2)

m[1].PutInt("k2", 1)
m[1].PutDouble("k3", 1)

return m
}(),
equal: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for i := 0; i < len(tt.maps); i++ {
for j := i + 1; j < len(tt.maps); j++ {
if tt.equal {
assert.Equal(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]),
"maps %d %v and %d %v must have the same hash, then calculated with keys %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j])
} else {
assert.NotEqual(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]),
"maps %d %v and %d %v must have the different hashes, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j])
}
}
}
})
}
}

func TestValueHash(t *testing.T) {
tests := []struct {
name string
