From 321eb566c075b9119f5f548aa2f999dc64faa2c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Thu, 4 Aug 2022 21:25:07 +0200 Subject: [PATCH] Enable persistent queue in the build by default --- CHANGELOG.md | 1 + Makefile | 4 +- Makefile.Common | 8 - exporter/exporterhelper/README.md | 9 +- .../internal/persistent_queue.go | 3 - .../internal/persistent_queue_test.go | 3 - .../internal/persistent_storage.go | 3 - .../internal/persistent_storage_batch.go | 3 - .../internal/persistent_storage_batch_test.go | 3 - .../internal/persistent_storage_test.go | 3 - .../queued_retry_experimental.go | 246 ------------------ .../queued_retry_experimental_test.go | 3 - .../exporterhelper/queued_retry_inmemory.go | 117 +++++++-- 13 files changed, 96 insertions(+), 310 deletions(-) delete mode 100644 exporter/exporterhelper/queued_retry_experimental.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b4968ebea1..e918228bfb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ ### 💡 Enhancements 💡 +- Enable persistent queue in the build by default (#5828) - Bump to opentelemetry-proto v0.19.0. (#5823) - Expose `Scope.Attributes` in pdata (#5826) diff --git a/Makefile b/Makefile index bff7c435ff2..33807a6b152 100644 --- a/Makefile +++ b/Makefile @@ -58,7 +58,7 @@ gomoddownload: .PHONY: gotest gotest: - @$(MAKE) for-all-target TARGET="test test-unstable" + @$(MAKE) for-all-target TARGET="test" .PHONY: gobenchmark gobenchmark: @@ -75,7 +75,7 @@ goporto: .PHONY: golint golint: - @$(MAKE) for-all-target TARGET="lint lint-unstable" + @$(MAKE) for-all-target TARGET="lint" .PHONY: goimpi goimpi: diff --git a/Makefile.Common b/Makefile.Common index daf1e14dc87..662c55190cb 100644 --- a/Makefile.Common +++ b/Makefile.Common @@ -18,10 +18,6 @@ GH := $(shell which gh) test: $(GOTEST) $(GOTEST_OPT) ./... -.PHONY: test-unstable -test-unstable: - $(GOTEST) $(GOTEST_OPT) -tags enable_unstable ./... - .PHONY: test-with-cover test-with-cover: $(GO_ACC) --output=coverage.out ./... @@ -44,10 +40,6 @@ tidy: lint: $(LINT) run -.PHONY: lint-unstable -lint-unstable: - $(LINT) run --build-tags enable_unstable - .PHONY: generate generate: $(GOCMD) generate ./... diff --git a/exporter/exporterhelper/README.md b/exporter/exporterhelper/README.md index 5fbb9dd5731..25805edcc31 100644 --- a/exporter/exporterhelper/README.md +++ b/exporter/exporterhelper/README.md @@ -28,16 +28,14 @@ The following configuration options can be modified: ### Persistent Queue -**Status: under development** +**Status: [alpha]** -> :warning: The capability is under development and currently can be enabled only in OpenTelemetry -> Collector Contrib with `enable_unstable` build tag set. +> :warning: The capability is under development. To use it, a storage extension needs to be set up. -With this build tag set, additional configuration option can be enabled: +To use the persistent queue, the following setting needs to be set: - `sending_queue` - `storage` (default = none): When set, enables persistence and uses the component specified as a storage extension for the persistent queue - (note, `enable_unstable` build tag needs to be enabled first, see below for more details) The maximum number of batches stored to disk can be controlled using `sending_queue.queue_size` parameter (which, similarly as for in-memory buffering, defaults to 5000 batches). @@ -111,3 +109,4 @@ service: ``` [filestorage]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage +[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index 3f95a896588..290e36d7860 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build enable_unstable -// +build enable_unstable - package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" import ( diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index c013719209b..9a8bfa7c9b6 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build enable_unstable -// +build enable_unstable - package internal import ( diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index d58a3020233..2be1a8eda01 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build enable_unstable -// +build enable_unstable - package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" import ( diff --git a/exporter/exporterhelper/internal/persistent_storage_batch.go b/exporter/exporterhelper/internal/persistent_storage_batch.go index 3dfcb5f2f16..4c40850a396 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch.go @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build enable_unstable -// +build enable_unstable - package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" import ( diff --git a/exporter/exporterhelper/internal/persistent_storage_batch_test.go b/exporter/exporterhelper/internal/persistent_storage_batch_test.go index 9a7aef221b8..496a767b6b2 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch_test.go @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build enable_unstable -// +build enable_unstable - package internal import ( diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index a08231fda65..93ecd84379d 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build enable_unstable -// +build enable_unstable - package internal import ( diff --git a/exporter/exporterhelper/queued_retry_experimental.go b/exporter/exporterhelper/queued_retry_experimental.go deleted file mode 100644 index 773911c0396..00000000000 --- a/exporter/exporterhelper/queued_retry_experimental.go +++ /dev/null @@ -1,246 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build enable_unstable -// +build enable_unstable - -package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" - -import ( - "context" - "errors" - "fmt" - - "go.opencensus.io/metric/metricdata" - "go.opentelemetry.io/otel/attribute" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" - "go.opentelemetry.io/collector/extension/experimental/storage" - "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" -) - -// queued_retry_experimental includes the code for both memory-backed and persistent-storage backed queued retry helpers -// enabled by setting "enable_unstable" build tag - -// QueueSettings defines configuration for queueing batches before sending to the consumerSender. -type QueueSettings struct { - // Enabled indicates whether to not enqueue batches before sending to the consumerSender. - Enabled bool `mapstructure:"enabled"` - // NumConsumers is the number of consumers from the queue. - NumConsumers int `mapstructure:"num_consumers"` - // QueueSize is the maximum number of batches allowed in queue at a given time. - QueueSize int `mapstructure:"queue_size"` - // StorageID if not empty, enables the persistent storage and uses the component specified - // as a storage extension for the persistent queue - StorageID *config.ComponentID `mapstructure:"storage"` -} - -// NewDefaultQueueSettings returns the default settings for QueueSettings. -func NewDefaultQueueSettings() QueueSettings { - return QueueSettings{ - Enabled: true, - NumConsumers: 10, - // For 5000 queue elements at 100 requests/sec gives about 50 sec of survival of destination outage. - // This is a pretty decent value for production. - // User should calculate this from the perspective of how many seconds to buffer in case of a backend outage, - // multiply that by the number of requests per seconds. - QueueSize: 5000, - } -} - -// Validate checks if the QueueSettings configuration is valid -func (qCfg *QueueSettings) Validate() error { - if !qCfg.Enabled { - return nil - } - - if qCfg.QueueSize <= 0 { - return errors.New("queue size must be positive") - } - - return nil -} - -var ( - errNoStorageClient = errors.New("no storage client extension found") - errWrongExtensionType = errors.New("requested extension is not a storage extension") -) - -type queuedRetrySender struct { - fullName string - id config.ComponentID - signal config.DataType - cfg QueueSettings - consumerSender requestSender - queue internal.ProducerConsumerQueue - retryStopCh chan struct{} - traceAttributes []attribute.KeyValue - logger *zap.Logger - requeuingEnabled bool - requestUnmarshaler internal.RequestUnmarshaler -} - -func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { - retryStopCh := make(chan struct{}) - sampledLogger := createSampledLogger(logger) - traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) - - qrs := &queuedRetrySender{ - fullName: id.String(), - id: id, - signal: signal, - cfg: qCfg, - retryStopCh: retryStopCh, - traceAttributes: []attribute.KeyValue{traceAttr}, - logger: sampledLogger, - requestUnmarshaler: reqUnmarshaler, - } - - qrs.consumerSender = &retrySender{ - traceAttribute: traceAttr, - cfg: rCfg, - nextSender: nextSender, - stopCh: retryStopCh, - logger: sampledLogger, - // Following three functions actually depend on queuedRetrySender - onTemporaryFailure: qrs.onTemporaryFailure, - } - - if qCfg.StorageID == nil { - qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {}) - } - // The Persistent Queue is initialized separately as it needs extra information about the component - - return qrs -} - -func getStorageExtension(extensions map[config.ComponentID]component.Extension, storageID config.ComponentID) (storage.Extension, error) { - if ext, found := extensions[storageID]; found { - if storageExt, ok := ext.(storage.Extension); ok { - return storageExt, nil - } - return nil, errWrongExtensionType - } - return nil, errNoStorageClient -} - -func toStorageClient(ctx context.Context, storageID config.ComponentID, host component.Host, ownerID config.ComponentID, signal config.DataType) (storage.Client, error) { - extension, err := getStorageExtension(host.GetExtensions(), storageID) - if err != nil { - return nil, err - } - - client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal)) - if err != nil { - return nil, err - } - - return client, err -} - -// initializePersistentQueue uses extra information for initialization available from component.Host -func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error { - if qrs.cfg.StorageID != nil { - storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal) - if err != nil { - return err - } - - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler) - - // TODO: this can be further exposed as a config param rather than relying on a type of queue - qrs.requeuingEnabled = true - } - - return nil -} - -func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req request, err error) error { - if !qrs.requeuingEnabled || qrs.queue == nil { - logger.Error( - "Exporting failed. No more retries left. Dropping data.", - zap.Error(err), - zap.Int("dropped_items", req.count()), - ) - return err - } - - if qrs.queue.Produce(req) { - logger.Error( - "Exporting failed. Putting back to the end of the queue.", - zap.Error(err), - ) - } else { - logger.Error( - "Exporting failed. Queue did not accept requeuing request. Dropping data.", - zap.Error(err), - zap.Int("dropped_items", req.count()), - ) - } - return err -} - -// start is invoked during service startup. -func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) error { - err := qrs.initializePersistentQueue(ctx, host) - if err != nil { - return err - } - - qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) { - req := item.(request) - _ = qrs.consumerSender.send(req) - req.OnProcessingFinished() - }) - - // Start reporting queue length metric - if qrs.cfg.Enabled { - err := globalInstruments.queueSize.UpsertEntry(func() int64 { - return int64(qrs.queue.Size()) - }, metricdata.NewLabelValue(qrs.fullName)) - if err != nil { - return fmt.Errorf("failed to create retry queue size metric: %w", err) - } - err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qrs.cfg.QueueSize) - }, metricdata.NewLabelValue(qrs.fullName)) - if err != nil { - return fmt.Errorf("failed to create retry queue capacity metric: %w", err) - } - } - - return nil -} - -// shutdown is invoked during service shutdown. -func (qrs *queuedRetrySender) shutdown() { - // Cleanup queue metrics reporting - if qrs.cfg.Enabled { - _ = globalInstruments.queueSize.UpsertEntry(func() int64 { - return int64(0) - }, metricdata.NewLabelValue(qrs.fullName)) - } - - // First Stop the retry goroutines, so that unblocks the queue numWorkers. - close(qrs.retryStopCh) - - // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only - // try once every request. - if qrs.queue != nil { - qrs.queue.Stop() - } -} diff --git a/exporter/exporterhelper/queued_retry_experimental_test.go b/exporter/exporterhelper/queued_retry_experimental_test.go index 9deb641b34a..4065b09436d 100644 --- a/exporter/exporterhelper/queued_retry_experimental_test.go +++ b/exporter/exporterhelper/queued_retry_experimental_test.go @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build enable_unstable -// +build enable_unstable - package exporterhelper import ( diff --git a/exporter/exporterhelper/queued_retry_inmemory.go b/exporter/exporterhelper/queued_retry_inmemory.go index 61eb1da0c1a..cc3b3325ce8 100644 --- a/exporter/exporterhelper/queued_retry_inmemory.go +++ b/exporter/exporterhelper/queued_retry_inmemory.go @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build !enable_unstable -// +build !enable_unstable - package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" import ( @@ -29,11 +26,12 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) -// queued_retry_inmemory includes the code for memory-backed (original) queued retry helper only -// enabled when "enable_unstable" build tag is not set +// queued_retry_experimental includes the code for both memory-backed and persistent-storage backed queued retry helpers +// enabled by setting "enable_unstable" build tag // QueueSettings defines configuration for queueing batches before sending to the consumerSender. type QueueSettings struct { @@ -43,6 +41,9 @@ type QueueSettings struct { NumConsumers int `mapstructure:"num_consumers"` // QueueSize is the maximum number of batches allowed in queue at a given time. QueueSize int `mapstructure:"queue_size"` + // StorageID if not empty, enables the persistent storage and uses the component specified + // as a storage extension for the persistent queue + StorageID *config.ComponentID `mapstructure:"storage"` } // NewDefaultQueueSettings returns the default settings for QueueSettings. @@ -71,45 +72,100 @@ func (qCfg *QueueSettings) Validate() error { return nil } +var ( + errNoStorageClient = errors.New("no storage client extension found") + errWrongExtensionType = errors.New("requested extension is not a storage extension") +) + type queuedRetrySender struct { - fullName string - cfg QueueSettings - consumerSender requestSender - queue internal.ProducerConsumerQueue - retryStopCh chan struct{} - traceAttributes []attribute.KeyValue - logger *zap.Logger - // currently this is always false for the in-memory queue - // it's here for consistency with the persistent queue - requeuingEnabled bool + fullName string + id config.ComponentID + signal config.DataType + cfg QueueSettings + consumerSender requestSender + queue internal.ProducerConsumerQueue + retryStopCh chan struct{} + traceAttributes []attribute.KeyValue + logger *zap.Logger + requeuingEnabled bool + requestUnmarshaler internal.RequestUnmarshaler } -func newQueuedRetrySender(id config.ComponentID, _ config.DataType, qCfg QueueSettings, rCfg RetrySettings, _ internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { +func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { retryStopCh := make(chan struct{}) sampledLogger := createSampledLogger(logger) traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) qrs := &queuedRetrySender{ - fullName: id.String(), - cfg: qCfg, - queue: internal.NewBoundedMemoryQueue(qCfg.QueueSize, func(item interface{}) {}), - retryStopCh: retryStopCh, - traceAttributes: []attribute.KeyValue{traceAttr}, - logger: sampledLogger, + fullName: id.String(), + id: id, + signal: signal, + cfg: qCfg, + retryStopCh: retryStopCh, + traceAttributes: []attribute.KeyValue{traceAttr}, + logger: sampledLogger, + requestUnmarshaler: reqUnmarshaler, } qrs.consumerSender = &retrySender{ - traceAttribute: traceAttr, - cfg: rCfg, - nextSender: nextSender, - stopCh: retryStopCh, - logger: sampledLogger, + traceAttribute: traceAttr, + cfg: rCfg, + nextSender: nextSender, + stopCh: retryStopCh, + logger: sampledLogger, + // Following three functions actually depend on queuedRetrySender onTemporaryFailure: qrs.onTemporaryFailure, } + if qCfg.StorageID == nil { + qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {}) + } + // The Persistent Queue is initialized separately as it needs extra information about the component + return qrs } +func getStorageExtension(extensions map[config.ComponentID]component.Extension, storageID config.ComponentID) (storage.Extension, error) { + if ext, found := extensions[storageID]; found { + if storageExt, ok := ext.(storage.Extension); ok { + return storageExt, nil + } + return nil, errWrongExtensionType + } + return nil, errNoStorageClient +} + +func toStorageClient(ctx context.Context, storageID config.ComponentID, host component.Host, ownerID config.ComponentID, signal config.DataType) (storage.Client, error) { + extension, err := getStorageExtension(host.GetExtensions(), storageID) + if err != nil { + return nil, err + } + + client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal)) + if err != nil { + return nil, err + } + + return client, err +} + +// initializePersistentQueue uses extra information for initialization available from component.Host +func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error { + if qrs.cfg.StorageID != nil { + storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal) + if err != nil { + return err + } + + qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler) + + // TODO: this can be further exposed as a config param rather than relying on a type of queue + qrs.requeuingEnabled = true + } + + return nil +} + func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req request, err error) error { if !qrs.requeuingEnabled || qrs.queue == nil { logger.Error( @@ -136,7 +192,12 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req request } // start is invoked during service startup. -func (qrs *queuedRetrySender) start(context.Context, component.Host) error { +func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) error { + err := qrs.initializePersistentQueue(ctx, host) + if err != nil { + return err + } + qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) { req := item.(request) _ = qrs.consumerSender.send(req)