Skip to content

Commit

Permalink
Require the storage to be explicitly set for persistent queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikołaj Świątek committed Aug 2, 2022
1 parent 468a12a commit 112a044
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 42 deletions.
11 changes: 4 additions & 7 deletions exporter/exporterhelper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,13 @@ The following configuration options can be modified:
With this build tag set, additional configuration option can be enabled:

- `sending_queue`
- `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension
- `storage` (default = none): When set, enables persistence and uses the component specified as a storage extension for the persistent queue
(note, `enable_unstable` build tag needs to be enabled first, see below for more details)

The maximum number of batches stored to disk can be controlled using `sending_queue.queue_size` parameter (which,
similarly as for in-memory buffering, defaults to 5000 batches).

When `persistent_storage_enabled` is set to true, the queue is being buffered to disk using
[file storage extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage).
If collector instance is killed while having some items in the persistent queue, on restart the items are being picked and
the exporting is continued.
When persistent queue is enabled, the batches are being buffered using the provided storage extension - [filestorage] is a popular and safe choice. If the collector instance is killed while having some items in the persistent queue, on restart the items will be be picked and the exporting is continued.

```
┌─Consumer #1─┐
Expand Down Expand Up @@ -93,9 +90,9 @@ exporters:
otlp:
endpoint: <ENDPOINT>
sending_queue:
persistent_storage_enabled: true
storage: file_storage/otc
extensions:
file_storage:
file_storage/otc:
directory: /var/lib/storage/otc
timeout: 10s
service:
Expand Down
49 changes: 29 additions & 20 deletions exporter/exporterhelper/queued_retry_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ type QueueSettings struct {
NumConsumers int `mapstructure:"num_consumers"`
// QueueSize is the maximum number of batches allowed in queue at a given time.
QueueSize int `mapstructure:"queue_size"`
// PersistentStorageEnabled describes whether persistence via a file storage extension is enabled
PersistentStorageEnabled bool `mapstructure:"persistent_storage_enabled"`
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *config.ComponentID `mapstructure:"storage"`
}

// NewDefaultQueueSettings returns the default settings for QueueSettings.
Expand All @@ -57,8 +58,7 @@ func NewDefaultQueueSettings() QueueSettings {
// This is a pretty decent value for production.
// User should calculate this from the perspective of how many seconds to buffer in case of a backend outage,
// multiply that by the number of requests per seconds.
QueueSize: 5000,
PersistentStorageEnabled: false,
QueueSize: 5000,
}
}

Expand All @@ -76,8 +76,8 @@ func (qCfg *QueueSettings) Validate() error {
}

var (
errNoStorageClient = errors.New("no storage client extension found")
errMultipleStorageClients = errors.New("multiple storage extensions found")
errNoStorageClient = errors.New("no storage client extension found")
errWrongExtensionType = errors.New("requested extension is not a storage extension")
)

type queuedRetrySender struct {
Expand Down Expand Up @@ -120,46 +120,55 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu
onTemporaryFailure: qrs.onTemporaryFailure,
}

if !qCfg.PersistentStorageEnabled {
if qCfg.StorageID == nil {
qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {})
}
// The Persistent Queue is initialized separately as it needs extra information about the component

return qrs
}

