Skip to content

Commit

Permalink
Add Cassandra OTEL exporter (#2139)
Browse files Browse the repository at this point in the history
* Add Cassandra OTEL exporter

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Add comments

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay authored Mar 27, 2020
1 parent e124801 commit 5e06df7
Show file tree
Hide file tree
Showing 23 changed files with 495 additions and 142 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ fmt:

.PHONY: lint-gosec
lint-gosec:
time gosec -quiet -exclude=G104,G107 ./...
time gosec -quiet -exclude=G104,G107 -exclude-dir=cmd/opentelemetry-collector ./...

.PHONY: lint-staticcheck
lint-staticcheck:
Expand All @@ -177,6 +177,7 @@ lint: lint-staticcheck lint-gosec lint-otel
.PHONY: lint-otel
lint-otel:
cd ${OTEL_COLLECTOR_DIR} && $(GOVET) ./...
cd ${OTEL_COLLECTOR_DIR} && time gosec -quiet -exclude=G104,G107 ./...

.PHONY: go-lint
go-lint:
Expand Down
8 changes: 8 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/open-telemetry/opentelemetry-collector/defaults"
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
storageKafka "github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

Expand All @@ -30,8 +32,14 @@ func Components(v *viper.Viper) config.Factories {
opts.InitFromViper(v)
return opts
}}
cassandraExp := cassandra.Factory{OptionsFactory: func() *storageCassandra.Options {
opts := cassandra.DefaultOptions()
opts.InitFromViper(v)
return opts
}}

factories, _ := defaults.Components()
factories.Exporters[kafkaExp.Type()] = kafkaExp
factories.Exporters[cassandraExp.Type()] = cassandraExp
return factories
}
8 changes: 7 additions & 1 deletion cmd/opentelemetry-collector/app/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@ import (

"github.com/magiconair/properties/assert"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
)

func TestComponents(t *testing.T) {
v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags)
v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags, cassandra.DefaultOptions().AddFlags)
factories := Components(v)
assert.Equal(t, "jaeger_kafka", factories.Exporters[kafka.TypeStr].Type())
assert.Equal(t, "jaeger_cassandra", factories.Exporters[cassandra.TypeStr].Type())

kafkaFactory := factories.Exporters[kafka.TypeStr]
kc := kafkaFactory.CreateDefaultConfig().(*kafka.Config)
assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers)

cassandraFactory := factories.Exporters[cassandra.TypeStr]
cc := cassandraFactory.CreateDefaultConfig().(*cassandra.Config)
assert.Equal(t, []string{"127.0.0.1"}, cc.Options.GetPrimary().Servers)
}
27 changes: 27 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

// Config holds configuration of Jaeger Cassandra exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
cassandra.Options `mapstructure:",squash"`
}
88 changes: 88 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"path"
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/cmd/flags"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

func TestDefaultConfig(t *testing.T) {
factory := &Factory{OptionsFactory: func() *cassandra.Options {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
return opts
}}
defaultCfg := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, []string{"127.0.0.1"}, defaultCfg.Options.GetPrimary().Servers)
assert.Equal(t, []string{"127.0.0.1"}, defaultCfg.Options.Primary.Servers)
assert.Equal(t, 2, defaultCfg.Primary.ConnectionsPerHost)
assert.Equal(t, "jaeger_v1_test", defaultCfg.Primary.Keyspace)
assert.Equal(t, 3, defaultCfg.Primary.MaxRetryAttempts)
assert.Equal(t, 4, defaultCfg.Primary.ProtoVersion)
assert.Equal(t, time.Minute, defaultCfg.Primary.ReconnectInterval)
assert.Equal(t, time.Hour*12, defaultCfg.SpanStoreWriteCacheTTL)
assert.Equal(t, true, defaultCfg.Index.Tags)
assert.Equal(t, true, defaultCfg.Index.Logs)
assert.Equal(t, true, defaultCfg.Index.ProcessTags)
}

func TestLoadConfigAndFlags(t *testing.T) {
factories, err := config.ExampleComponents()
require.NoError(t, err)

v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag)
err = c.ParseFlags([]string{"--cassandra.servers=bar", "--cassandra.port=9000", "--config-file=./testdata/jaeger-config.yaml"})
require.NoError(t, err)

