Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local file storage extension #3087

Merged
merged 3 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/hostobserver"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor"
Expand Down Expand Up @@ -91,6 +92,7 @@ func components() (component.Factories, error) {
hostobserver.NewFactory(),
httpforwarder.NewFactory(),
k8sobserver.NewFactory(),
filestorage.NewFactory(),
}

for _, ext := range factories.Extensions {
Expand Down
3 changes: 0 additions & 3 deletions extension/storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,3 @@ Set(string, []byte) error
Delete(string) error
```
Note: All methods should return error only if a problem occurred. (For example, if a file is no longer accessible, or if a remote service is unavailable.)

# TODO Sample code
- Document component expectations
1 change: 1 addition & 0 deletions extension/storage/filestorage/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
34 changes: 34 additions & 0 deletions extension/storage/filestorage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# File Storage

The File Storage extension can persist state to the local file system.

The extension requires read and write access to a directory. A default directory can be used, but it must already exist in order for the extension to operate.

`directory` is the relative or absolute path to the dedicated data storage directory.

`timeout` is the maximum time to wait for a file lock. This value does not need to be modified in most circumstances.


```
extensions:
file_storage:
file_storage/all_settings:
directory: /var/lib/otelcol/mydir
timeout: 1s

service:
extensions: [file_storage, file_storage/all_settings]
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [nop]

# Data pipeline is required to load the config.
receivers:
nop:
processors:
nop:
exporters:
nop:
```
99 changes: 99 additions & 0 deletions extension/storage/filestorage/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filestorage

import (
"context"
"errors"
"time"

"go.etcd.io/bbolt"
)

var defaultBucket = []byte(`default`)

type fileStorageClient struct {
db *bbolt.DB
}

func newClient(filePath string, timeout time.Duration) (*fileStorageClient, error) {
options := &bbolt.Options{
Timeout: timeout,
NoSync: true,
}
db, err := bbolt.Open(filePath, 0600, options)
if err != nil {
return nil, err
}

initBucket := func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(defaultBucket)
return err
}
if err := db.Update(initBucket); err != nil {
return nil, err
}

return &fileStorageClient{db}, nil
}

// Get will retrieve data from storage that corresponds to the specified key
func (c *fileStorageClient) Get(_ context.Context, key string) ([]byte, error) {
var result []byte
get := func(tx *bbolt.Tx) error {
bucket := tx.Bucket(defaultBucket)
if bucket == nil {
return errors.New("storage not initialized")
}
result = bucket.Get([]byte(key))
return nil // no error
}

if err := c.db.Update(get); err != nil {
return nil, err
}
return result, nil
}

// Set will store data. The data can be retrieved using the same key
func (c *fileStorageClient) Set(_ context.Context, key string, value []byte) error {
set := func(tx *bbolt.Tx) error {
bucket := tx.Bucket(defaultBucket)
if bucket == nil {
return errors.New("storage not initialized")
}
return bucket.Put([]byte(key), value)
}

return c.db.Update(set)
}

// Delete will delete data associated with the specified key
func (c *fileStorageClient) Delete(_ context.Context, key string) error {
delete := func(tx *bbolt.Tx) error {
bucket := tx.Bucket(defaultBucket)
if bucket == nil {
return errors.New("storage not initialized")
}
return bucket.Delete([]byte(key))
}

return c.db.Update(delete)
}

// Close will close the database
func (c *fileStorageClient) close() error {
return c.db.Close()
}
194 changes: 194 additions & 0 deletions extension/storage/filestorage/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filestorage

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)

func TestClientOperations(t *testing.T) {
tempDir := newTempDir(t)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, time.Second)
require.NoError(t, err)

ctx := context.Background()
testKey := "testKey"
testValue := []byte("testValue")

// Make sure nothing is there
value, err := client.Get(ctx, testKey)
require.NoError(t, err)
require.Nil(t, value)

// Set it
err = client.Set(ctx, testKey, testValue)
require.NoError(t, err)

// Get it back out, make sure it's right
value, err = client.Get(ctx, testKey)
require.NoError(t, err)
require.Equal(t, testValue, value)

// Delete it
err = client.Delete(ctx, testKey)
require.NoError(t, err)

// Make sure it's gone
value, err = client.Get(ctx, testKey)
require.NoError(t, err)
require.Nil(t, value)
}

func TestNewClientTransactionErrors(t *testing.T) {
timeout := 100 * time.Millisecond

testKey := "testKey"
testValue := []byte("testValue")

testCases := []struct {
name string
setup func(*bbolt.Tx) error
validate func(*testing.T, *fileStorageClient)
}{
{
name: "get",
setup: func(tx *bbolt.Tx) error {
return tx.DeleteBucket(defaultBucket)
},
validate: func(t *testing.T, c *fileStorageClient) {
value, err := c.Get(context.Background(), testKey)
require.Error(t, err)
require.Equal(t, "storage not initialized", err.Error())
require.Nil(t, value)
},
},
{
name: "set",
setup: func(tx *bbolt.Tx) error {
return tx.DeleteBucket(defaultBucket)
},
validate: func(t *testing.T, c *fileStorageClient) {
err := c.Set(context.Background(), testKey, testValue)
require.Error(t, err)
require.Equal(t, "storage not initialized", err.Error())
},
},
{
name: "delete",
setup: func(tx *bbolt.Tx) error {
return tx.DeleteBucket(defaultBucket)
},
validate: func(t *testing.T, c *fileStorageClient) {
err := c.Delete(context.Background(), testKey)
require.Error(t, err)
require.Equal(t, "storage not initialized", err.Error())
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

tempDir := newTempDir(t)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, timeout)
require.NoError(t, err)

// Create a problem
client.db.Update(tc.setup)

// Validate expected behavior
tc.validate(t, client)
})
}
}

func TestNewClientErrorsOnInvalidBucket(t *testing.T) {
temp := defaultBucket
defaultBucket = nil

tempDir := newTempDir(t)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, time.Second)
require.Error(t, err)
require.Nil(t, client)

defaultBucket = temp
}

func BenchmarkClientGet(b *testing.B) {
tempDir := newTempDir(b)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, time.Second)
require.NoError(b, err)

ctx := context.Background()
testKey := "testKey"

for n := 0; n < b.N; n++ {
client.Get(ctx, testKey)
}
}

func BenchmarkClientSet(b *testing.B) {
tempDir := newTempDir(b)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, time.Second)
require.NoError(b, err)

ctx := context.Background()
testKey := "testKey"
testValue := []byte("testValue")

for n := 0; n < b.N; n++ {
client.Set(ctx, testKey, testValue)
}
}

func BenchmarkClientDelete(b *testing.B) {
tempDir := newTempDir(b)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, time.Second)
require.NoError(b, err)

ctx := context.Background()
testKey := "testKey"

for n := 0; n < b.N; n++ {
client.Delete(ctx, testKey)
}
}

func newTempDir(tb testing.TB) string {
tempDir, err := ioutil.TempDir("", "")
require.NoError(tb, err)
tb.Cleanup(func() { os.RemoveAll(tempDir) })
return tempDir
}
29 changes: 29 additions & 0 deletions extension/storage/filestorage/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filestorage

import (
"time"

"go.opentelemetry.io/collector/config"
)

// Config defines configuration for http forwarder extension.
type Config struct {
config.ExtensionSettings `mapstructure:",squash"`

Directory string `mapstructure:"directory,omitempty"`
Timeout time.Duration `mapstructure:"timeout,omitempty"`
}
Loading