func getStorageClient(ctx context.Context, host component.Host, id config.ComponentID, signal config.DataType) (*storage.Client, error) {
var storageExtension storage.Extension
for _, ext := range host.GetExtensions() {
if se, ok := ext.(storage.Extension); ok {
if storageExtension != nil {
return nil, errMultipleStorageClients
func (qCfg *QueueSettings) getStorageExtension(logger *zap.Logger, extensions map[config.ComponentID]component.Extension) (storage.Extension, error) {
if qCfg.StorageID != nil {
if ext, found := extensions[*qCfg.StorageID]; found {
if storageExt, ok := ext.(storage.Extension); ok {
return storageExt, nil
}
storageExtension = se
return nil, errWrongExtensionType
} else {
return nil, errNoStorageClient
}
}

if storageExtension == nil {
return nil, nil
}

func (qCfg *QueueSettings) toStorageClient(ctx context.Context, logger *zap.Logger, host component.Host, ownerID config.ComponentID, signal config.DataType) (storage.Client, error) {
extension, err := qCfg.getStorageExtension(logger, host.GetExtensions())
if err != nil {
return nil, err
}
if extension == nil {
return nil, errNoStorageClient
}

client, err := storageExtension.GetClient(ctx, component.KindExporter, id, string(signal))
client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal))
if err != nil {
return nil, err
}

return &client, err
return client, err
}

// initializePersistentQueue uses extra information for initialization available from component.Host
func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error {
if qrs.cfg.PersistentStorageEnabled {
storageClient, err := getStorageClient(ctx, host, qrs.id, qrs.signal)
if qrs.cfg.StorageID != nil {
storageClient, err := qrs.cfg.toStorageClient(ctx, qrs.logger, host, qrs.id, qrs.signal)
if err != nil {
return err
}

qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler)
qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler)

// TODO: this can be further exposed as a config param rather than relying on a type of queue
qrs.requeuingEnabled = true
Expand Down
41 changes: 26 additions & 15 deletions exporter/exporterhelper/queued_retry_experimental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestGetRetrySettings(t *testing.T) {
desc string
storage storage.Extension
numStorages int
storageEnabled bool
storageID string
expectedError error
getClientError error
}{
Expand All @@ -80,21 +81,21 @@ func TestGetRetrySettings(t *testing.T) {
expectedError: errNoStorageClient,
},
{
desc: "obtain default storage extension",
numStorages: 1,
storageEnabled: true,
expectedError: nil,
desc: "obtain storage extension by name",
numStorages: 2,
storageID: "1",
expectedError: nil,
},
{
desc: "fail on obtaining default storage extension",
numStorages: 2,
storageEnabled: true,
expectedError: errMultipleStorageClients,
desc: "fail on not existing storage extension",
numStorages: 2,
storageID: "100",
expectedError: errNoStorageClient,
},
{
desc: "fail on error getting storage client from extension",
numStorages: 1,
storageEnabled: true,
storageID: "0",
expectedError: getStorageClientError,
getClientError: getStorageClientError,
},
Expand All @@ -103,6 +104,14 @@ func TestGetRetrySettings(t *testing.T) {
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
// prepare
cfg := &QueueSettings{
Enabled: true,
}
if tC.storageID != "" {
compID := config.NewComponentIDWithName("file_storage", tC.storageID)
cfg.StorageID = &compID
}

var extensions = map[config.ComponentID]component.Extension{}
for i := 0; i < tC.numStorages; i++ {
extensions[config.NewComponentIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError}
Expand All @@ -111,7 +120,7 @@ func TestGetRetrySettings(t *testing.T) {
ownerID := config.NewComponentID("foo_exporter")

// execute
client, err := getStorageClient(context.Background(), host, ownerID, config.TracesDataType)
client, err := cfg.toStorageClient(context.Background(), zap.NewNop(), host, ownerID, config.TracesDataType)

// verify
if tC.expectedError != nil {
Expand Down Expand Up @@ -182,12 +191,13 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultQueueSettings()
qCfg.PersistentStorageEnabled = true // enable persistence
storageID := config.NewComponentIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())

var extensions = map[config.ComponentID]component.Extension{
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{},
storageID: &mockStorageExtension{},
}
host := &mockHost{ext: extensions}

Expand All @@ -203,12 +213,13 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultQueueSettings()
qCfg.PersistentStorageEnabled = true // enable persistence
storageID := config.NewComponentIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())

var extensions = map[config.ComponentID]component.Extension{
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{GetClientError: storageError},
storageID: &mockStorageExtension{GetClientError: storageError},
}
host := &mockHost{ext: extensions}

Expand Down

0 comments on commit 112a044

Please sign in to comment.