From 142c36d16e3e0deffe94609d29f92084b9e4e455 Mon Sep 17 00:00:00 2001 From: Kirill 'Earwin' Zakharenko Date: Wed, 9 Oct 2024 06:34:41 +0100 Subject: [PATCH] [exporter/clickhouseexporter] Sort attribute maps before insertion (#33634) --- ...clickhouseexporter-ordered-attributes.yaml | 27 +++++ exporter/clickhouseexporter/exporter_logs.go | 17 +-- .../clickhouseexporter/exporter_logs_test.go | 9 +- .../clickhouseexporter/exporter_metrics.go | 2 +- .../clickhouseexporter/exporter_traces.go | 36 ++---- .../internal/exponential_histogram_metrics.go | 15 +-- .../internal/gauge_metrics.go | 15 +-- .../internal/histogram_metrics.go | 15 +-- .../internal/metrics_model.go | 21 ++-- .../internal/metrics_model_test.go | 21 ++-- .../internal/orderedmap/orderedmap.go | 114 ++++++++++++++++++ .../internal/sum_metrics.go | 15 +-- .../internal/summary_metrics.go | 15 +-- 13 files changed, 215 insertions(+), 107 deletions(-) create mode 100644 .chloggen/clickhouseexporter-ordered-attributes.yaml create mode 100644 exporter/clickhouseexporter/internal/orderedmap/orderedmap.go diff --git a/.chloggen/clickhouseexporter-ordered-attributes.yaml b/.chloggen/clickhouseexporter-ordered-attributes.yaml new file mode 100644 index 000000000000..2275aef4e0c7 --- /dev/null +++ b/.chloggen/clickhouseexporter-ordered-attributes.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: clickhouseexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Exporter now sorts attribute maps' keys during INSERT, yielding better compression and predictable aggregates" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33634] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/clickhouseexporter/exporter_logs.go b/exporter/clickhouseexporter/exporter_logs.go index a4987457420a..fc07418a7ce7 100644 --- a/exporter/clickhouseexporter/exporter_logs.go +++ b/exporter/clickhouseexporter/exporter_logs.go @@ -10,8 +10,8 @@ import ( "time" _ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver. + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" conventions "go.opentelemetry.io/collector/semconv/v1.27.0" "go.uber.org/zap" @@ -77,7 +77,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { logs := ld.ResourceLogs().At(i) res := logs.Resource() resURL := logs.SchemaUrl() - resAttr := attributesToMap(res.Attributes()) + resAttr := internal.AttributesToMap(res.Attributes()) if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { serviceName = v.Str() } @@ -87,7 +87,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { scopeURL := logs.ScopeLogs().At(j).SchemaUrl() scopeName := logs.ScopeLogs().At(j).Scope().Name() scopeVersion := logs.ScopeLogs().At(j).Scope().Version() - scopeAttr := attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes()) + scopeAttr := internal.AttributesToMap(logs.ScopeLogs().At(j).Scope().Attributes()) for k := 0; k < rs.Len(); k++ { r := rs.At(k) @@ -97,7 +97,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { timestamp = r.ObservedTimestamp() } - logAttr := attributesToMap(r.Attributes()) + logAttr := internal.AttributesToMap(r.Attributes()) _, err = statement.ExecContext(ctx, timestamp.AsTime(), traceutil.TraceIDToHexOrEmptyString(r.TraceID()), @@ -129,15 +129,6 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { return err } -func attributesToMap(attributes pcommon.Map) map[string]string { - m := make(map[string]string, attributes.Len()) - attributes.Range(func(k string, v pcommon.Value) bool { - m[k] = v.AsString() - return true - }) - return m -} - const ( // language=ClickHouse SQL createLogsTableSQL = ` diff --git a/exporter/clickhouseexporter/exporter_logs_test.go b/exporter/clickhouseexporter/exporter_logs_test.go index 7388e68f243c..ce28ef9468c4 100644 --- a/exporter/clickhouseexporter/exporter_logs_test.go +++ b/exporter/clickhouseexporter/exporter_logs_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal/orderedmap" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -94,9 +95,9 @@ func TestExporter_pushLogsData(t *testing.T) { initClickhouseTestServer(t, func(query string, values []driver.Value) error { if strings.HasPrefix(query, "INSERT") { require.Equal(t, "https://opentelemetry.io/schemas/1.4.0", values[8]) - require.Equal(t, map[string]string{ + require.Equal(t, orderedmap.FromMap(map[string]string{ "service.name": "test-service", - }, values[9]) + }), values[9]) } return nil }) @@ -109,9 +110,9 @@ func TestExporter_pushLogsData(t *testing.T) { require.Equal(t, "https://opentelemetry.io/schemas/1.7.0", values[10]) require.Equal(t, "io.opentelemetry.contrib.clickhouse", values[11]) require.Equal(t, "1.0.0", values[12]) - require.Equal(t, map[string]string{ + require.Equal(t, orderedmap.FromMap(map[string]string{ "lib": "clickhouse", - }, values[13]) + }), values[13]) } return nil }) diff --git a/exporter/clickhouseexporter/exporter_metrics.go b/exporter/clickhouseexporter/exporter_metrics.go index 66f67b5cc065..be5696a01855 100644 --- a/exporter/clickhouseexporter/exporter_metrics.go +++ b/exporter/clickhouseexporter/exporter_metrics.go @@ -77,7 +77,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric metricsMap := internal.NewMetricsModel(e.tablesConfig) for i := 0; i < md.ResourceMetrics().Len(); i++ { metrics := md.ResourceMetrics().At(i) - resAttr := attributesToMap(metrics.Resource().Attributes()) + resAttr := metrics.Resource().Attributes() for j := 0; j < metrics.ScopeMetrics().Len(); j++ { rs := metrics.ScopeMetrics().At(j).Metrics() scopeInstr := metrics.ScopeMetrics().At(j).Scope() diff --git a/exporter/clickhouseexporter/exporter_traces.go b/exporter/clickhouseexporter/exporter_traces.go index 193a1ad0fd8e..fc26bd713418 100644 --- a/exporter/clickhouseexporter/exporter_traces.go +++ b/exporter/clickhouseexporter/exporter_traces.go @@ -11,6 +11,8 @@ import ( "time" _ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver. + "github.com/ClickHouse/clickhouse-go/v2/lib/column" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.27.0" @@ -74,18 +76,15 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er for i := 0; i < td.ResourceSpans().Len(); i++ { spans := td.ResourceSpans().At(i) res := spans.Resource() - resAttr := attributesToMap(res.Attributes()) - var serviceName string - if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { - serviceName = v.Str() - } + resAttr := internal.AttributesToMap(res.Attributes()) + serviceName, _ := res.Attributes().Get(conventions.AttributeServiceName) for j := 0; j < spans.ScopeSpans().Len(); j++ { rs := spans.ScopeSpans().At(j).Spans() scopeName := spans.ScopeSpans().At(j).Scope().Name() scopeVersion := spans.ScopeSpans().At(j).Scope().Version() for k := 0; k < rs.Len(); k++ { r := rs.At(k) - spanAttr := attributesToMap(r.Attributes()) + spanAttr := internal.AttributesToMap(r.Attributes()) status := r.Status() eventTimes, eventNames, eventAttrs := convertEvents(r.Events()) linksTraceIDs, linksSpanIDs, linksTraceStates, linksAttrs := convertLinks(r.Links()) @@ -97,7 +96,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er r.TraceState().AsRaw(), r.Name(), r.Kind().String(), - serviceName, + serviceName.AsString(), resAttr, scopeName, scopeVersion, @@ -127,36 +126,25 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er return err } -func convertEvents(events ptrace.SpanEventSlice) ([]time.Time, []string, []map[string]string) { - var ( - times []time.Time - names []string - attrs []map[string]string - ) +func convertEvents(events ptrace.SpanEventSlice) (times []time.Time, names []string, attrs []column.IterableOrderedMap) { for i := 0; i < events.Len(); i++ { event := events.At(i) times = append(times, event.Timestamp().AsTime()) names = append(names, event.Name()) - attrs = append(attrs, attributesToMap(event.Attributes())) + attrs = append(attrs, internal.AttributesToMap(event.Attributes())) } - return times, names, attrs + return } -func convertLinks(links ptrace.SpanLinkSlice) ([]string, []string, []string, []map[string]string) { - var ( - traceIDs []string - spanIDs []string - states []string - attrs []map[string]string - ) +func convertLinks(links ptrace.SpanLinkSlice) (traceIDs []string, spanIDs []string, states []string, attrs []column.IterableOrderedMap) { for i := 0; i < links.Len(); i++ { link := links.At(i) traceIDs = append(traceIDs, traceutil.TraceIDToHexOrEmptyString(link.TraceID())) spanIDs = append(spanIDs, traceutil.SpanIDToHexOrEmptyString(link.SpanID())) states = append(states, link.TraceState().AsRaw()) - attrs = append(attrs, attributesToMap(link.Attributes())) + attrs = append(attrs, internal.AttributesToMap(link.Attributes())) } - return traceIDs, spanIDs, states, attrs + return } const ( diff --git a/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go b/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go index 82d93dcf366f..064e12a2b234 100644 --- a/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go +++ b/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go @@ -130,27 +130,24 @@ func (e *expHistogramMetrics) insert(ctx context.Context, db *sql.DB) error { }() for _, model := range e.expHistogramModels { - var serviceName string - if v, ok := model.metadata.ResAttr[conventions.AttributeServiceName]; ok { - serviceName = v - } + serviceName, _ := model.metadata.ResAttr.Get(conventions.AttributeServiceName) for i := 0; i < model.expHistogram.DataPoints().Len(); i++ { dp := model.expHistogram.DataPoints().At(i) attrs, times, values, traceIDs, spanIDs := convertExemplars(dp.Exemplars()) _, err = statement.ExecContext(ctx, - model.metadata.ResAttr, + AttributesToMap(model.metadata.ResAttr), model.metadata.ResURL, model.metadata.ScopeInstr.Name(), model.metadata.ScopeInstr.Version(), - attributesToMap(model.metadata.ScopeInstr.Attributes()), + AttributesToMap(model.metadata.ScopeInstr.Attributes()), model.metadata.ScopeInstr.DroppedAttributesCount(), model.metadata.ScopeURL, - serviceName, + serviceName.AsString(), model.metricName, model.metricDescription, model.metricUnit, - attributesToMap(dp.Attributes()), + AttributesToMap(dp.Attributes()), dp.StartTimestamp().AsTime(), dp.Timestamp().AsTime(), dp.Count(), @@ -190,7 +187,7 @@ func (e *expHistogramMetrics) insert(ctx context.Context, db *sql.DB) error { return nil } -func (e *expHistogramMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { +func (e *expHistogramMetrics) Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { expHistogram, ok := metrics.(pmetric.ExponentialHistogram) if !ok { return fmt.Errorf("metrics param is not type of ExponentialHistogram") diff --git a/exporter/clickhouseexporter/internal/gauge_metrics.go b/exporter/clickhouseexporter/internal/gauge_metrics.go index 596f3fa654ed..e2fbfe2dc365 100644 --- a/exporter/clickhouseexporter/internal/gauge_metrics.go +++ b/exporter/clickhouseexporter/internal/gauge_metrics.go @@ -109,27 +109,24 @@ func (g *gaugeMetrics) insert(ctx context.Context, db *sql.DB) error { }() for _, model := range g.gaugeModels { - var serviceName string - if v, ok := model.metadata.ResAttr[conventions.AttributeServiceName]; ok { - serviceName = v - } + serviceName, _ := model.metadata.ResAttr.Get(conventions.AttributeServiceName) for i := 0; i < model.gauge.DataPoints().Len(); i++ { dp := model.gauge.DataPoints().At(i) attrs, times, values, traceIDs, spanIDs := convertExemplars(dp.Exemplars()) _, err = statement.ExecContext(ctx, - model.metadata.ResAttr, + AttributesToMap(model.metadata.ResAttr), model.metadata.ResURL, model.metadata.ScopeInstr.Name(), model.metadata.ScopeInstr.Version(), - attributesToMap(model.metadata.ScopeInstr.Attributes()), + AttributesToMap(model.metadata.ScopeInstr.Attributes()), model.metadata.ScopeInstr.DroppedAttributesCount(), model.metadata.ScopeURL, - serviceName, + serviceName.AsString(), model.metricName, model.metricDescription, model.metricUnit, - attributesToMap(dp.Attributes()), + AttributesToMap(dp.Attributes()), dp.StartTimestamp().AsTime(), dp.Timestamp().AsTime(), getValue(dp.IntValue(), dp.DoubleValue(), dp.ValueType()), @@ -155,7 +152,7 @@ func (g *gaugeMetrics) insert(ctx context.Context, db *sql.DB) error { return nil } -func (g *gaugeMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { +func (g *gaugeMetrics) Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { gauge, ok := metrics.(pmetric.Gauge) if !ok { return fmt.Errorf("metrics param is not type of Gauge") diff --git a/exporter/clickhouseexporter/internal/histogram_metrics.go b/exporter/clickhouseexporter/internal/histogram_metrics.go index 544e019cf8a3..f3374b655ba2 100644 --- a/exporter/clickhouseexporter/internal/histogram_metrics.go +++ b/exporter/clickhouseexporter/internal/histogram_metrics.go @@ -121,27 +121,24 @@ func (h *histogramMetrics) insert(ctx context.Context, db *sql.DB) error { }() for _, model := range h.histogramModel { - var serviceName string - if v, ok := model.metadata.ResAttr[conventions.AttributeServiceName]; ok { - serviceName = v - } + serviceName, _ := model.metadata.ResAttr.Get(conventions.AttributeServiceName) for i := 0; i < model.histogram.DataPoints().Len(); i++ { dp := model.histogram.DataPoints().At(i) attrs, times, values, traceIDs, spanIDs := convertExemplars(dp.Exemplars()) _, err = statement.ExecContext(ctx, - model.metadata.ResAttr, + AttributesToMap(model.metadata.ResAttr), model.metadata.ResURL, model.metadata.ScopeInstr.Name(), model.metadata.ScopeInstr.Version(), - attributesToMap(model.metadata.ScopeInstr.Attributes()), + AttributesToMap(model.metadata.ScopeInstr.Attributes()), model.metadata.ScopeInstr.DroppedAttributesCount(), model.metadata.ScopeURL, - serviceName, + serviceName.AsString(), model.metricName, model.metricDescription, model.metricUnit, - attributesToMap(dp.Attributes()), + AttributesToMap(dp.Attributes()), dp.StartTimestamp().AsTime(), dp.Timestamp().AsTime(), dp.Count(), @@ -177,7 +174,7 @@ func (h *histogramMetrics) insert(ctx context.Context, db *sql.DB) error { return nil } -func (h *histogramMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { +func (h *histogramMetrics) Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { histogram, ok := metrics.(pmetric.Histogram) if !ok { return fmt.Errorf("metrics param is not type of Histogram") diff --git a/exporter/clickhouseexporter/internal/metrics_model.go b/exporter/clickhouseexporter/internal/metrics_model.go index 978a36d75be8..24ce132f738d 100644 --- a/exporter/clickhouseexporter/internal/metrics_model.go +++ b/exporter/clickhouseexporter/internal/metrics_model.go @@ -9,6 +9,8 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/ClickHouse/clickhouse-go/v2/lib/column" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal/orderedmap" "strings" "sync" @@ -38,14 +40,14 @@ type MetricTypeConfig struct { // any type of metrics need implement it. type MetricsModel interface { // Add used to bind MetricsMetaData to a specific metric then put them into a slice - Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error + Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error // insert is used to insert metric data to clickhouse insert(ctx context.Context, db *sql.DB) error } // MetricsMetaData contain specific metric data type MetricsMetaData struct { - ResAttr map[string]string + ResAttr pcommon.Map ResURL string ScopeURL string ScopeInstr pcommon.InstrumentationScope @@ -118,7 +120,7 @@ func convertExemplars(exemplars pmetric.ExemplarSlice) (clickhouse.ArraySet, cli ) for i := 0; i < exemplars.Len(); i++ { exemplar := exemplars.At(i) - attrs = append(attrs, attributesToMap(exemplar.FilteredAttributes())) + attrs = append(attrs, AttributesToMap(exemplar.FilteredAttributes())) times = append(times, exemplar.Timestamp().AsTime()) values = append(values, getValue(exemplar.IntValue(), exemplar.DoubleValue(), exemplar.ValueType())) @@ -165,13 +167,12 @@ func getValue(intValue int64, floatValue float64, dataType any) float64 { } } -func attributesToMap(attributes pcommon.Map) map[string]string { - m := make(map[string]string, attributes.Len()) - attributes.Range(func(k string, v pcommon.Value) bool { - m[k] = v.AsString() - return true - }) - return m +func AttributesToMap(attributes pcommon.Map) column.IterableOrderedMap { + return orderedmap.CollectN(func(yield func(string, string) bool) { + attributes.Range(func(k string, v pcommon.Value) bool { + return yield(k, v.AsString()) + }) + }, attributes.Len()) } func convertSliceToArraySet[T any](slice []T) clickhouse.ArraySet { diff --git a/exporter/clickhouseexporter/internal/metrics_model_test.go b/exporter/clickhouseexporter/internal/metrics_model_test.go index 83d983bb8987..f3cbce4d6caf 100644 --- a/exporter/clickhouseexporter/internal/metrics_model_test.go +++ b/exporter/clickhouseexporter/internal/metrics_model_test.go @@ -4,6 +4,7 @@ package internal import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal/orderedmap" "testing" "time" @@ -20,15 +21,15 @@ func Test_attributesToMap(t *testing.T) { attributes.PutBool("bool", true) attributes.PutInt("int", 0) attributes.PutDouble("double", 0.0) - result := attributesToMap(attributes) + result := AttributesToMap(attributes) require.Equal( t, - map[string]string{ + orderedmap.FromMap(map[string]string{ "key": "value", "bool": "true", "int": "0", "double": "0", - }, + }), result, ) } @@ -59,7 +60,7 @@ func Test_convertExemplars(t *testing.T) { exemplar.FilteredAttributes().PutStr("key2", "value2") attrs, times, values, traceIDs, spanIDs := convertExemplars(exemplars) - require.Equal(t, clickhouse.ArraySet{map[string]string{"key1": "value1", "key2": "value2"}}, attrs) + require.Equal(t, clickhouse.ArraySet{orderedmap.FromMap(map[string]string{"key1": "value1", "key2": "value2"})}, attrs) require.Equal(t, clickhouse.ArraySet{time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)}, times) require.Equal(t, clickhouse.ArraySet{0.0}, values) require.Equal(t, clickhouse.ArraySet{"00000000000000000000000000000000"}, traceIDs) @@ -71,7 +72,7 @@ func Test_convertExemplars(t *testing.T) { exemplar.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1672218930, 0))) attrs, times, values, traceIDs, spanIDs := convertExemplars(exemplars) - require.Equal(t, clickhouse.ArraySet{map[string]string{}}, attrs) + require.Equal(t, clickhouse.ArraySet{orderedmap.FromMap(map[string]string{})}, attrs) require.Equal(t, clickhouse.ArraySet{time.Unix(1672218930, 0).UTC()}, times) require.Equal(t, clickhouse.ArraySet{0.0}, values) require.Equal(t, clickhouse.ArraySet{"00000000000000000000000000000000"}, traceIDs) @@ -83,7 +84,7 @@ func Test_convertExemplars(t *testing.T) { exemplar.SetDoubleValue(15.0) attrs, times, values, traceIDs, spanIDs := convertExemplars(exemplars) - require.Equal(t, clickhouse.ArraySet{map[string]string{}}, attrs) + require.Equal(t, clickhouse.ArraySet{orderedmap.FromMap(map[string]string{})}, attrs) require.Equal(t, clickhouse.ArraySet{time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)}, times) require.Equal(t, clickhouse.ArraySet{15.0}, values) require.Equal(t, clickhouse.ArraySet{"00000000000000000000000000000000"}, traceIDs) @@ -95,7 +96,7 @@ func Test_convertExemplars(t *testing.T) { exemplar.SetIntValue(20) attrs, times, values, traceIDs, spanIDs := convertExemplars(exemplars) - require.Equal(t, clickhouse.ArraySet{map[string]string{}}, attrs) + require.Equal(t, clickhouse.ArraySet{orderedmap.FromMap(map[string]string{})}, attrs) require.Equal(t, clickhouse.ArraySet{time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)}, times) require.Equal(t, clickhouse.ArraySet{20.0}, values) require.Equal(t, clickhouse.ArraySet{"00000000000000000000000000000000"}, traceIDs) @@ -107,7 +108,7 @@ func Test_convertExemplars(t *testing.T) { exemplar.SetSpanID([8]byte{1, 2, 3, 4}) attrs, times, values, traceIDs, spanIDs := convertExemplars(exemplars) - require.Equal(t, clickhouse.ArraySet{map[string]string{}}, attrs) + require.Equal(t, clickhouse.ArraySet{orderedmap.FromMap(map[string]string{})}, attrs) require.Equal(t, clickhouse.ArraySet{time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)}, times) require.Equal(t, clickhouse.ArraySet{0.0}, values) require.Equal(t, clickhouse.ArraySet{"00000000000000000000000000000000"}, traceIDs) @@ -119,7 +120,7 @@ func Test_convertExemplars(t *testing.T) { exemplar.SetTraceID([16]byte{1, 2, 3, 4}) attrs, times, values, traceIDs, spanIDs := convertExemplars(exemplars) - require.Equal(t, clickhouse.ArraySet{map[string]string{}}, attrs) + require.Equal(t, clickhouse.ArraySet{orderedmap.FromMap(map[string]string{})}, attrs) require.Equal(t, clickhouse.ArraySet{time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)}, times) require.Equal(t, clickhouse.ArraySet{0.0}, values) require.Equal(t, clickhouse.ArraySet{"01020304000000000000000000000000"}, traceIDs) @@ -146,7 +147,7 @@ func Test_convertExemplars(t *testing.T) { exemplar.SetTraceID([16]byte{1, 2, 3, 5}) attrs, times, values, traceIDs, spanIDs := convertExemplars(exemplars) - require.Equal(t, clickhouse.ArraySet{map[string]string{"key1": "value1", "key2": "value2"}, map[string]string{"key3": "value3", "key4": "value4"}}, attrs) + require.Equal(t, clickhouse.ArraySet{orderedmap.FromMap(map[string]string{"key1": "value1", "key2": "value2"}), orderedmap.FromMap(map[string]string{"key3": "value3", "key4": "value4"})}, attrs) require.Equal(t, clickhouse.ArraySet{time.Unix(1672218930, 0).UTC(), time.Unix(1672219930, 0).UTC()}, times) require.Equal(t, clickhouse.ArraySet{20.0, 16.0}, values) require.Equal(t, clickhouse.ArraySet{"01020304000000000000000000000000", "01020305000000000000000000000000"}, traceIDs) diff --git a/exporter/clickhouseexporter/internal/orderedmap/orderedmap.go b/exporter/clickhouseexporter/internal/orderedmap/orderedmap.go new file mode 100644 index 000000000000..93d69453d06c --- /dev/null +++ b/exporter/clickhouseexporter/internal/orderedmap/orderedmap.go @@ -0,0 +1,114 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package orderedmap // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal/orderedmap" + +import ( + "cmp" + "github.com/ClickHouse/clickhouse-go/v2/lib/column" + "slices" +) + +// Map is a simple implementation of [column.IterableOrderedMap] interface. +// It is intended to be used as a serdes wrapper for map[K]V and not as a general purpose container. +type Map[K comparable, V any] []entry[K, V] + +type entry[K comparable, V any] struct { + key K + value V +} + +type iterator[K comparable, V any] struct { + om Map[K, V] + i int +} + +func FromMap[M ~map[K]V, K cmp.Ordered, V any](m M) *Map[K, V] { + return FromMapFunc(m, cmp.Compare) +} + +func FromMapFunc[M ~map[K]V, K comparable, V any](m M, compare func(K, K) int) *Map[K, V] { + om := Map[K, V](make([]entry[K, V], 0, len(m))) + for k, v := range m { + om.Put(k, v) + } + slices.SortFunc(om, func(i, j entry[K, V]) int { return compare(i.key, j.key) }) + return &om +} + +// Collect creates a Map from an iter.Seq2[K,V] iterator. +func Collect[K cmp.Ordered, V any](seq func(yield func(K, V) bool)) *Map[K, V] { + return CollectFunc(seq, cmp.Compare) +} + +// CollectN creates a Map, pre-sized for n entries, from an iter.Seq2[K,V] iterator. +func CollectN[K cmp.Ordered, V any](seq func(yield func(K, V) bool), n int) *Map[K, V] { + return CollectNFunc(seq, n, cmp.Compare) +} + +// CollectFunc creates a Map from an iter.Seq2[K,V] iterator with a custom compare function. +func CollectFunc[K comparable, V any](seq func(yield func(K, V) bool), compare func(K, K) int) *Map[K, V] { + return CollectNFunc(seq, 8, compare) +} + +// CollectNFunc creates a Map, pre-sized for n entries, from an iter.Seq2[K,V] iterator with a custom compare function. +func CollectNFunc[K comparable, V any](seq func(yield func(K, V) bool), n int, compare func(K, K) int) *Map[K, V] { + om := Map[K, V](make([]entry[K, V], 0, n)) + seq(func(k K, v V) bool { + om.Put(k, v) + return true + }) + slices.SortFunc(om, func(i, j entry[K, V]) int { return compare(i.key, j.key) }) + return &om +} + +func (om *Map[K, V]) ToMap() map[K]V { + m := make(map[K]V, len(*om)) + for _, e := range *om { + m[e.key] = e.value + } + return m +} + +// Put is part of [column.IterableOrderedMap] interface, it expects to be called by the driver itself, +// provides no type safety and expects the keys to be given in order. +// It is recommended to use [FromMap] and [Collect] to initialize [Map]. +func (om *Map[K, V]) Put(key any, value any) { + *om = append(*om, entry[K, V]{key.(K), value.(V)}) +} + +// All is an iter.Seq[K,V] iterator that yields all key-value pairs in order. +func (om *Map[K, V]) All(yield func(k K, v V) bool) { + for _, e := range *om { + if !yield(e.key, e.value) { + return + } + } +} + +// Keys is an iter.Seq[K] iterator that yields all keys in order. +func (om *Map[K, V]) Keys(yield func(k K) bool) { + for _, e := range *om { + if !yield(e.key) { + return + } + } +} + +// Values is an iter.Seq[V] iterator that yields all values in key order. +func (om *Map[K, V]) Values(yield func(v V) bool) { + for _, e := range *om { + if !yield(e.value) { + return + } + } +} + +// Iterator is part of [column.IterableOrderedMap] interface, it expects to be called by the driver itself. +func (om *Map[K, V]) Iterator() column.MapIterator { return &iterator[K, V]{om: *om, i: -1} } + +func (i *iterator[K, V]) Next() bool { i.i++; return i.i < len(i.om) } + +func (i *iterator[K, V]) Key() any { return i.om[i.i].key } + +func (i *iterator[K, V]) Value() any { return i.om[i.i].value } diff --git a/exporter/clickhouseexporter/internal/sum_metrics.go b/exporter/clickhouseexporter/internal/sum_metrics.go index 4da0faa5cb3a..89455f8e3048 100644 --- a/exporter/clickhouseexporter/internal/sum_metrics.go +++ b/exporter/clickhouseexporter/internal/sum_metrics.go @@ -113,27 +113,24 @@ func (s *sumMetrics) insert(ctx context.Context, db *sql.DB) error { }() for _, model := range s.sumModel { - var serviceName string - if v, ok := model.metadata.ResAttr[conventions.AttributeServiceName]; ok { - serviceName = v - } + serviceName, _ := model.metadata.ResAttr.Get(conventions.AttributeServiceName) for i := 0; i < model.sum.DataPoints().Len(); i++ { dp := model.sum.DataPoints().At(i) attrs, times, values, traceIDs, spanIDs := convertExemplars(dp.Exemplars()) _, err = statement.ExecContext(ctx, - model.metadata.ResAttr, + AttributesToMap(model.metadata.ResAttr), model.metadata.ResURL, model.metadata.ScopeInstr.Name(), model.metadata.ScopeInstr.Version(), - attributesToMap(model.metadata.ScopeInstr.Attributes()), + AttributesToMap(model.metadata.ScopeInstr.Attributes()), model.metadata.ScopeInstr.DroppedAttributesCount(), model.metadata.ScopeURL, - serviceName, + serviceName.AsString(), model.metricName, model.metricDescription, model.metricUnit, - attributesToMap(dp.Attributes()), + AttributesToMap(dp.Attributes()), dp.StartTimestamp().AsTime(), dp.Timestamp().AsTime(), getValue(dp.IntValue(), dp.DoubleValue(), dp.ValueType()), @@ -165,7 +162,7 @@ func (s *sumMetrics) insert(ctx context.Context, db *sql.DB) error { return nil } -func (s *sumMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { +func (s *sumMetrics) Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { sum, ok := metrics.(pmetric.Sum) if !ok { return fmt.Errorf("metrics param is not type of Sum") diff --git a/exporter/clickhouseexporter/internal/summary_metrics.go b/exporter/clickhouseexporter/internal/summary_metrics.go index 3182ffee452c..d98197c12b2e 100644 --- a/exporter/clickhouseexporter/internal/summary_metrics.go +++ b/exporter/clickhouseexporter/internal/summary_metrics.go @@ -103,28 +103,25 @@ func (s *summaryMetrics) insert(ctx context.Context, db *sql.DB) error { _ = statement.Close() }() for _, model := range s.summaryModel { - var serviceName string - if v, ok := model.metadata.ResAttr[conventions.AttributeServiceName]; ok { - serviceName = v - } + serviceName, _ := model.metadata.ResAttr.Get(conventions.AttributeServiceName) for i := 0; i < model.summary.DataPoints().Len(); i++ { dp := model.summary.DataPoints().At(i) quantiles, values := convertValueAtQuantile(dp.QuantileValues()) _, err = statement.ExecContext(ctx, - model.metadata.ResAttr, + AttributesToMap(model.metadata.ResAttr), model.metadata.ResURL, model.metadata.ScopeInstr.Name(), model.metadata.ScopeInstr.Version(), - attributesToMap(model.metadata.ScopeInstr.Attributes()), + AttributesToMap(model.metadata.ScopeInstr.Attributes()), model.metadata.ScopeInstr.DroppedAttributesCount(), model.metadata.ScopeURL, - serviceName, + serviceName.AsString(), model.metricName, model.metricDescription, model.metricUnit, - attributesToMap(dp.Attributes()), + AttributesToMap(dp.Attributes()), dp.StartTimestamp().AsTime(), dp.Timestamp().AsTime(), dp.Count(), @@ -153,7 +150,7 @@ func (s *summaryMetrics) insert(ctx context.Context, db *sql.DB) error { return nil } -func (s *summaryMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { +func (s *summaryMetrics) Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { summary, ok := metrics.(pmetric.Summary) if !ok { return fmt.Errorf("metrics param is not type of Summary")