From 112a044c9e40b482b1cd86f4f4c3027ec1958b93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Tue, 2 Aug 2022 14:58:23 +0200 Subject: [PATCH] Require the storage to be explicitly set for persistent queue --- exporter/exporterhelper/README.md | 11 ++--- .../queued_retry_experimental.go | 49 +++++++++++-------- .../queued_retry_experimental_test.go | 41 ++++++++++------ 3 files changed, 59 insertions(+), 42 deletions(-) diff --git a/exporter/exporterhelper/README.md b/exporter/exporterhelper/README.md index 51bd4383096..fa5103165fe 100644 --- a/exporter/exporterhelper/README.md +++ b/exporter/exporterhelper/README.md @@ -36,16 +36,13 @@ The following configuration options can be modified: With this build tag set, additional configuration option can be enabled: - `sending_queue` - - `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension + - `storage` (default = none): When set, enables persistence and uses the component specified as a storage extension for the persistent queue (note, `enable_unstable` build tag needs to be enabled first, see below for more details) The maximum number of batches stored to disk can be controlled using `sending_queue.queue_size` parameter (which, similarly as for in-memory buffering, defaults to 5000 batches). -When `persistent_storage_enabled` is set to true, the queue is being buffered to disk using -[file storage extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage). -If collector instance is killed while having some items in the persistent queue, on restart the items are being picked and -the exporting is continued. +When persistent queue is enabled, the batches are being buffered using the provided storage extension - [filestorage] is a popular and safe choice. If the collector instance is killed while having some items in the persistent queue, on restart the items will be be picked and the exporting is continued. ``` ┌─Consumer #1─┐ @@ -93,9 +90,9 @@ exporters: otlp: endpoint: sending_queue: - persistent_storage_enabled: true + storage: file_storage/otc extensions: - file_storage: + file_storage/otc: directory: /var/lib/storage/otc timeout: 10s service: diff --git a/exporter/exporterhelper/queued_retry_experimental.go b/exporter/exporterhelper/queued_retry_experimental.go index 983650b1305..44af228c258 100644 --- a/exporter/exporterhelper/queued_retry_experimental.go +++ b/exporter/exporterhelper/queued_retry_experimental.go @@ -44,8 +44,9 @@ type QueueSettings struct { NumConsumers int `mapstructure:"num_consumers"` // QueueSize is the maximum number of batches allowed in queue at a given time. QueueSize int `mapstructure:"queue_size"` - // PersistentStorageEnabled describes whether persistence via a file storage extension is enabled - PersistentStorageEnabled bool `mapstructure:"persistent_storage_enabled"` + // StorageID if not empty, enables the persistent storage and uses the component specified + // as a storage extension for the persistent queue + StorageID *config.ComponentID `mapstructure:"storage"` } // NewDefaultQueueSettings returns the default settings for QueueSettings. @@ -57,8 +58,7 @@ func NewDefaultQueueSettings() QueueSettings { // This is a pretty decent value for production. // User should calculate this from the perspective of how many seconds to buffer in case of a backend outage, // multiply that by the number of requests per seconds. - QueueSize: 5000, - PersistentStorageEnabled: false, + QueueSize: 5000, } } @@ -76,8 +76,8 @@ func (qCfg *QueueSettings) Validate() error { } var ( - errNoStorageClient = errors.New("no storage client extension found") - errMultipleStorageClients = errors.New("multiple storage extensions found") + errNoStorageClient = errors.New("no storage client extension found") + errWrongExtensionType = errors.New("requested extension is not a storage extension") ) type queuedRetrySender struct { @@ -120,7 +120,7 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu onTemporaryFailure: qrs.onTemporaryFailure, } - if !qCfg.PersistentStorageEnabled { + if qCfg.StorageID == nil { qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {}) } // The Persistent Queue is initialized separately as it needs extra information about the component @@ -128,38 +128,47 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu return qrs } -func getStorageClient(ctx context.Context, host component.Host, id config.ComponentID, signal config.DataType) (*storage.Client, error) { - var storageExtension storage.Extension - for _, ext := range host.GetExtensions() { - if se, ok := ext.(storage.Extension); ok { - if storageExtension != nil { - return nil, errMultipleStorageClients +func (qCfg *QueueSettings) getStorageExtension(logger *zap.Logger, extensions map[config.ComponentID]component.Extension) (storage.Extension, error) { + if qCfg.StorageID != nil { + if ext, found := extensions[*qCfg.StorageID]; found { + if storageExt, ok := ext.(storage.Extension); ok { + return storageExt, nil } - storageExtension = se + return nil, errWrongExtensionType + } else { + return nil, errNoStorageClient } } - if storageExtension == nil { + return nil, nil +} + +func (qCfg *QueueSettings) toStorageClient(ctx context.Context, logger *zap.Logger, host component.Host, ownerID config.ComponentID, signal config.DataType) (storage.Client, error) { + extension, err := qCfg.getStorageExtension(logger, host.GetExtensions()) + if err != nil { + return nil, err + } + if extension == nil { return nil, errNoStorageClient } - client, err := storageExtension.GetClient(ctx, component.KindExporter, id, string(signal)) + client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal)) if err != nil { return nil, err } - return &client, err + return client, err } // initializePersistentQueue uses extra information for initialization available from component.Host func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error { - if qrs.cfg.PersistentStorageEnabled { - storageClient, err := getStorageClient(ctx, host, qrs.id, qrs.signal) + if qrs.cfg.StorageID != nil { + storageClient, err := qrs.cfg.toStorageClient(ctx, qrs.logger, host, qrs.id, qrs.signal) if err != nil { return err } - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler) + qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler) // TODO: this can be further exposed as a config param rather than relying on a type of queue qrs.requeuingEnabled = true diff --git a/exporter/exporterhelper/queued_retry_experimental_test.go b/exporter/exporterhelper/queued_retry_experimental_test.go index f8c80083011..abba381addc 100644 --- a/exporter/exporterhelper/queued_retry_experimental_test.go +++ b/exporter/exporterhelper/queued_retry_experimental_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -70,7 +71,7 @@ func TestGetRetrySettings(t *testing.T) { desc string storage storage.Extension numStorages int - storageEnabled bool + storageID string expectedError error getClientError error }{ @@ -80,21 +81,21 @@ func TestGetRetrySettings(t *testing.T) { expectedError: errNoStorageClient, }, { - desc: "obtain default storage extension", - numStorages: 1, - storageEnabled: true, - expectedError: nil, + desc: "obtain storage extension by name", + numStorages: 2, + storageID: "1", + expectedError: nil, }, { - desc: "fail on obtaining default storage extension", - numStorages: 2, - storageEnabled: true, - expectedError: errMultipleStorageClients, + desc: "fail on not existing storage extension", + numStorages: 2, + storageID: "100", + expectedError: errNoStorageClient, }, { desc: "fail on error getting storage client from extension", numStorages: 1, - storageEnabled: true, + storageID: "0", expectedError: getStorageClientError, getClientError: getStorageClientError, }, @@ -103,6 +104,14 @@ func TestGetRetrySettings(t *testing.T) { for _, tC := range testCases { t.Run(tC.desc, func(t *testing.T) { // prepare + cfg := &QueueSettings{ + Enabled: true, + } + if tC.storageID != "" { + compID := config.NewComponentIDWithName("file_storage", tC.storageID) + cfg.StorageID = &compID + } + var extensions = map[config.ComponentID]component.Extension{} for i := 0; i < tC.numStorages; i++ { extensions[config.NewComponentIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError} @@ -111,7 +120,7 @@ func TestGetRetrySettings(t *testing.T) { ownerID := config.NewComponentID("foo_exporter") // execute - client, err := getStorageClient(context.Background(), host, ownerID, config.TracesDataType) + client, err := cfg.toStorageClient(context.Background(), zap.NewNop(), host, ownerID, config.TracesDataType) // verify if tC.expectedError != nil { @@ -182,12 +191,13 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) qCfg := NewDefaultQueueSettings() - qCfg.PersistentStorageEnabled = true // enable persistence + storageID := config.NewComponentIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) var extensions = map[config.ComponentID]component.Extension{ - config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{}, + storageID: &mockStorageExtension{}, } host := &mockHost{ext: extensions} @@ -203,12 +213,13 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) qCfg := NewDefaultQueueSettings() - qCfg.PersistentStorageEnabled = true // enable persistence + storageID := config.NewComponentIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) var extensions = map[config.ComponentID]component.Extension{ - config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{GetClientError: storageError}, + storageID: &mockStorageExtension{GetClientError: storageError}, } host := &mockHost{ext: extensions}