From 3bb68607e9720a48e413da7d988ed6ce6654d2b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Thu, 4 Aug 2022 16:54:20 +0000 Subject: [PATCH 1/6] Require the storage to be explicitly set for persistent queue (#5784) Co-authored-by: Bogdan Drutu --- CHANGELOG.md | 1 + exporter/exporterhelper/README.md | 13 ++-- .../queued_retry_experimental.go | 45 ++++++------- .../queued_retry_experimental_test.go | 65 +++++++++++++------ 4 files changed, 76 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c4128e3e236..dbfec61bcce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### πŸ›‘ Breaking changes πŸ›‘ +- Require the storage to be explicitly set for the (experimental) persistent queue (#5784) - Remove deprecated `confighttp.HTTPClientSettings.ToClientWithHost` (#5803) - Remove deprecated component stability helpers (#5802): - `component.WithTracesExporterAndStabilityLevel` diff --git a/exporter/exporterhelper/README.md b/exporter/exporterhelper/README.md index 51bd4383096..5fbb9dd5731 100644 --- a/exporter/exporterhelper/README.md +++ b/exporter/exporterhelper/README.md @@ -36,16 +36,13 @@ The following configuration options can be modified: With this build tag set, additional configuration option can be enabled: - `sending_queue` - - `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension + - `storage` (default = none): When set, enables persistence and uses the component specified as a storage extension for the persistent queue (note, `enable_unstable` build tag needs to be enabled first, see below for more details) The maximum number of batches stored to disk can be controlled using `sending_queue.queue_size` parameter (which, similarly as for in-memory buffering, defaults to 5000 batches). -When `persistent_storage_enabled` is set to true, the queue is being buffered to disk using -[file storage extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage). -If collector instance is killed while having some items in the persistent queue, on restart the items are being picked and -the exporting is continued. +When persistent queue is enabled, the batches are being buffered using the provided storage extension - [filestorage] is a popular and safe choice. If the collector instance is killed while having some items in the persistent queue, on restart the items will be be picked and the exporting is continued. ``` β”Œβ”€Consumer #1─┐ @@ -93,9 +90,9 @@ exporters: otlp: endpoint: sending_queue: - persistent_storage_enabled: true + storage: file_storage/otc extensions: - file_storage: + file_storage/otc: directory: /var/lib/storage/otc timeout: 10s service: @@ -112,3 +109,5 @@ service: exporters: [otlp] ``` + +[filestorage]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage diff --git a/exporter/exporterhelper/queued_retry_experimental.go b/exporter/exporterhelper/queued_retry_experimental.go index 983650b1305..773911c0396 100644 --- a/exporter/exporterhelper/queued_retry_experimental.go +++ b/exporter/exporterhelper/queued_retry_experimental.go @@ -44,8 +44,9 @@ type QueueSettings struct { NumConsumers int `mapstructure:"num_consumers"` // QueueSize is the maximum number of batches allowed in queue at a given time. QueueSize int `mapstructure:"queue_size"` - // PersistentStorageEnabled describes whether persistence via a file storage extension is enabled - PersistentStorageEnabled bool `mapstructure:"persistent_storage_enabled"` + // StorageID if not empty, enables the persistent storage and uses the component specified + // as a storage extension for the persistent queue + StorageID *config.ComponentID `mapstructure:"storage"` } // NewDefaultQueueSettings returns the default settings for QueueSettings. @@ -57,8 +58,7 @@ func NewDefaultQueueSettings() QueueSettings { // This is a pretty decent value for production. // User should calculate this from the perspective of how many seconds to buffer in case of a backend outage, // multiply that by the number of requests per seconds. - QueueSize: 5000, - PersistentStorageEnabled: false, + QueueSize: 5000, } } @@ -76,8 +76,8 @@ func (qCfg *QueueSettings) Validate() error { } var ( - errNoStorageClient = errors.New("no storage client extension found") - errMultipleStorageClients = errors.New("multiple storage extensions found") + errNoStorageClient = errors.New("no storage client extension found") + errWrongExtensionType = errors.New("requested extension is not a storage extension") ) type queuedRetrySender struct { @@ -120,7 +120,7 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu onTemporaryFailure: qrs.onTemporaryFailure, } - if !qCfg.PersistentStorageEnabled { + if qCfg.StorageID == nil { qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {}) } // The Persistent Queue is initialized separately as it needs extra information about the component @@ -128,38 +128,39 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu return qrs } -func getStorageClient(ctx context.Context, host component.Host, id config.ComponentID, signal config.DataType) (*storage.Client, error) { - var storageExtension storage.Extension - for _, ext := range host.GetExtensions() { - if se, ok := ext.(storage.Extension); ok { - if storageExtension != nil { - return nil, errMultipleStorageClients - } - storageExtension = se +func getStorageExtension(extensions map[config.ComponentID]component.Extension, storageID config.ComponentID) (storage.Extension, error) { + if ext, found := extensions[storageID]; found { + if storageExt, ok := ext.(storage.Extension); ok { + return storageExt, nil } + return nil, errWrongExtensionType } + return nil, errNoStorageClient +} - if storageExtension == nil { - return nil, errNoStorageClient +func toStorageClient(ctx context.Context, storageID config.ComponentID, host component.Host, ownerID config.ComponentID, signal config.DataType) (storage.Client, error) { + extension, err := getStorageExtension(host.GetExtensions(), storageID) + if err != nil { + return nil, err } - client, err := storageExtension.GetClient(ctx, component.KindExporter, id, string(signal)) + client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal)) if err != nil { return nil, err } - return &client, err + return client, err } // initializePersistentQueue uses extra information for initialization available from component.Host func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error { - if qrs.cfg.PersistentStorageEnabled { - storageClient, err := getStorageClient(ctx, host, qrs.id, qrs.signal) + if qrs.cfg.StorageID != nil { + storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal) if err != nil { return err } - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler) + qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler) // TODO: this can be further exposed as a config param rather than relying on a type of queue qrs.requeuingEnabled = true diff --git a/exporter/exporterhelper/queued_retry_experimental_test.go b/exporter/exporterhelper/queued_retry_experimental_test.go index f8c80083011..9deb641b34a 100644 --- a/exporter/exporterhelper/queued_retry_experimental_test.go +++ b/exporter/exporterhelper/queued_retry_experimental_test.go @@ -70,31 +70,32 @@ func TestGetRetrySettings(t *testing.T) { desc string storage storage.Extension numStorages int - storageEnabled bool + storageIndex int expectedError error getClientError error }{ { - desc: "no storage selected", - numStorages: 0, - expectedError: errNoStorageClient, + desc: "obtain storage extension by name", + numStorages: 2, + storageIndex: 0, + expectedError: nil, }, { - desc: "obtain default storage extension", - numStorages: 1, - storageEnabled: true, - expectedError: nil, + desc: "fail on not existing storage extension", + numStorages: 2, + storageIndex: 100, + expectedError: errNoStorageClient, }, { - desc: "fail on obtaining default storage extension", - numStorages: 2, - storageEnabled: true, - expectedError: errMultipleStorageClients, + desc: "invalid extension type", + numStorages: 2, + storageIndex: 100, + expectedError: errNoStorageClient, }, { desc: "fail on error getting storage client from extension", numStorages: 1, - storageEnabled: true, + storageIndex: 0, expectedError: getStorageClientError, getClientError: getStorageClientError, }, @@ -102,7 +103,8 @@ func TestGetRetrySettings(t *testing.T) { for _, tC := range testCases { t.Run(tC.desc, func(t *testing.T) { - // prepare + storageID := config.NewComponentIDWithName("file_storage", strconv.Itoa(tC.storageIndex)) + var extensions = map[config.ComponentID]component.Extension{} for i := 0; i < tC.numStorages; i++ { extensions[config.NewComponentIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError} @@ -111,7 +113,7 @@ func TestGetRetrySettings(t *testing.T) { ownerID := config.NewComponentID("foo_exporter") // execute - client, err := getStorageClient(context.Background(), host, ownerID, config.TracesDataType) + client, err := toStorageClient(context.Background(), storageID, host, ownerID, config.TracesDataType) // verify if tC.expectedError != nil { @@ -125,6 +127,29 @@ func TestGetRetrySettings(t *testing.T) { } } +func TestInvalidStorageExtensionType(t *testing.T) { + storageID := config.NewComponentIDWithName("extension", "extension") + + // make a test extension + factory := componenttest.NewNopExtensionFactory() + extConfig := factory.CreateDefaultConfig() + settings := componenttest.NewNopExtensionCreateSettings() + extension, err := factory.CreateExtension(context.Background(), settings, extConfig) + assert.NoError(t, err) + var extensions = map[config.ComponentID]component.Extension{ + storageID: extension, + } + host := &mockHost{ext: extensions} + ownerID := config.NewComponentID("foo_exporter") + + // execute + client, err := toStorageClient(context.Background(), storageID, host, ownerID, config.TracesDataType) + + // we should get an error about the extension type + assert.ErrorIs(t, err, errWrongExtensionType) + assert.Nil(t, client) +} + // if requeueing is enabled, we eventually retry even if we failed at first func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg := NewDefaultQueueSettings() @@ -182,12 +207,13 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) qCfg := NewDefaultQueueSettings() - qCfg.PersistentStorageEnabled = true // enable persistence + storageID := config.NewComponentIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) var extensions = map[config.ComponentID]component.Extension{ - config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{}, + storageID: &mockStorageExtension{}, } host := &mockHost{ext: extensions} @@ -203,12 +229,13 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) qCfg := NewDefaultQueueSettings() - qCfg.PersistentStorageEnabled = true // enable persistence + storageID := config.NewComponentIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) var extensions = map[config.ComponentID]component.Extension{ - config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{GetClientError: storageError}, + storageID: &mockStorageExtension{GetClientError: storageError}, } host := &mockHost{ext: extensions} From 73bf13f1090dabf9e8543e7c28039957d454df81 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 4 Aug 2022 10:13:04 -0700 Subject: [PATCH 2/6] Improve pdata test coverage (#5798) Signed-off-by: Bogdan --- pdata/internal/common.go | 3 -- pdata/internal/common_test.go | 55 ++++++++++++++++++++++++++--------- 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/pdata/internal/common.go b/pdata/internal/common.go index c83cd414f4d..5c342b7127e 100644 --- a/pdata/internal/common.go +++ b/pdata/internal/common.go @@ -180,9 +180,6 @@ func newValueFromRaw(iv interface{}) Value { // Type returns the type of the value for this Value. // Calling this function on zero-initialized Value will cause a panic. func (v Value) Type() ValueType { - if v.orig.Value == nil { - return ValueTypeEmpty - } switch v.orig.Value.(type) { case *otlpcommon.AnyValue_StringValue: return ValueTypeString diff --git a/pdata/internal/common_test.go b/pdata/internal/common_test.go index 724dd0c579c..3f2ad80dca1 100644 --- a/pdata/internal/common_test.go +++ b/pdata/internal/common_test.go @@ -78,6 +78,7 @@ func TestAttributeValueType(t *testing.T) { assert.EqualValues(t, "MAP", ValueTypeMap.String()) assert.EqualValues(t, "SLICE", ValueTypeSlice.String()) assert.EqualValues(t, "BYTES", ValueTypeBytes.String()) + assert.EqualValues(t, "", ValueType(100).String()) } func TestAttributeValueMap(t *testing.T) { @@ -181,6 +182,7 @@ func TestNilOrigSetAttributeValue(t *testing.T) { func TestAttributeValueEqual(t *testing.T) { av1 := NewValueEmpty() + assert.True(t, av1.Equal(av1)) // nolint:gocritic av2 := NewValueEmpty() assert.True(t, av1.Equal(av2)) @@ -732,27 +734,29 @@ func TestMap_Clear(t *testing.T) { } func TestMap_RemoveIf(t *testing.T) { - rawMap := map[string]interface{}{ - "k_string": "123", - "k_int": int64(123), - "k_double": float64(1.23), - "k_bool": true, - "k_empty": nil, - "k_bytes": []byte{}, - } - am := NewMapFromRaw(rawMap) - assert.Equal(t, 6, am.Len()) + am := NewMap() + am.UpsertString("k_string", "123") + am.UpsertInt("k_int", int64(123)) + am.UpsertDouble("k_double", float64(1.23)) + am.UpsertBool("k_bool", true) + am.Upsert("k_empty", NewValueEmpty()) + + assert.Equal(t, 5, am.Len()) am.RemoveIf(func(key string, val Value) bool { return key == "k_int" || val.Type() == ValueTypeBool }) - assert.Equal(t, 4, am.Len()) + assert.Equal(t, 3, am.Len()) _, exists := am.Get("k_string") assert.True(t, exists) - _, exists = am.Get("k_bool") - assert.False(t, exists) _, exists = am.Get("k_int") assert.False(t, exists) + _, exists = am.Get("k_double") + assert.True(t, exists) + _, exists = am.Get("k_bool") + assert.False(t, exists) + _, exists = am.Get("k_empty") + assert.True(t, exists) } func BenchmarkAttributeValue_CopyTo(b *testing.B) { @@ -1288,6 +1292,15 @@ func TestNewValueFromRaw(t *testing.T) { return m }(), }, + { + name: "empty map", + input: map[string]interface{}{}, + expected: func() Value { + m := NewValueMap() + NewMapFromRaw(map[string]interface{}{}).CopyTo(m.MapVal()) + return m + }(), + }, { name: "slice", input: []interface{}{"v1", "v2"}, @@ -1297,6 +1310,22 @@ func TestNewValueFromRaw(t *testing.T) { return s })(), }, + { + name: "empty slice", + input: []interface{}{}, + expected: (func() Value { + s := NewValueSlice() + NewSliceFromRaw([]interface{}{}).CopyTo(s.SliceVal()) + return s + })(), + }, + { + name: "invalid value", + input: ValueTypeDouble, + expected: (func() Value { + return NewValueString("") + })(), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From dc421948cd491c2eabfb2e376644a32c8c62547d Mon Sep 17 00:00:00 2001 From: Adam Boguszewski <108867528+aboguszewski-sumo@users.noreply.github.com> Date: Thu, 4 Aug 2022 19:13:41 +0200 Subject: [PATCH 3/6] Add missing dashes to descriptions of flags (#5812) Config and set flags had misleading descriptions that contained examples with one dash before the flag instead of two. Co-authored-by: Bogdan Drutu --- service/flags.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/flags.go b/service/flags.go index 5ea7c61d4e7..04bd36a8694 100644 --- a/service/flags.go +++ b/service/flags.go @@ -48,12 +48,12 @@ func flags() *flag.FlagSet { flagSet := new(flag.FlagSet) flagSet.Var(new(stringArrayValue), configFlag, "Locations to the config file(s), note that only a"+ - " single location can be set per flag entry e.g. `-config=file:/path/to/first --config=file:path/to/second`.") + " single location can be set per flag entry e.g. `--config=file:/path/to/first --config=file:path/to/second`.") flagSet.Var(new(stringArrayValue), setFlag, "Set arbitrary component config property. The component has to be defined in the config file and the flag"+ " has a higher precedence. Array config properties are overridden and maps are joined, note that only a single"+ - " (first) array property can be set e.g. -set=processors.attributes.actions.key=some_key. Example --set=processors.batch.timeout=2s") + " (first) array property can be set e.g. --set=processors.attributes.actions.key=some_key. Example --set=processors.batch.timeout=2s") flagSet.Var( gatesList, From 869dad4022c7ebcc50247d6967c3de12ddf79ce4 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 4 Aug 2022 12:33:07 -0700 Subject: [PATCH 4/6] Remove unnecessary internal package, used only in one place (#5820) Signed-off-by: Bogdan --- .../err_or_sink_consumer.go | 81 ------------------- receiver/otlpreceiver/otlp_test.go | 69 ++++++++++++++-- 2 files changed, 63 insertions(+), 87 deletions(-) delete mode 100644 internal/internalconsumertest/err_or_sink_consumer.go diff --git a/internal/internalconsumertest/err_or_sink_consumer.go b/internal/internalconsumertest/err_or_sink_consumer.go deleted file mode 100644 index 6936e07246d..00000000000 --- a/internal/internalconsumertest/err_or_sink_consumer.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internalconsumertest // import "go.opentelemetry.io/collector/internal/internalconsumertest" - -import ( - "context" - "sync" - - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" -) - -type ErrOrSinkConsumer struct { - *consumertest.TracesSink - *consumertest.MetricsSink - mu sync.Mutex - consumeError error // to be returned by ConsumeTraces, if set -} - -// SetConsumeError sets an error that will be returned by the Consume function. -func (esc *ErrOrSinkConsumer) SetConsumeError(err error) { - esc.mu.Lock() - defer esc.mu.Unlock() - esc.consumeError = err -} - -func (esc *ErrOrSinkConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -// ConsumeTraces stores traces to this sink. -func (esc *ErrOrSinkConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - esc.mu.Lock() - defer esc.mu.Unlock() - - if esc.consumeError != nil { - return esc.consumeError - } - - return esc.TracesSink.ConsumeTraces(ctx, td) -} - -// ConsumeMetrics stores metrics to this sink. -func (esc *ErrOrSinkConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - esc.mu.Lock() - defer esc.mu.Unlock() - - if esc.consumeError != nil { - return esc.consumeError - } - - return esc.MetricsSink.ConsumeMetrics(ctx, md) -} - -// Reset deletes any stored in the sinks, resets error to nil. -func (esc *ErrOrSinkConsumer) Reset() { - esc.mu.Lock() - defer esc.mu.Unlock() - - esc.consumeError = nil - if esc.TracesSink != nil { - esc.TracesSink.Reset() - } - if esc.MetricsSink != nil { - esc.MetricsSink.Reset() - } -} diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index b5fdede1048..908045fad94 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "net" "net/http" + "sync" "testing" "time" @@ -45,11 +46,11 @@ import ( "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/internal/internalconsumertest" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/internal/testutil" "go.opentelemetry.io/collector/obsreport/obsreporttest" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" semconv "go.opentelemetry.io/collector/semconv/v1.5.0" @@ -161,7 +162,7 @@ func TestJsonHttp(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // Set the buffer count to 1 to make it flush the test span immediately. - sink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} + sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} ocr := newHTTPReceiver(t, addr, sink, nil) require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") @@ -333,7 +334,7 @@ func TestHandleInvalidRequests(t *testing.T) { require.NoError(t, err) } -func testHTTPJSONRequest(t *testing.T, url string, sink *internalconsumertest.ErrOrSinkConsumer, encoding string, expectedErr error) { +func testHTTPJSONRequest(t *testing.T, url string, sink *errOrSinkConsumer, encoding string, expectedErr error) { var buf *bytes.Buffer var err error switch encoding { @@ -412,7 +413,7 @@ func TestProtoHttp(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // Set the buffer count to 1 to make it flush the test span immediately. - tSink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} + tSink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} ocr := newHTTPReceiver(t, addr, tSink, consumertest.NewNop()) require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") @@ -462,7 +463,7 @@ func createHTTPProtobufRequest( func testHTTPProtobufRequest( t *testing.T, url string, - tSink *internalconsumertest.ErrOrSinkConsumer, + tSink *errOrSinkConsumer, encoding string, traceBytes []byte, expectedErr error, @@ -665,7 +666,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - sink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} + sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} ocr := newGRPCReceiver(t, exporter.receiverTag, addr, sink, nil) require.NotNil(t, ocr) @@ -996,3 +997,59 @@ func exportTraces(cc *grpc.ClientConn, td ptrace.Traces) error { return err } + +type errOrSinkConsumer struct { + *consumertest.TracesSink + *consumertest.MetricsSink + mu sync.Mutex + consumeError error // to be returned by ConsumeTraces, if set +} + +// SetConsumeError sets an error that will be returned by the Consume function. +func (esc *errOrSinkConsumer) SetConsumeError(err error) { + esc.mu.Lock() + defer esc.mu.Unlock() + esc.consumeError = err +} + +func (esc *errOrSinkConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// ConsumeTraces stores traces to this sink. +func (esc *errOrSinkConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + esc.mu.Lock() + defer esc.mu.Unlock() + + if esc.consumeError != nil { + return esc.consumeError + } + + return esc.TracesSink.ConsumeTraces(ctx, td) +} + +// ConsumeMetrics stores metrics to this sink. +func (esc *errOrSinkConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + esc.mu.Lock() + defer esc.mu.Unlock() + + if esc.consumeError != nil { + return esc.consumeError + } + + return esc.MetricsSink.ConsumeMetrics(ctx, md) +} + +// Reset deletes any stored in the sinks, resets error to nil. +func (esc *errOrSinkConsumer) Reset() { + esc.mu.Lock() + defer esc.mu.Unlock() + + esc.consumeError = nil + if esc.TracesSink != nil { + esc.TracesSink.Reset() + } + if esc.MetricsSink != nil { + esc.MetricsSink.Reset() + } +} From 1c1a668faf2fed4ac3489c945a6fd7d4e86b6f27 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 4 Aug 2022 14:01:53 -0700 Subject: [PATCH 5/6] Add support in the confmap.Resolver to expand embedded config URIs inside configuration. (#4742) Signed-off-by: Bogdan Drutu --- CHANGELOG.md | 1 + confmap/README.md | 26 +++- confmap/provider.go | 45 +++++- confmap/provider/internal/provider.go | 2 +- confmap/provider/internal/provider_test.go | 9 +- confmap/resolver.go | 110 +++++++++++++-- confmap/resolver_test.go | 133 +++++++++++++++++- .../expand-with-all-env-with-source.yaml | 11 ++ confmap/testdata/expand-with-all-env.yaml | 11 ++ confmap/testdata/expand-with-no-env.yaml | 10 ++ confmap/testdata/expand-with-partial-env.yaml | 10 ++ 11 files changed, 336 insertions(+), 32 deletions(-) create mode 100644 confmap/testdata/expand-with-all-env-with-source.yaml create mode 100644 confmap/testdata/expand-with-all-env.yaml create mode 100644 confmap/testdata/expand-with-no-env.yaml create mode 100644 confmap/testdata/expand-with-partial-env.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index dbfec61bcce..bcf657f9fab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -192,6 +192,7 @@ There isn't a valid core binary for this release. Use v0.57.2 instead. - Use OpenCensus `metric` package for process metrics instead of `stats` package (#5486) - Update OTLP to v0.18.0 (#5530) - Log histogram min/max fields with `logging` exporter (#5520) +- Add support in the `confmap.Resolver` to expand embedded config URIs inside configuration (#4742) ### 🧰 Bug fixes 🧰 diff --git a/confmap/README.md b/confmap/README.md index c6051429e90..db71340c2b1 100644 --- a/confmap/README.md +++ b/confmap/README.md @@ -33,23 +33,35 @@ The `Resolver` receives as input a set of `Providers`, a list of `Converters`, a `configURI` that will be used to generate the resulting, or effective, configuration in the form of a `Conf`, that can be used by code that is oblivious to the usage of `Providers` and `Converters`. +`Providers` are used to provide an entire configuration when the `configURI` is given directly to the `Resolver`, +or an individual value (partial configuration) when the `configURI` is embedded into the `Conf` as a values using +the syntax `${configURI}`. + ```terminal Resolver Provider - β”‚ β”‚ Resolve β”‚ β”‚ ────────────────►│ β”‚ β”‚ β”‚ β”Œβ”€ β”‚ Retrieve β”‚ β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Ίβ”‚ - β”‚ β”‚ β”‚ + β”‚ β”‚ Conf β”‚ β”‚ │◄────────────────────────── - foreach β”‚ β”‚ β”‚ + foreach β”‚ β”‚ β”‚ configURI β”‚ β”œβ”€β”€β”€β” β”‚ β”‚ β”‚ β”‚Merge β”‚ β”‚ β”‚β—„β”€β”€β”˜ β”‚ + └─ β”‚ β”‚ + β”Œβ”€ β”‚ Retrieve β”‚ + β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Ίβ”‚ + β”‚ β”‚ Partial Conf Value β”‚ + β”‚ │◄────────────────────────── + foreach β”‚ β”‚ β”‚ + embedded β”‚ β”‚ β”‚ + configURI β”‚ β”œβ”€β”€β”€β” β”‚ + β”‚ β”‚ β”‚Replace β”‚ + β”‚ β”‚β—„β”€β”€β”˜ β”‚ └─ β”‚ β”‚ β”‚ Converter β”‚ - β”‚ β”‚ β”‚ β”Œβ”€ β”‚ Convert β”‚ β”‚ β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Ίβ”‚ β”‚ foreach β”‚ β”‚ β”‚ β”‚ @@ -57,15 +69,15 @@ that can be used by code that is oblivious to the usage of `Providers` and `Conv └─ β”‚ β”‚ β”‚ β”‚ ◄───────────────── β”‚ - β”‚ β”‚ ``` The `Resolve` method proceeds in the following steps: 1. Start with an empty "result" of `Conf` type. 2. For each config URI retrieves individual configurations, and merges it into the "result". -2. For each "Converter", call "Convert" for the "result". -4. Return the "result", aka effective, configuration. +3. For each embedded config URI retrieves individual value, and replaces it into the "result". +4. For each "Converter", call "Convert" for the "result". +5. Return the "result", aka effective, configuration. ### Watching for Updates After the configuration was processed, the `Resolver` can be used as a single point to watch for updates in the diff --git a/confmap/provider.go b/confmap/provider.go index 2b2801cc7c8..9dfa2b4adb6 100644 --- a/confmap/provider.go +++ b/confmap/provider.go @@ -16,6 +16,7 @@ package confmap // import "go.opentelemetry.io/collector/confmap" import ( "context" + "fmt" ) // Provider is an interface that helps to retrieve a config map and watch for any @@ -81,7 +82,7 @@ type ChangeEvent struct { // Retrieved holds the result of a call to the Retrieve method of a Provider object. type Retrieved struct { - conf *Conf + rawConf interface{} closeFunc CloseFunc } @@ -101,17 +102,39 @@ func WithRetrievedClose(closeFunc CloseFunc) RetrievedOption { } // NewRetrieved returns a new Retrieved instance that contains the data from the raw deserialized config. -func NewRetrieved(rawConf map[string]interface{}, opts ...RetrievedOption) (Retrieved, error) { +// The rawConf can be one of the following types: +// - Primitives: int, int32, int64, float32, float64, bool, string; +// - []interface{}; +// - map[string]interface{}; +func NewRetrieved(rawConf interface{}, opts ...RetrievedOption) (Retrieved, error) { + if err := checkRawConfType(rawConf); err != nil { + return Retrieved{}, err + } set := retrievedSettings{} for _, opt := range opts { opt(&set) } - return Retrieved{conf: NewFromStringMap(rawConf), closeFunc: set.closeFunc}, nil + return Retrieved{rawConf: rawConf, closeFunc: set.closeFunc}, nil } // AsConf returns the retrieved configuration parsed as a Conf. -func (r Retrieved) AsConf() (*Conf, error) { - return r.conf, nil +func (r *Retrieved) AsConf() (*Conf, error) { + if r.rawConf == nil { + return New(), nil + } + val, ok := r.rawConf.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("retrieved value (type=%T) cannot be used as a Conf", r.rawConf) + } + return NewFromStringMap(val), nil +} + +// AsRaw returns the retrieved configuration parsed as an interface{} which can be one of the following types: +// - Primitives: int, int32, int64, float32, float64, bool, string; +// - []interface{} - every member follows the same rules as the given interface{}; +// - map[string]interface{} - every value follows the same rules as the given interface{}; +func (r *Retrieved) AsRaw() (interface{}, error) { + return r.rawConf, nil } // Close and release any watchers that Provider.Retrieve may have created. @@ -129,3 +152,15 @@ func (r Retrieved) Close(ctx context.Context) error { // CloseFunc a function equivalent to Retrieved.Close. type CloseFunc func(context.Context) error + +func checkRawConfType(rawConf interface{}) error { + if rawConf == nil { + return nil + } + switch rawConf.(type) { + case int, int32, int64, float32, float64, bool, string, []interface{}, map[string]interface{}: + return nil + default: + return fmt.Errorf("unsupported type=%T for retrieved config", rawConf) + } +} diff --git a/confmap/provider/internal/provider.go b/confmap/provider/internal/provider.go index e3f6776dbc5..a1f62c5a06e 100644 --- a/confmap/provider/internal/provider.go +++ b/confmap/provider/internal/provider.go @@ -24,7 +24,7 @@ import ( // * yamlBytes the yaml bytes that will be deserialized. // * opts specifies options associated with this Retrieved value, such as CloseFunc. func NewRetrievedFromYAML(yamlBytes []byte, opts ...confmap.RetrievedOption) (confmap.Retrieved, error) { - var rawConf map[string]interface{} + var rawConf interface{} if err := yaml.Unmarshal(yamlBytes, &rawConf); err != nil { return confmap.Retrieved{}, err } diff --git a/confmap/provider/internal/provider_test.go b/confmap/provider/internal/provider_test.go index 0bd0b74a35f..023fa3347fe 100644 --- a/confmap/provider/internal/provider_test.go +++ b/confmap/provider/internal/provider_test.go @@ -46,10 +46,13 @@ func TestNewRetrievedFromYAMLWithOptions(t *testing.T) { func TestNewRetrievedFromYAMLInvalidYAMLBytes(t *testing.T) { _, err := NewRetrievedFromYAML([]byte("[invalid:,")) - require.Error(t, err) + assert.Error(t, err) } func TestNewRetrievedFromYAMLInvalidAsMap(t *testing.T) { - _, err := NewRetrievedFromYAML([]byte("string")) - require.Error(t, err) + ret, err := NewRetrievedFromYAML([]byte("string")) + require.NoError(t, err) + + _, err = ret.AsConf() + assert.Error(t, err) } diff --git a/confmap/resolver.go b/confmap/resolver.go index b97bcef4827..a36fae6652c 100644 --- a/confmap/resolver.go +++ b/confmap/resolver.go @@ -38,6 +38,8 @@ type Resolver struct { sync.Mutex closers []CloseFunc watcher chan error + + enableExpand bool } // ResolverSettings are the settings to configure the behavior of the Resolver. @@ -115,20 +117,14 @@ func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) { // For backwards compatibility: // - empty url scheme means "file". // - "^[A-z]:" also means "file" - scheme := "file" - if idx := strings.Index(uri, ":"); idx != -1 && !driverLetterRegexp.MatchString(uri) { - scheme = uri[:idx] - } else { - uri = scheme + ":" + uri - } - p, ok := mr.providers[scheme] - if !ok { - return nil, fmt.Errorf("scheme %q is not supported for uri %q", scheme, uri) + if driverLetterRegexp.MatchString(uri) { + uri = "file:" + uri } - ret, err := p.Retrieve(ctx, uri, mr.onChange) + ret, err := mr.retrieveValue(ctx, location{uri: uri, defaultScheme: "file"}) if err != nil { - return nil, err + return nil, fmt.Errorf("cannot retrieve the configuration: %w", err) } + mr.closers = append(mr.closers, ret.Close) retCfgMap, err := ret.AsConf() if err != nil { return nil, err @@ -136,7 +132,18 @@ func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) { if err = retMap.Merge(retCfgMap); err != nil { return nil, err } - mr.closers = append(mr.closers, ret.Close) + } + + if mr.enableExpand { + cfgMap := make(map[string]interface{}) + for _, k := range retMap.AllKeys() { + val, err := mr.expandValueRecursively(ctx, retMap.Get(k)) + if err != nil { + return nil, err + } + cfgMap[k] = val + } + retMap = NewFromStringMap(cfgMap) } // Apply the converters in the given order. @@ -187,3 +194,82 @@ func (mr *Resolver) closeIfNeeded(ctx context.Context) error { } return err } + +func (mr *Resolver) expandValueRecursively(ctx context.Context, value interface{}) (interface{}, error) { + for i := 0; i < 100; i++ { + val, changed, err := mr.expandValue(ctx, value) + if err != nil { + return nil, err + } + if !changed { + return val, nil + } + value = val + } + return nil, errors.New("too many recursive expansions") +} + +func (mr *Resolver) expandValue(ctx context.Context, value interface{}) (interface{}, bool, error) { + switch v := value.(type) { + case string: + // If it doesn't have the format "${scheme:opaque}" no need to expand. + if !strings.HasPrefix(v, "${") || !strings.HasSuffix(v, "}") { + return value, false, nil + } + uri := v[2 : len(v)-1] + // For backwards compatibility: + // - empty scheme means "env". + ret, err := mr.retrieveValue(ctx, location{uri: uri, defaultScheme: "env"}) + if err != nil { + return nil, false, err + } + mr.closers = append(mr.closers, ret.Close) + val, err := ret.AsRaw() + return val, true, err + case []interface{}: + nslice := make([]interface{}, 0, len(v)) + nchanged := false + for _, vint := range v { + val, changed, err := mr.expandValue(ctx, vint) + if err != nil { + return nil, false, err + } + nslice = append(nslice, val) + nchanged = nchanged || changed + } + return nslice, nchanged, nil + case map[string]interface{}: + nmap := map[string]interface{}{} + nchanged := false + for mk, mv := range v { + val, changed, err := mr.expandValue(ctx, mv) + if err != nil { + return nil, false, err + } + nmap[mk] = val + nchanged = nchanged || changed + } + return nmap, nchanged, nil + } + return value, false, nil +} + +type location struct { + uri string + defaultScheme string +} + +func (mr *Resolver) retrieveValue(ctx context.Context, l location) (Retrieved, error) { + uri := l.uri + scheme := l.defaultScheme + if idx := strings.Index(uri, ":"); idx != -1 { + scheme = uri[:idx] + } else { + uri = scheme + ":" + uri + } + p, ok := mr.providers[scheme] + if !ok { + return Retrieved{}, fmt.Errorf("scheme %q is not supported for uri %q", scheme, uri) + } + return p.Retrieve(ctx, uri, mr.onChange) +} diff --git a/confmap/resolver_test.go b/confmap/resolver_test.go index 6d01ee12547..88663193c9f 100644 --- a/confmap/resolver_test.go +++ b/confmap/resolver_test.go @@ -27,7 +27,7 @@ import ( type mockProvider struct { scheme string - retM map[string]interface{} + retM interface{} errR error errS error errW error @@ -41,9 +41,8 @@ func (m *mockProvider) Retrieve(_ context.Context, _ string, watcher WatcherFunc if m.retM == nil { return NewRetrieved(nil) } - if watcher != nil { - watcher(&ChangeEvent{Error: m.errW}) - } + + watcher(&ChangeEvent{Error: m.errW}) return NewRetrieved(m.retM, WithRetrievedClose(func(ctx context.Context) error { return m.errC })) } @@ -122,6 +121,15 @@ func TestResolverErrors(t *testing.T) { }, expectResolveErr: true, }, + { + name: "retrieve location not convertable to Conf", + locations: []string{"mock:", "err:"}, + providers: []Provider{ + &mockProvider{}, + &mockProvider{scheme: "err", retM: "invalid value"}, + }, + expectResolveErr: true, + }, { name: "converter error", locations: []string{"mock:"}, @@ -306,6 +314,123 @@ func TestResolverShutdownClosesWatch(t *testing.T) { watcherWG.Wait() } +func TestResolverExpandEnvVars(t *testing.T) { + var testCases = []struct { + name string // test case name (also file name containing config yaml) + }{ + {name: "expand-with-no-env.yaml"}, + {name: "expand-with-partial-env.yaml"}, + {name: "expand-with-all-env.yaml"}, + {name: "expand-with-all-env-with-source.yaml"}, + } + + envs := map[string]string{ + "EXTRA": "some string", + "EXTRA_MAP_VALUE_1": "some map value_1", + "EXTRA_MAP_VALUE_2": "some map value_2", + "EXTRA_LIST_MAP_VALUE_1": "some list map value_1", + "EXTRA_LIST_MAP_VALUE_2": "some list map value_2", + "EXTRA_LIST_VALUE_1": "some list value_1", + "EXTRA_LIST_VALUE_2": "some list value_2", + } + + expectedCfgMap := newConfFromFile(t, filepath.Join("testdata", "expand-with-no-env.yaml")) + fileProvider := newFakeProvider("file", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(newConfFromFile(t, uri[5:])) + }) + envProvider := newFakeProvider("env", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(envs[uri[4:]]) + }) + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + resolver, err := NewResolver(ResolverSettings{URIs: []string{filepath.Join("testdata", test.name)}, Providers: makeMapProvidersMap(fileProvider, envProvider), Converters: nil}) + require.NoError(t, err) + resolver.enableExpand = true + // Test that expanded configs are the same with the simple config with no env vars. + cfgMap, err := resolver.Resolve(context.Background()) + require.NoError(t, err) + assert.Equal(t, expectedCfgMap, cfgMap.ToStringMap()) + }) + } +} + +func TestResolverExpandMapAndSliceValues(t *testing.T) { + provider := newFakeProvider("input", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(map[string]interface{}{ + "test_map": map[string]interface{}{"recv": "${test:MAP_VALUE}"}, + "test_slice": []interface{}{"${test:MAP_VALUE}"}}) + }) + + const receiverExtraMapValue = "some map value" + testProvider := newFakeProvider("test", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(receiverExtraMapValue) + }) + + resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) + require.NoError(t, err) + resolver.enableExpand = true + + cfgMap, err := resolver.Resolve(context.Background()) + require.NoError(t, err) + expectedMap := map[string]interface{}{ + "test_map": map[string]interface{}{"recv": receiverExtraMapValue}, + "test_slice": []interface{}{receiverExtraMapValue}} + assert.Equal(t, expectedMap, cfgMap.ToStringMap()) +} + +func TestResolverInfiniteExpand(t *testing.T) { + const receiverValue = "${test:VALUE}" + provider := newFakeProvider("input", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(map[string]interface{}{"test": receiverValue}) + }) + + testProvider := newFakeProvider("test", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(receiverValue) + }) + + resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) + require.NoError(t, err) + resolver.enableExpand = true + + _, err = resolver.Resolve(context.Background()) + assert.Error(t, err) +} + +func TestResolverExpandSliceValueError(t *testing.T) { + provider := newFakeProvider("input", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(map[string]interface{}{"test": []interface{}{"${test:VALUE}"}}) + }) + + testProvider := newFakeProvider("test", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(errors.New("invalid value")) + }) + + resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) + require.NoError(t, err) + resolver.enableExpand = true + + _, err = resolver.Resolve(context.Background()) + assert.Error(t, err) +} + +func TestResolverExpandMapValueError(t *testing.T) { + provider := newFakeProvider("input", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(map[string]interface{}{"test": []interface{}{map[string]interface{}{"test": "${test:VALUE}"}}}) + }) + + testProvider := newFakeProvider("test", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(errors.New("invalid value")) + }) + + resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) + require.NoError(t, err) + resolver.enableExpand = true + + _, err = resolver.Resolve(context.Background()) + assert.Error(t, err) +} + func makeMapProvidersMap(providers ...Provider) map[string]Provider { ret := make(map[string]Provider, len(providers)) for _, provider := range providers { diff --git a/confmap/testdata/expand-with-all-env-with-source.yaml b/confmap/testdata/expand-with-all-env-with-source.yaml new file mode 100644 index 00000000000..5386e05158a --- /dev/null +++ b/confmap/testdata/expand-with-all-env-with-source.yaml @@ -0,0 +1,11 @@ +test_map: + extra: "${env:EXTRA}" + extra_map: + recv.1: "${env:EXTRA_MAP_VALUE_1}" + recv.2: "${env:EXTRA_MAP_VALUE_2}" + extra_list_map: + - { recv.1: "${env:EXTRA_LIST_MAP_VALUE_1}",recv.2: "${env:EXTRA_LIST_MAP_VALUE_2}" } + extra_list: + - "${env:EXTRA_LIST_VALUE_1}" + - "${env:EXTRA_LIST_VALUE_2}" + diff --git a/confmap/testdata/expand-with-all-env.yaml b/confmap/testdata/expand-with-all-env.yaml new file mode 100644 index 00000000000..ed623bf9a57 --- /dev/null +++ b/confmap/testdata/expand-with-all-env.yaml @@ -0,0 +1,11 @@ +test_map: + extra: "${EXTRA}" + extra_map: + recv.1: "${EXTRA_MAP_VALUE_1}" + recv.2: "${EXTRA_MAP_VALUE_2}" + extra_list_map: + - { recv.1: "${EXTRA_LIST_MAP_VALUE_1}",recv.2: "${EXTRA_LIST_MAP_VALUE_2}" } + extra_list: + - "${EXTRA_LIST_VALUE_1}" + - "${EXTRA_LIST_VALUE_2}" + diff --git a/confmap/testdata/expand-with-no-env.yaml b/confmap/testdata/expand-with-no-env.yaml new file mode 100644 index 00000000000..fd4dd08210d --- /dev/null +++ b/confmap/testdata/expand-with-no-env.yaml @@ -0,0 +1,10 @@ +test_map: + extra: "some string" + extra_map: + recv.1: "some map value_1" + recv.2: "some map value_2" + extra_list_map: + - { recv.1: "some list map value_1",recv.2: "some list map value_2" } + extra_list: + - "some list value_1" + - "some list value_2" diff --git a/confmap/testdata/expand-with-partial-env.yaml b/confmap/testdata/expand-with-partial-env.yaml new file mode 100644 index 00000000000..fb8ffe51b8a --- /dev/null +++ b/confmap/testdata/expand-with-partial-env.yaml @@ -0,0 +1,10 @@ +test_map: + extra: "${EXTRA}" + extra_map: + recv.1: "${EXTRA_MAP_VALUE_1}" + recv.2: "some map value_2" + extra_list_map: + - { recv.1: "some list map value_1",recv.2: "${EXTRA_LIST_MAP_VALUE_2}" } + extra_list: + - "some list value_1" + - "${EXTRA_LIST_VALUE_2}" From d7b097cf0745850a3353cf97952d828ab0d64eea Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 4 Aug 2022 14:02:14 -0700 Subject: [PATCH 6/6] Remove the InstrumentationLibrary to Scope translation, OTLP 0.19 no longer supports it. (#5819) * Remove the InstrumentationLibrary to Scope translation, OTLP 0.19 no longer supports it. Signed-off-by: Bogdan * Update CHANGELOG.md Co-authored-by: Alex Boten --- CHANGELOG.md | 2 + pdata/internal/otlp/logs.go | 31 +--- pdata/internal/otlp/metrics.go | 31 +--- pdata/internal/otlp/traces.go | 31 +--- pdata/plog/json.go | 2 +- pdata/plog/plogotlp/logs.go | 6 +- pdata/plog/plogotlp/logs_test.go | 163 --------------------- pdata/pmetric/json.go | 2 +- pdata/pmetric/pmetricotlp/metrics.go | 4 +- pdata/pmetric/pmetricotlp/metrics_test.go | 151 -------------------- pdata/ptrace/json.go | 12 +- pdata/ptrace/json_test.go | 16 +-- pdata/ptrace/ptraceotlp/traces.go | 6 +- pdata/ptrace/ptraceotlp/traces_test.go | 164 ---------------------- 14 files changed, 35 insertions(+), 586 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bcf657f9fab..4936ac646b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### πŸ›‘ Breaking changes πŸ›‘ +- Remove the InstrumentationLibrary to Scope translation (part of transition to OTLP 0.19). (#5819) + - This has a side effect that when sending JSON encoded telemetry using OTLP proto <= 0.15.0, telemetry will be dropped. - Require the storage to be explicitly set for the (experimental) persistent queue (#5784) - Remove deprecated `confighttp.HTTPClientSettings.ToClientWithHost` (#5803) - Remove deprecated component stability helpers (#5802): diff --git a/pdata/internal/otlp/logs.go b/pdata/internal/otlp/logs.go index a94fc24b6da..d89a36de8a9 100644 --- a/pdata/internal/otlp/logs.go +++ b/pdata/internal/otlp/logs.go @@ -15,34 +15,9 @@ package otlp // import "go.opentelemetry.io/collector/pdata/internal/otlp" import ( - otlpcommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1" otlplogs "go.opentelemetry.io/collector/pdata/internal/data/protogen/logs/v1" ) -// InstrumentationLibraryLogsToScope implements the translation of resource logs data -// following the v0.15.0 upgrade: -// -// receivers SHOULD check if instrumentation_library_logs is set -// and scope_logs is not set then the value in instrumentation_library_logs -// SHOULD be used instead by converting InstrumentationLibraryLogs into ScopeLogs. -// If scope_logs is set then instrumentation_library_logs SHOULD be ignored. -// -// https://github.com/open-telemetry/opentelemetry-proto/blob/3c2915c01a9fb37abfc0415ec71247c4978386b0/opentelemetry/proto/logs/v1/logs.proto#L58 -func InstrumentationLibraryLogsToScope(rls []*otlplogs.ResourceLogs) { - for _, rl := range rls { - if len(rl.ScopeLogs) == 0 { - for _, ill := range rl.InstrumentationLibraryLogs { - scopeLogs := otlplogs.ScopeLogs{ - Scope: otlpcommon.InstrumentationScope{ - Name: ill.InstrumentationLibrary.Name, - Version: ill.InstrumentationLibrary.Version, - }, - LogRecords: ill.LogRecords, - SchemaUrl: ill.SchemaUrl, - } - rl.ScopeLogs = append(rl.ScopeLogs, &scopeLogs) - } - } - rl.InstrumentationLibraryLogs = nil - } -} +// MigrateLogs implements any translation needed due to deprecation in OTLP logs protocol. +// Any plog.Unmarshaler implementation from OTLP (proto/json) MUST call this, and the gRPC Server implementation. +func MigrateLogs(_ []*otlplogs.ResourceLogs) {} diff --git a/pdata/internal/otlp/metrics.go b/pdata/internal/otlp/metrics.go index 7f65ec5d9e0..775ace72b83 100644 --- a/pdata/internal/otlp/metrics.go +++ b/pdata/internal/otlp/metrics.go @@ -15,34 +15,9 @@ package otlp // import "go.opentelemetry.io/collector/pdata/internal/otlp" import ( - otlpcommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1" otlpmetrics "go.opentelemetry.io/collector/pdata/internal/data/protogen/metrics/v1" ) -// InstrumentationLibraryMetricsToScope implements the translation of resource metrics data -// following the v0.15.0 upgrade: -// -// receivers SHOULD check if instrumentation_library_metrics is set -// and scope_metrics is not set then the value in instrumentation_library_metrics -// SHOULD be used instead by converting InstrumentationLibraryMetrics into ScopeMetrics. -// If scope_metrics is set then instrumentation_library_metrics SHOULD be ignored. -// -// https://github.com/open-telemetry/opentelemetry-proto/blob/3c2915c01a9fb37abfc0415ec71247c4978386b0/opentelemetry/proto/metrics/v1/metrics.proto#L58 -func InstrumentationLibraryMetricsToScope(rms []*otlpmetrics.ResourceMetrics) { - for _, rm := range rms { - if len(rm.ScopeMetrics) == 0 { - for _, ilm := range rm.InstrumentationLibraryMetrics { - scopeMetrics := otlpmetrics.ScopeMetrics{ - Scope: otlpcommon.InstrumentationScope{ - Name: ilm.InstrumentationLibrary.Name, - Version: ilm.InstrumentationLibrary.Version, - }, - Metrics: ilm.Metrics, - SchemaUrl: ilm.SchemaUrl, - } - rm.ScopeMetrics = append(rm.ScopeMetrics, &scopeMetrics) - } - } - rm.InstrumentationLibraryMetrics = nil - } -} +// MigrateMetrics implements any translation needed due to deprecation in OTLP metrics protocol. +// Any pmetric.Unmarshaler implementation from OTLP (proto/json) MUST call this, and the gRPC Server implementation. +func MigrateMetrics(_ []*otlpmetrics.ResourceMetrics) {} diff --git a/pdata/internal/otlp/traces.go b/pdata/internal/otlp/traces.go index 0658f2a19d4..70d202aea12 100644 --- a/pdata/internal/otlp/traces.go +++ b/pdata/internal/otlp/traces.go @@ -15,34 +15,9 @@ package otlp // import "go.opentelemetry.io/collector/pdata/internal/otlp" import ( - otlpcommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1" otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1" ) -// InstrumentationLibraryToScope implements the translation of resource span data -// following the v0.15.0 upgrade: -// -// receivers SHOULD check if instrumentation_library_spans is set -// and scope_spans is not set then the value in instrumentation_library_spans -// SHOULD be used instead by converting InstrumentationLibrarySpans into ScopeSpans. -// If scope_spans is set then instrumentation_library_spans SHOULD be ignored. -// -// https://github.com/open-telemetry/opentelemetry-proto/blob/3c2915c01a9fb37abfc0415ec71247c4978386b0/opentelemetry/proto/trace/v1/trace.proto#L58 -func InstrumentationLibrarySpansToScope(rss []*otlptrace.ResourceSpans) { - for _, rs := range rss { - if len(rs.ScopeSpans) == 0 { - for _, ils := range rs.InstrumentationLibrarySpans { - scopeSpans := otlptrace.ScopeSpans{ - Scope: otlpcommon.InstrumentationScope{ - Name: ils.InstrumentationLibrary.Name, - Version: ils.InstrumentationLibrary.Version, - }, - Spans: ils.Spans, - SchemaUrl: ils.SchemaUrl, - } - rs.ScopeSpans = append(rs.ScopeSpans, &scopeSpans) - } - } - rs.InstrumentationLibrarySpans = nil - } -} +// MigrateTraces implements any translation needed due to deprecation in OTLP traces protocol. +// Any ptrace.Unmarshaler implementation from OTLP (proto/json) MUST call this, and the gRPC Server implementation. +func MigrateTraces(_ []*otlptrace.ResourceSpans) {} diff --git a/pdata/plog/json.go b/pdata/plog/json.go index d66e31a7872..050651ae740 100644 --- a/pdata/plog/json.go +++ b/pdata/plog/json.go @@ -62,6 +62,6 @@ func (d *jsonUnmarshaler) UnmarshalLogs(buf []byte) (Logs, error) { if err := d.delegate.Unmarshal(bytes.NewReader(buf), &ld); err != nil { return Logs{}, err } - otlp.InstrumentationLibraryLogsToScope(ld.ResourceLogs) + otlp.MigrateLogs(ld.ResourceLogs) return internal.LogsFromProto(ld), nil } diff --git a/pdata/plog/plogotlp/logs.go b/pdata/plog/plogotlp/logs.go index 822370c2273..4026fdc9db1 100644 --- a/pdata/plog/plogotlp/logs.go +++ b/pdata/plog/plogotlp/logs.go @@ -92,7 +92,7 @@ func (lr Request) UnmarshalProto(data []byte) error { if err := lr.orig.Unmarshal(data); err != nil { return err } - otlp.InstrumentationLibraryLogsToScope(lr.orig.ResourceLogs) + otlp.MigrateLogs(lr.orig.ResourceLogs) return nil } @@ -110,7 +110,7 @@ func (lr Request) UnmarshalJSON(data []byte) error { if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), lr.orig); err != nil { return err } - otlp.InstrumentationLibraryLogsToScope(lr.orig.ResourceLogs) + otlp.MigrateLogs(lr.orig.ResourceLogs) return nil } @@ -162,7 +162,7 @@ type rawLogsServer struct { } func (s rawLogsServer) Export(ctx context.Context, request *otlpcollectorlog.ExportLogsServiceRequest) (*otlpcollectorlog.ExportLogsServiceResponse, error) { - otlp.InstrumentationLibraryLogsToScope(request.ResourceLogs) + otlp.MigrateLogs(request.ResourceLogs) rsp, err := s.srv.Export(ctx, Request{orig: request}) return rsp.orig, err } diff --git a/pdata/plog/plogotlp/logs_test.go b/pdata/plog/plogotlp/logs_test.go index 95a3adc6747..9f85a541b51 100644 --- a/pdata/plog/plogotlp/logs_test.go +++ b/pdata/plog/plogotlp/logs_test.go @@ -31,8 +31,6 @@ import ( "google.golang.org/grpc/status" "google.golang.org/grpc/test/bufconn" - v1 "go.opentelemetry.io/collector/pdata/internal/data/protogen/logs/v1" - "go.opentelemetry.io/collector/pdata/internal/otlp" "go.opentelemetry.io/collector/pdata/plog" ) @@ -65,67 +63,6 @@ var logsRequestJSON = []byte(` ] }`) -var logsTransitionData = [][]byte{ - []byte(` - { - "resourceLogs": [ - { - "resource": {}, - "instrumentationLibraryLogs": [ - { - "instrumentationLibrary": {}, - "logRecords": [ - { - "body": { - "stringValue": "test_log_record" - }, - "traceId": "", - "spanId": "" - } - ] - } - ] - } - ] - }`), - []byte(` - { - "resourceLogs": [ - { - "resource": {}, - "instrumentationLibraryLogs": [ - { - "instrumentationLibrary": {}, - "logRecords": [ - { - "body": { - "stringValue": "test_log_record" - }, - "traceId": "", - "spanId": "" - } - ] - } - ], - "scopeLogs": [ - { - "scope": {}, - "logRecords": [ - { - "body": { - "stringValue": "test_log_record" - }, - "traceId": "", - "spanId": "" - } - ] - } - ] - } - ] - }`), -} - func TestRequestToPData(t *testing.T) { tr := NewRequest() assert.Equal(t, tr.Logs().LogRecordCount(), 0) @@ -143,18 +80,6 @@ func TestRequestJSON(t *testing.T) { assert.Equal(t, strings.Join(strings.Fields(string(logsRequestJSON)), ""), string(got)) } -func TestRequestJSONTransition(t *testing.T) { - for _, data := range logsTransitionData { - lr := NewRequest() - assert.NoError(t, lr.UnmarshalJSON(data)) - assert.Equal(t, "test_log_record", lr.Logs().ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().AsString()) - - got, err := lr.MarshalJSON() - assert.NoError(t, err) - assert.Equal(t, strings.Join(strings.Fields(string(logsRequestJSON)), ""), string(got)) - } -} - func TestGrpc(t *testing.T) { lis := bufconn.Listen(1024 * 1024) s := grpc.NewServer() @@ -188,83 +113,6 @@ func TestGrpc(t *testing.T) { assert.Equal(t, NewResponse(), resp) } -func TestGrpcTransition(t *testing.T) { - lis := bufconn.Listen(1024 * 1024) - s := grpc.NewServer() - RegisterServer(s, &fakeLogsServer{t: t}) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.NoError(t, s.Serve(lis)) - }() - t.Cleanup(func() { - s.Stop() - wg.Wait() - }) - - cc, err := grpc.Dial("bufnet", - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return lis.Dial() - }), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock()) - assert.NoError(t, err) - t.Cleanup(func() { - assert.NoError(t, cc.Close()) - }) - - logClient := NewClient(cc) - - req := generateLogsRequestWithInstrumentationLibrary() - otlp.InstrumentationLibraryLogsToScope(req.orig.ResourceLogs) - resp, err := logClient.Export(context.Background(), req) - assert.NoError(t, err) - assert.Equal(t, NewResponse(), resp) -} - -type fakeRawServer struct { - t *testing.T -} - -func (s fakeRawServer) Export(_ context.Context, req Request) (Response, error) { - assert.Equal(s.t, 1, req.Logs().LogRecordCount()) - return NewResponse(), nil -} - -func TestGrpcExport(t *testing.T) { - lis := bufconn.Listen(1024 * 1024) - s := grpc.NewServer() - RegisterServer(s, &fakeRawServer{t: t}) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.NoError(t, s.Serve(lis)) - }() - t.Cleanup(func() { - s.Stop() - wg.Wait() - }) - - cc, err := grpc.Dial("bufnet", - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return lis.Dial() - }), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock()) - assert.NoError(t, err) - t.Cleanup(func() { - assert.NoError(t, cc.Close()) - }) - - logClient := NewClient(cc) - - resp, err := logClient.Export(context.Background(), generateLogsRequestWithInstrumentationLibrary()) - assert.NoError(t, err) - assert.Equal(t, NewResponse(), resp) -} - func TestGrpcError(t *testing.T) { lis := bufconn.Listen(1024 * 1024) s := grpc.NewServer() @@ -316,14 +164,3 @@ func generateLogsRequest() Request { ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStringVal("test_log_record") return NewRequestFromLogs(ld) } - -func generateLogsRequestWithInstrumentationLibrary() Request { - lr := generateLogsRequest() - lr.orig.ResourceLogs[0].InstrumentationLibraryLogs = []*v1.InstrumentationLibraryLogs{ //nolint:staticcheck // SA1019 ignore this! - { - LogRecords: lr.orig.ResourceLogs[0].ScopeLogs[0].LogRecords, - }, - } - lr.orig.ResourceLogs[0].ScopeLogs = []*v1.ScopeLogs{} - return lr -} diff --git a/pdata/pmetric/json.go b/pdata/pmetric/json.go index c570eb163d5..f1a210fd028 100644 --- a/pdata/pmetric/json.go +++ b/pdata/pmetric/json.go @@ -61,6 +61,6 @@ func (d *jsonUnmarshaler) UnmarshalMetrics(buf []byte) (Metrics, error) { if err := d.delegate.Unmarshal(bytes.NewReader(buf), &md); err != nil { return Metrics{}, err } - otlp.InstrumentationLibraryMetricsToScope(md.ResourceMetrics) + otlp.MigrateMetrics(md.ResourceMetrics) return internal.MetricsFromProto(md), nil } diff --git a/pdata/pmetric/pmetricotlp/metrics.go b/pdata/pmetric/pmetricotlp/metrics.go index 7499e6e185c..01fe2bafa3e 100644 --- a/pdata/pmetric/pmetricotlp/metrics.go +++ b/pdata/pmetric/pmetricotlp/metrics.go @@ -106,7 +106,7 @@ func (mr Request) UnmarshalJSON(data []byte) error { if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), mr.orig); err != nil { return err } - otlp.InstrumentationLibraryMetricsToScope(mr.orig.ResourceMetrics) + otlp.MigrateMetrics(mr.orig.ResourceMetrics) return nil } @@ -158,7 +158,7 @@ type rawMetricsServer struct { } func (s rawMetricsServer) Export(ctx context.Context, request *otlpcollectormetrics.ExportMetricsServiceRequest) (*otlpcollectormetrics.ExportMetricsServiceResponse, error) { - otlp.InstrumentationLibraryMetricsToScope(request.ResourceMetrics) + otlp.MigrateMetrics(request.ResourceMetrics) rsp, err := s.srv.Export(ctx, Request{orig: request}) return rsp.orig, err } diff --git a/pdata/pmetric/pmetricotlp/metrics_test.go b/pdata/pmetric/pmetricotlp/metrics_test.go index 6ca4112e8c5..efc607dac7a 100644 --- a/pdata/pmetric/pmetricotlp/metrics_test.go +++ b/pdata/pmetric/pmetricotlp/metrics_test.go @@ -31,8 +31,6 @@ import ( "google.golang.org/grpc/status" "google.golang.org/grpc/test/bufconn" - v1 "go.opentelemetry.io/collector/pdata/internal/data/protogen/metrics/v1" - "go.opentelemetry.io/collector/pdata/internal/otlp" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -61,55 +59,6 @@ var metricsRequestJSON = []byte(` ] }`) -var metricsTransitionData = [][]byte{ - []byte(` - { - "resourceMetrics": [ - { - "resource": {}, - "instrumentationLibraryMetrics": [ - { - "instrumentationLibrary": {}, - "metrics": [ - { - "name": "test_metric" - } - ] - } - ] - } - ] - }`), - []byte(` - { - "resourceMetrics": [ - { - "resource": {}, - "instrumentationLibraryMetrics": [ - { - "instrumentationLibrary": {}, - "metrics": [ - { - "name": "test_metric" - } - ] - } - ], - "scopeMetrics": [ - { - "scope": {}, - "metrics": [ - { - "name": "test_metric" - } - ] - } - ] - } - ] - }`), -} - func TestRequestToPData(t *testing.T) { tr := NewRequest() assert.Equal(t, tr.Metrics().MetricCount(), 0) @@ -127,18 +76,6 @@ func TestRequestJSON(t *testing.T) { assert.Equal(t, strings.Join(strings.Fields(string(metricsRequestJSON)), ""), string(got)) } -func TestRequestJSONTransition(t *testing.T) { - for _, data := range metricsTransitionData { - mr := NewRequest() - assert.NoError(t, mr.UnmarshalJSON(data)) - assert.Equal(t, "test_metric", mr.Metrics().ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Name()) - - got, err := mr.MarshalJSON() - assert.NoError(t, err) - assert.Equal(t, strings.Join(strings.Fields(string(metricsRequestJSON)), ""), string(got)) - } -} - func TestGrpc(t *testing.T) { lis := bufconn.Listen(1024 * 1024) s := grpc.NewServer() @@ -172,83 +109,6 @@ func TestGrpc(t *testing.T) { assert.Equal(t, NewResponse(), resp) } -func TestGrpcTransition(t *testing.T) { - lis := bufconn.Listen(1024 * 1024) - s := grpc.NewServer() - RegisterServer(s, &fakeMetricsServer{t: t}) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.NoError(t, s.Serve(lis)) - }() - t.Cleanup(func() { - s.Stop() - wg.Wait() - }) - - cc, err := grpc.Dial("bufnet", - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return lis.Dial() - }), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock()) - assert.NoError(t, err) - t.Cleanup(func() { - assert.NoError(t, cc.Close()) - }) - - logClient := NewClient(cc) - - req := generateMetricsRequestWithInstrumentationLibrary() - otlp.InstrumentationLibraryMetricsToScope(req.orig.ResourceMetrics) - resp, err := logClient.Export(context.Background(), req) - assert.NoError(t, err) - assert.Equal(t, NewResponse(), resp) -} - -type fakeRawServer struct { - t *testing.T -} - -func (s fakeRawServer) Export(_ context.Context, req Request) (Response, error) { - assert.Equal(s.t, 1, req.Metrics().DataPointCount()) - return NewResponse(), nil -} - -func TestGrpcExport(t *testing.T) { - lis := bufconn.Listen(1024 * 1024) - s := grpc.NewServer() - RegisterServer(s, &fakeRawServer{t: t}) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.NoError(t, s.Serve(lis)) - }() - t.Cleanup(func() { - s.Stop() - wg.Wait() - }) - - cc, err := grpc.Dial("bufnet", - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return lis.Dial() - }), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock()) - assert.NoError(t, err) - t.Cleanup(func() { - assert.NoError(t, cc.Close()) - }) - - metricClient := NewClient(cc) - - resp, err := metricClient.Export(context.Background(), generateMetricsRequestWithInstrumentationLibrary()) - assert.NoError(t, err) - assert.Equal(t, NewResponse(), resp) -} - func TestGrpcError(t *testing.T) { lis := bufconn.Listen(1024 * 1024) s := grpc.NewServer() @@ -303,14 +163,3 @@ func generateMetricsRequest() Request { m.Gauge().DataPoints().AppendEmpty() return NewRequestFromMetrics(md) } - -func generateMetricsRequestWithInstrumentationLibrary() Request { - mr := generateMetricsRequest() - mr.orig.ResourceMetrics[0].InstrumentationLibraryMetrics = []*v1.InstrumentationLibraryMetrics{ //nolint:staticcheck // SA1019 ignore this! - { - Metrics: mr.orig.ResourceMetrics[0].ScopeMetrics[0].Metrics, - }, - } - mr.orig.ResourceMetrics[0].ScopeMetrics = []*v1.ScopeMetrics{} - return mr -} diff --git a/pdata/ptrace/json.go b/pdata/ptrace/json.go index bddee762cf3..e2592361893 100644 --- a/pdata/ptrace/json.go +++ b/pdata/ptrace/json.go @@ -100,10 +100,10 @@ func readResourceSpans(iter *jsoniter.Iterator) *otlptrace.ResourceSpans { } return true }) - case "instrumentationLibrarySpans", "instrumentation_library_spans", "scopeSpans", "scope_spans": + case "scopeSpans", "scope_spans": iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { rs.ScopeSpans = append(rs.ScopeSpans, - readInstrumentationLibrarySpans(iter)) + readScopeSpans(iter)) return true }) case "schemaUrl", "schema_url": @@ -116,12 +116,12 @@ func readResourceSpans(iter *jsoniter.Iterator) *otlptrace.ResourceSpans { return rs } -func readInstrumentationLibrarySpans(iter *jsoniter.Iterator) *otlptrace.ScopeSpans { +func readScopeSpans(iter *jsoniter.Iterator) *otlptrace.ScopeSpans { ils := &otlptrace.ScopeSpans{} iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { switch f { - case "instrumentationLibrary", "instrumentation_library", "scope": + case "scope": iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { switch f { case "name": @@ -129,7 +129,7 @@ func readInstrumentationLibrarySpans(iter *jsoniter.Iterator) *otlptrace.ScopeSp case "version": ils.Scope.Version = iter.ReadString() default: - iter.ReportError("readInstrumentationLibrarySpans.instrumentationLibrary", fmt.Sprintf("unknown field:%v", f)) + iter.ReportError("readScopeSpans.instrumentationLibrary", fmt.Sprintf("unknown field:%v", f)) } return true }) @@ -141,7 +141,7 @@ func readInstrumentationLibrarySpans(iter *jsoniter.Iterator) *otlptrace.ScopeSp case "schemaUrl", "schema_url": ils.SchemaUrl = iter.ReadString() default: - iter.ReportError("readInstrumentationLibrarySpans", fmt.Sprintf("unknown field:%v", f)) + iter.ReportError("readScopeSpans", fmt.Sprintf("unknown field:%v", f)) } return true }) diff --git a/pdata/ptrace/json_test.go b/pdata/ptrace/json_test.go index 37d251dd08e..9d241eeaff9 100644 --- a/pdata/ptrace/json_test.go +++ b/pdata/ptrace/json_test.go @@ -70,10 +70,10 @@ var tracesOTLPFull = func() Traces { rs.Resource().Attributes().UpsertString("host.name", "testHost") rs.Resource().Attributes().UpsertString("service.name", "testService") rs.Resource().SetDroppedAttributesCount(1) - // Add InstrumentationLibrarySpans. + // Add ScopeSpans. il := rs.ScopeSpans().AppendEmpty() - il.Scope().SetName("instrumentation name") - il.Scope().SetVersion("instrumentation version") + il.Scope().SetName("scope name") + il.Scope().SetVersion("scope version") il.SetSchemaUrl("schemaURL") // Add spans. sp := il.Spans().AppendEmpty() @@ -206,21 +206,21 @@ func TestReadResourceSpansUnknownResourceField(t *testing.T) { } } -func TestReadInstrumentationLibrarySpansUnknownField(t *testing.T) { +func TestReadScopeSpansUnknownField(t *testing.T) { jsonStr := `{"extra":""}` iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) defer jsoniter.ConfigFastest.ReturnIterator(iter) - readInstrumentationLibrarySpans(iter) + readScopeSpans(iter) if assert.Error(t, iter.Error) { assert.Contains(t, iter.Error.Error(), "unknown field") } } -func TestReadInstrumentationLibrarySpansUnknownInstrumentationLibraryField(t *testing.T) { - jsonStr := `{"instrumentationLibrary":{"extra":""}}` +func TestReadScopeSpansUnknownScopeField(t *testing.T) { + jsonStr := `{"scope":{"extra":""}}` iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) defer jsoniter.ConfigFastest.ReturnIterator(iter) - readInstrumentationLibrarySpans(iter) + readScopeSpans(iter) if assert.Error(t, iter.Error) { assert.Contains(t, iter.Error.Error(), "unknown field") } diff --git a/pdata/ptrace/ptraceotlp/traces.go b/pdata/ptrace/ptraceotlp/traces.go index 16884555d01..ddbe475c170 100644 --- a/pdata/ptrace/ptraceotlp/traces.go +++ b/pdata/ptrace/ptraceotlp/traces.go @@ -92,7 +92,7 @@ func (tr Request) UnmarshalProto(data []byte) error { if err := tr.orig.Unmarshal(data); err != nil { return err } - otlp.InstrumentationLibrarySpansToScope(tr.orig.ResourceSpans) + otlp.MigrateTraces(tr.orig.ResourceSpans) return nil } @@ -110,7 +110,7 @@ func (tr Request) UnmarshalJSON(data []byte) error { if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), tr.orig); err != nil { return err } - otlp.InstrumentationLibrarySpansToScope(tr.orig.ResourceSpans) + otlp.MigrateTraces(tr.orig.ResourceSpans) return nil } @@ -163,7 +163,7 @@ type rawTracesServer struct { } func (s rawTracesServer) Export(ctx context.Context, request *otlpcollectortrace.ExportTraceServiceRequest) (*otlpcollectortrace.ExportTraceServiceResponse, error) { - otlp.InstrumentationLibrarySpansToScope(request.ResourceSpans) + otlp.MigrateTraces(request.ResourceSpans) rsp, err := s.srv.Export(ctx, Request{orig: request}) return rsp.orig, err } diff --git a/pdata/ptrace/ptraceotlp/traces_test.go b/pdata/ptrace/ptraceotlp/traces_test.go index 0b4f48b9817..13767cfb60e 100644 --- a/pdata/ptrace/ptraceotlp/traces_test.go +++ b/pdata/ptrace/ptraceotlp/traces_test.go @@ -31,8 +31,6 @@ import ( "google.golang.org/grpc/status" "google.golang.org/grpc/test/bufconn" - v1 "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1" - "go.opentelemetry.io/collector/pdata/internal/otlp" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -65,67 +63,6 @@ var tracesRequestJSON = []byte(` ] }`) -var tracesTransitionData = [][]byte{ - []byte(` - { - "resourceSpans": [ - { - "resource": {}, - "instrumentationLibrarySpans": [ - { - "instrumentationLibrary": {}, - "spans": [ - { - "traceId": "", - "spanId":"", - "parentSpanId":"", - "name": "test_span", - "status": {} - } - ] - } - ] - } - ] - }`), - []byte(` - { - "resourceSpans": [ - { - "resource": {}, - "instrumentationLibrarySpans": [ - { - "instrumentationLibrary": {}, - "spans": [ - { - "traceId": "", - "spanId":"", - "parentSpanId":"", - "name": "test_span", - "status": {} - } - ] - } - ], - "scopeSpans": [ - { - "scope": {}, - "spans": [ - { - "traceId": "", - "spanId":"", - "parentSpanId":"", - "name": "test_span", - "status": {} - } - ] - } - ] - } - ] - }`), -} - func TestRequestToPData(t *testing.T) { tr := NewRequest() assert.Equal(t, tr.Traces().SpanCount(), 0) @@ -143,18 +80,6 @@ func TestRequestJSON(t *testing.T) { assert.Equal(t, strings.Join(strings.Fields(string(tracesRequestJSON)), ""), string(got)) } -func TestRequestJSONTransition(t *testing.T) { - for _, data := range tracesTransitionData { - tr := NewRequest() - assert.NoError(t, tr.UnmarshalJSON(data)) - assert.Equal(t, "test_span", tr.Traces().ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name()) - - got, err := tr.MarshalJSON() - assert.NoError(t, err) - assert.Equal(t, strings.Join(strings.Fields(string(tracesRequestJSON)), ""), string(got)) - } -} - func TestGrpc(t *testing.T) { lis := bufconn.Listen(1024 * 1024) s := grpc.NewServer() @@ -188,84 +113,6 @@ func TestGrpc(t *testing.T) { assert.Equal(t, NewResponse(), resp) } -func TestGrpcTransition(t *testing.T) { - lis := bufconn.Listen(1024 * 1024) - s := grpc.NewServer() - RegisterServer(s, &fakeTracesServer{t: t}) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.NoError(t, s.Serve(lis)) - }() - t.Cleanup(func() { - s.Stop() - wg.Wait() - }) - - cc, err := grpc.Dial("bufnet", - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return lis.Dial() - }), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock()) - assert.NoError(t, err) - t.Cleanup(func() { - assert.NoError(t, cc.Close()) - }) - - logClient := NewClient(cc) - - req := generateTracesRequestWithInstrumentationLibrary() - otlp.InstrumentationLibrarySpansToScope(req.orig.ResourceSpans) - resp, err := logClient.Export(context.Background(), req) - assert.NoError(t, err) - assert.Equal(t, NewResponse(), resp) -} - -type fakeRawServer struct { - t *testing.T -} - -func (s fakeRawServer) Export(_ context.Context, req Request) (Response, error) { - assert.Equal(s.t, 1, req.Traces().SpanCount()) - return NewResponse(), nil -} - -func TestGrpcExport(t *testing.T) { - lis := bufconn.Listen(1024 * 1024) - s := grpc.NewServer() - - RegisterServer(s, &fakeRawServer{t: t}) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.NoError(t, s.Serve(lis)) - }() - t.Cleanup(func() { - s.Stop() - wg.Wait() - }) - - cc, err := grpc.Dial("bufnet", - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return lis.Dial() - }), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock()) - assert.NoError(t, err) - t.Cleanup(func() { - assert.NoError(t, cc.Close()) - }) - traceClient := NewClient(cc) - - req := generateTracesRequestWithInstrumentationLibrary() - resp, err := traceClient.Export(context.Background(), req) - assert.NoError(t, err) - assert.Equal(t, NewResponse(), resp) -} - func TestGrpcError(t *testing.T) { lis := bufconn.Listen(1024 * 1024) s := grpc.NewServer() @@ -317,14 +164,3 @@ func generateTracesRequest() Request { td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetName("test_span") return NewRequestFromTraces(td) } - -func generateTracesRequestWithInstrumentationLibrary() Request { - tr := generateTracesRequest() - tr.orig.ResourceSpans[0].InstrumentationLibrarySpans = []*v1.InstrumentationLibrarySpans{ //nolint:staticcheck // SA1019 ignore this! - { - Spans: tr.orig.ResourceSpans[0].ScopeSpans[0].Spans, - }, - } - tr.orig.ResourceSpans[0].ScopeSpans = []*v1.ScopeSpans{} - return tr -}