err = flags.TryLoadConfigFile(v)
require.NoError(t, err)

factory := &Factory{OptionsFactory: func() *cassandra.Options {
opts := DefaultOptions()
opts.InitFromViper(v)
require.Equal(t, []string{"bar"}, opts.GetPrimary().Servers)
return opts
}}

factories.Exporters[TypeStr] = factory
colConfig, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, colConfig)

cfg := colConfig.Exporters[TypeStr].(*Config)
assert.Equal(t, TypeStr, cfg.Name())
assert.Equal(t, []string{"first", "second"}, cfg.Primary.Servers)
assert.Equal(t, 9000, cfg.Primary.Port)
assert.Equal(t, false, cfg.Index.Tags)
assert.Equal(t, "my-keyspace", cfg.Primary.Keyspace)
assert.Equal(t, false, cfg.Index.Tags)
assert.Equal(t, true, cfg.Index.Logs)
assert.Equal(t, "user", cfg.Primary.Authenticator.Basic.Username)
assert.Equal(t, "pass", cfg.Primary.Authenticator.Basic.Password)
assert.Equal(t, time.Second*12, cfg.SpanStoreWriteCacheTTL)
assert.Equal(t, true, cfg.Primary.TLS.Enabled)
assert.Equal(t, "/foo/bar", cfg.Primary.TLS.CAPath)
}
16 changes: 16 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package cassandra implements Jaeger Cassandra storage as OpenTelemetry exporter.
package cassandra
36 changes: 36 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

// New creates Cassandra exporter/storage
func New(config *Config, log *zap.Logger) (exporter.TraceExporter, error) {
f := cassandra.NewFactory()
f.InitFromOptions(&config.Options)

err := f.Initialize(metrics.NullFactory, log)
if err != nil {
return nil, err
}
return storageOtelExporter.NewSpanWriterExporter(config, f)
}
78 changes: 78 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

const (
// TypeStr defines type of the Cassandra exporter.
TypeStr = "jaeger_cassandra"
)

// OptionsFactory returns initialized cassandra.OptionsFactory structure.
type OptionsFactory func() *cassandra.Options

// DefaultOptions creates Cassandra options supported by this exporter.
func DefaultOptions() *cassandra.Options {
return cassandra.NewOptions("cassandra")
}

// Factory is the factory for Jaeger Cassandra exporter.
type Factory struct {
OptionsFactory OptionsFactory
}

// Type gets the type of exporter.
func (Factory) Type() string {
return TypeStr
}

// CreateDefaultConfig returns default configuration of Factory.
// This function implements OTEL exporter.BaseFactory interface.
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := f.OptionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
}
}

// CreateTraceExporter creates Jaeger Cassandra trace exporter.
// This function implements OTEL exporter.Factory interface.
func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (exporter.TraceExporter, error) {
config, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
return New(config, log)
}

// CreateMetricsExporter is not implemented.
// This function implements OTEL exporter.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (exporter.MetricsExporter, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
66 changes: 66 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"testing"

"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

func TestCreateTraceExporter(t *testing.T) {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
factory := Factory{OptionsFactory: func() *cassandra.Options {
return opts
}}
exporter, err := factory.CreateTraceExporter(zap.NewNop(), factory.CreateDefaultConfig())
require.Nil(t, exporter)
assert.EqualError(t, err, "gocql: unable to create session: control: unable to connect to initial hosts: dial tcp 127.0.0.1:9042: connect: connection refused")
}

func TestCreateTraceExporter_NilConfig(t *testing.T) {
factory := Factory{}
exporter, err := factory.CreateTraceExporter(zap.NewNop(), nil)
require.Nil(t, exporter)
assert.EqualError(t, err, "could not cast configuration to jaeger_cassandra")
}

func TestCreateDefaultConfig(t *testing.T) {
factory := Factory{OptionsFactory: DefaultOptions}
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateMetricsExporter(t *testing.T) {
f := Factory{OptionsFactory: DefaultOptions}
mReceiver, err := f.CreateMetricsExporter(zap.NewNop(), f.CreateDefaultConfig())
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
}

func TestType(t *testing.T) {
factory := Factory{}
assert.Equal(t, TypeStr, factory.Type())
}
Loading

0 comments on commit 5e06df7

Please sign in to comment.