Commit
Enable persistent queue in the build by default
Mikołaj Świątek committed Aug 5, 2022
1 parent 61de3f7 commit 3b0a261
Showing 15 changed files with 433 additions and 705 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@

### 💡 Enhancements 💡

+ - Enable persistent queue in the build by default (#5828)
- Bump to opentelemetry-proto v0.19.0. (#5823)

### 🧰 Bug fixes 🧰
4 changes: 2 additions & 2 deletions Makefile
@@ -58,7 +58,7 @@ gomoddownload:

.PHONY: gotest
gotest:
- @$(MAKE) for-all-target TARGET="test test-unstable"
+ @$(MAKE) for-all-target TARGET="test"

.PHONY: gobenchmark
gobenchmark:
@@ -75,7 +75,7 @@ goporto:

.PHONY: golint
golint:
- @$(MAKE) for-all-target TARGET="lint lint-unstable"
+ @$(MAKE) for-all-target TARGET="lint"

.PHONY: goimpi
goimpi:
8 changes: 0 additions & 8 deletions Makefile.Common
@@ -18,10 +18,6 @@ GH := $(shell which gh)
test:
$(GOTEST) $(GOTEST_OPT) ./...

- .PHONY: test-unstable
- test-unstable:
- $(GOTEST) $(GOTEST_OPT) -tags enable_unstable ./...

.PHONY: test-with-cover
test-with-cover:
$(GO_ACC) --output=coverage.out ./...
@@ -44,10 +40,6 @@ tidy:
lint:
$(LINT) run

- .PHONY: lint-unstable
- lint-unstable:
- $(LINT) run --build-tags enable_unstable

.PHONY: generate
generate:
$(GOCMD) generate ./...
9 changes: 4 additions & 5 deletions exporter/exporterhelper/README.md
@@ -28,16 +28,14 @@ The following configuration options can be modified:

### Persistent Queue

- **Status: under development**
+ **Status: [alpha]**

- > :warning: The capability is under development and currently can be enabled only in OpenTelemetry
- > Collector Contrib with `enable_unstable` build tag set.
+ > :warning: The capability is under development. To use it, a storage extension needs to be set up.
- With this build tag set, additional configuration option can be enabled:
+ To use the persistent queue, the following setting needs to be set:

- `sending_queue`
- `storage` (default = none): When set, enables persistence and uses the component specified as a storage extension for the persistent queue
- (note, `enable_unstable` build tag needs to be enabled first, see below for more details)

The maximum number of batches stored to disk can be controlled using the `sending_queue.queue_size` parameter
(which, as with in-memory buffering, defaults to 5000 batches).
@@ -111,3 +109,4 @@ service:
```

[filestorage]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage
+ [alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
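
For reference, a minimal collector configuration that exercises the persistent queue described above might look like the sketch below. It is not taken from this commit: the `file_storage/psq` instance name, its directory, and the `otlp` exporter endpoint are placeholders, and any storage extension can be used in place of `file_storage`.

```yaml
extensions:
  # file_storage is a storage extension shipped in opentelemetry-collector-contrib
  file_storage/psq:
    directory: /var/lib/otelcol/psq

receivers:
  otlp:
    protocols:
      grpc:

exporters:
  otlp:
    endpoint: collector-gateway:4317   # placeholder backend
    sending_queue:
      storage: file_storage/psq        # use this storage extension for the persistent queue
      queue_size: 5000                 # maximum number of batches kept on disk (default)

service:
  extensions: [file_storage/psq]
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [otlp]
```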
3 changes: 0 additions & 3 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- //go:build enable_unstable
- // +build enable_unstable

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
3 changes: 0 additions & 3 deletions exporter/exporterhelper/internal/persistent_queue_test.go
@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- //go:build enable_unstable
- // +build enable_unstable

package internal

import (
3 changes: 0 additions & 3 deletions exporter/exporterhelper/internal/persistent_storage.go
@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- //go:build enable_unstable
- // +build enable_unstable

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
3 changes: 0 additions & 3 deletions exporter/exporterhelper/internal/persistent_storage_batch.go
@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- //go:build enable_unstable
- // +build enable_unstable

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- //go:build enable_unstable
- // +build enable_unstable

package internal

import (
3 changes: 0 additions & 3 deletions exporter/exporterhelper/internal/persistent_storage_test.go
@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- //go:build enable_unstable
- // +build enable_unstable

package internal

import (
215 changes: 215 additions & 0 deletions exporter/exporterhelper/queued_retry.go
@@ -21,18 +21,233 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"go.opencensus.io/metric/metricdata"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

var (
errSendingQueueIsFull = errors.New("sending_queue is full")
)

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
type QueueSettings struct {
// Enabled indicates whether to enqueue batches before sending to the consumerSender.
Enabled bool `mapstructure:"enabled"`
// NumConsumers is the number of consumers from the queue.
NumConsumers int `mapstructure:"num_consumers"`
// QueueSize is the maximum number of batches allowed in queue at a given time.
QueueSize int `mapstructure:"queue_size"`
// StorageID, if not empty, enables persistent storage and uses the component specified
// as a storage extension for the persistent queue.
StorageID *config.ComponentID `mapstructure:"storage"`
}

// NewDefaultQueueSettings returns the default settings for QueueSettings.
func NewDefaultQueueSettings() QueueSettings {
return QueueSettings{
Enabled: true,
NumConsumers: 10,
// 5000 queue elements at 100 requests/sec gives about 50 sec of survival of a destination outage.
// This is a pretty decent value for production.
// Users should calculate this from the perspective of how many seconds of data to buffer in case of a backend outage
// and multiply that by the number of requests per second.
QueueSize: 5000,
}
}
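
For illustration only (the `otlp` exporter and the traffic figures are assumptions, not part of this commit), the `mapstructure` tags above surface in an exporter's YAML as shown below, with `queue_size` derived by multiplying the expected requests per second by the number of seconds of backend outage to survive:

```yaml
exporters:
  otlp:
    sending_queue:
      enabled: true        # default
      num_consumers: 10    # default
      # 100 requests/sec * 50 s of tolerated backend outage = 5000 batches
      queue_size: 5000     # default
      # storage: file_storage/psq   # optional: component ID of a storage extension
```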

// Validate checks if the QueueSettings configuration is valid
func (qCfg *QueueSettings) Validate() error {
if !qCfg.Enabled {
return nil
}

if qCfg.QueueSize <= 0 {
return errors.New("queue size must be positive")
}

return nil
}
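
As a sketch of what this check rejects (the `otlp` exporter name is assumed), a configuration like the following fails when the collector loads it, because an enabled queue must have a positive `queue_size`:

```yaml
exporters:
  otlp:
    sending_queue:
      enabled: true
      queue_size: 0   # Validate() returns "queue size must be positive"
```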

var (
errNoStorageClient = errors.New("no storage client extension found")
errWrongExtensionType = errors.New("requested extension is not a storage extension")
)

type queuedRetrySender struct {
fullName string
id config.ComponentID
signal config.DataType
cfg QueueSettings
consumerSender requestSender
queue internal.ProducerConsumerQueue
retryStopCh chan struct{}
traceAttributes []attribute.KeyValue
logger *zap.Logger
requeuingEnabled bool
requestUnmarshaler internal.RequestUnmarshaler
}

func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
retryStopCh := make(chan struct{})
sampledLogger := createSampledLogger(logger)
traceAttr := attribute.String(obsmetrics.ExporterKey, id.String())

qrs := &queuedRetrySender{
fullName: id.String(),
id: id,
signal: signal,
cfg: qCfg,
retryStopCh: retryStopCh,
traceAttributes: []attribute.KeyValue{traceAttr},
logger: sampledLogger,
requestUnmarshaler: reqUnmarshaler,
}

qrs.consumerSender = &retrySender{
traceAttribute: traceAttr,
cfg: rCfg,
nextSender: nextSender,
stopCh: retryStopCh,
logger: sampledLogger,
// Following three functions actually depend on queuedRetrySender
onTemporaryFailure: qrs.onTemporaryFailure,
}

if qCfg.StorageID == nil {
qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {})
}
// The Persistent Queue is initialized separately as it needs extra information about the component

return qrs
}

func getStorageExtension(extensions map[config.ComponentID]component.Extension, storageID config.ComponentID) (storage.Extension, error) {
if ext, found := extensions[storageID]; found {
if storageExt, ok := ext.(storage.Extension); ok {
return storageExt, nil
}
return nil, errWrongExtensionType
}
return nil, errNoStorageClient
}

func toStorageClient(ctx context.Context, storageID config.ComponentID, host component.Host, ownerID config.ComponentID, signal config.DataType) (storage.Client, error) {
extension, err := getStorageExtension(host.GetExtensions(), storageID)
if err != nil {
return nil, err
}

client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal))
if err != nil {
return nil, err
}

return client, err
}

// initializePersistentQueue uses extra information for initialization available from component.Host
func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error {
if qrs.cfg.StorageID != nil {
storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal)
if err != nil {
return err
}

qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler)

// TODO: this can be further exposed as a config param rather than relying on a type of queue
qrs.requeuingEnabled = true
}

return nil
}

func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req request, err error) error {
if !qrs.requeuingEnabled || qrs.queue == nil {
logger.Error(
"Exporting failed. No more retries left. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.count()),
)
return err
}

if qrs.queue.Produce(req) {
logger.Error(
"Exporting failed. Putting back to the end of the queue.",
zap.Error(err),
)
} else {
logger.Error(
"Exporting failed. Queue did not accept requeuing request. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.count()),
)
}
return err
}

// start is invoked during service startup.
func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) error {
err := qrs.initializePersistentQueue(ctx, host)
if err != nil {
return err
}

qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) {
req := item.(request)
_ = qrs.consumerSender.send(req)
req.OnProcessingFinished()
})

// Start reporting queue length metric
if qrs.cfg.Enabled {
err := globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(qrs.queue.Size())
}, metricdata.NewLabelValue(qrs.fullName))
if err != nil {
return fmt.Errorf("failed to create retry queue size metric: %w", err)
}
err = globalInstruments.queueCapacity.UpsertEntry(func() int64 {
return int64(qrs.cfg.QueueSize)
}, metricdata.NewLabelValue(qrs.fullName))
if err != nil {
return fmt.Errorf("failed to create retry queue capacity metric: %w", err)
}
}

return nil
}

// shutdown is invoked during service shutdown.
func (qrs *queuedRetrySender) shutdown() {
// Cleanup queue metrics reporting
if qrs.cfg.Enabled {
_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(0)
}, metricdata.NewLabelValue(qrs.fullName))
}

// First, stop the retry goroutines so that the queue workers are unblocked.
close(qrs.retryStopCh)

// Stop the queued sender; this drains the queue and calls the retry sender (which is stopped), so each
// request is tried only once.
if qrs.queue != nil {
qrs.queue.Stop()
}
}

// RetrySettings defines configuration for retrying batches in case of export failure.
// The current supported strategy is exponential backoff.
type RetrySettings struct {
