Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cassandra OTEL exporter #2139

Merged
merged 2 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ fmt:

.PHONY: lint-gosec
lint-gosec:
time gosec -quiet -exclude=G104,G107 ./...
time gosec -quiet -exclude=G104,G107 -exclude-dir=cmd/opentelemetry-collector ./...

.PHONY: lint-staticcheck
lint-staticcheck:
Expand All @@ -177,6 +177,7 @@ lint: lint-staticcheck lint-gosec lint-otel
.PHONY: lint-otel
lint-otel:
cd ${OTEL_COLLECTOR_DIR} && $(GOVET) ./...
cd ${OTEL_COLLECTOR_DIR} && time gosec -quiet -exclude=G104,G107 ./...

.PHONY: go-lint
go-lint:
Expand Down
8 changes: 8 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/open-telemetry/opentelemetry-collector/defaults"
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
storageKafka "github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

Expand All @@ -30,8 +32,14 @@ func Components(v *viper.Viper) config.Factories {
opts.InitFromViper(v)
return opts
}}
cassandraExp := cassandra.Factory{OptionsFactory: func() *storageCassandra.Options {
opts := cassandra.DefaultOptions()
opts.InitFromViper(v)
return opts
}}

factories, _ := defaults.Components()
factories.Exporters[kafkaExp.Type()] = kafkaExp
factories.Exporters[cassandraExp.Type()] = cassandraExp
return factories
}
8 changes: 7 additions & 1 deletion cmd/opentelemetry-collector/app/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@ import (

"github.com/magiconair/properties/assert"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
)

func TestComponents(t *testing.T) {
v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags)
v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags, cassandra.DefaultOptions().AddFlags)
factories := Components(v)
assert.Equal(t, "jaeger_kafka", factories.Exporters[kafka.TypeStr].Type())
assert.Equal(t, "jaeger_cassandra", factories.Exporters[cassandra.TypeStr].Type())

kafkaFactory := factories.Exporters[kafka.TypeStr]
kc := kafkaFactory.CreateDefaultConfig().(*kafka.Config)
assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers)

cassandraFactory := factories.Exporters[cassandra.TypeStr]
cc := cassandraFactory.CreateDefaultConfig().(*cassandra.Config)
assert.Equal(t, []string{"127.0.0.1"}, cc.Options.GetPrimary().Servers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No harm in testing again, but default values are tested in cmd/opentelemetry-collector/app/exporter/cassandra/config_test.go

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double-check that the default values have been applied. I will consider changing defaults interface to include cobra command and install Jaeger flags by default.

}
27 changes: 27 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also we should consider naming this import storageCassandra. Less confusing and also consistent with other files where it has been imported

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is only in one file, where there is a conflict in imports.

)

// Config holds configuration of Jaeger Cassandra exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
cassandra.Options `mapstructure:",squash"`
}
88 changes: 88 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"path"
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/cmd/flags"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

func TestDefaultConfig(t *testing.T) {
factory := &Factory{OptionsFactory: func() *cassandra.Options {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
return opts
}}
defaultCfg := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, []string{"127.0.0.1"}, defaultCfg.Options.GetPrimary().Servers)
assert.Equal(t, []string{"127.0.0.1"}, defaultCfg.Options.Primary.Servers)
assert.Equal(t, 2, defaultCfg.Primary.ConnectionsPerHost)
assert.Equal(t, "jaeger_v1_test", defaultCfg.Primary.Keyspace)
assert.Equal(t, 3, defaultCfg.Primary.MaxRetryAttempts)
assert.Equal(t, 4, defaultCfg.Primary.ProtoVersion)
assert.Equal(t, time.Minute, defaultCfg.Primary.ReconnectInterval)
assert.Equal(t, time.Hour*12, defaultCfg.SpanStoreWriteCacheTTL)
assert.Equal(t, true, defaultCfg.Index.Tags)
assert.Equal(t, true, defaultCfg.Index.Logs)
assert.Equal(t, true, defaultCfg.Index.ProcessTags)
}

func TestLoadConfigAndFlags(t *testing.T) {
factories, err := config.ExampleComponents()
require.NoError(t, err)

v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag)
err = c.ParseFlags([]string{"--cassandra.servers=bar", "--cassandra.port=9000", "--config-file=./testdata/jaeger-config.yaml"})
require.NoError(t, err)

err = flags.TryLoadConfigFile(v)
require.NoError(t, err)

factory := &Factory{OptionsFactory: func() *cassandra.Options {
opts := DefaultOptions()
opts.InitFromViper(v)
require.Equal(t, []string{"bar"}, opts.GetPrimary().Servers)
return opts
}}

factories.Exporters[TypeStr] = factory
colConfig, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, colConfig)

cfg := colConfig.Exporters[TypeStr].(*Config)
assert.Equal(t, TypeStr, cfg.Name())
assert.Equal(t, []string{"first", "second"}, cfg.Primary.Servers)
assert.Equal(t, 9000, cfg.Primary.Port)
assert.Equal(t, false, cfg.Index.Tags)
assert.Equal(t, "my-keyspace", cfg.Primary.Keyspace)
assert.Equal(t, false, cfg.Index.Tags)
assert.Equal(t, true, cfg.Index.Logs)
assert.Equal(t, "user", cfg.Primary.Authenticator.Basic.Username)
assert.Equal(t, "pass", cfg.Primary.Authenticator.Basic.Password)
assert.Equal(t, time.Second*12, cfg.SpanStoreWriteCacheTTL)
assert.Equal(t, true, cfg.Primary.TLS.Enabled)
assert.Equal(t, "/foo/bar", cfg.Primary.TLS.CAPath)
}
16 changes: 16 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package cassandra implements Jaeger Cassandra storage as OpenTelemetry exporter.
package cassandra
36 changes: 36 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

// New creates Cassandra exporter/storage
func New(config *Config, log *zap.Logger) (exporter.TraceExporter, error) {
f := cassandra.NewFactory()
f.InitFromOptions(&config.Options)

err := f.Initialize(metrics.NullFactory, log)
if err != nil {
return nil, err
}
return storageOtelExporter.NewSpanWriterExporter(config, f)
}
78 changes: 78 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

const (
// TypeStr defines type of the Cassandra exporter.
TypeStr = "jaeger_cassandra"
)

// OptionsFactory returns initialized cassandra.OptionsFactory structure.
type OptionsFactory func() *cassandra.Options

// DefaultOptions creates Cassandra options supported by this exporter.
func DefaultOptions() *cassandra.Options {
return cassandra.NewOptions("cassandra")
}

// Factory is the factory for Jaeger Cassandra exporter.
type Factory struct {
OptionsFactory OptionsFactory
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in what scenarios does this need to be a function instead of a struct?

Copy link
Member Author

@pavolloffay pavolloffay Mar 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to initialize the Options struct with wiper once the collector service has started.

We can revisit this and instead pass a viper instance and run the initialization in CreateDefaultConfig.

}

// Type gets the type of exporter.
func (Factory) Type() string {
return TypeStr
}

// CreateDefaultConfig returns default configuration of Factory.
// This function implements OTEL exporter.BaseFactory interface.
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the name CreateDefaultConfig required by some external API? We usually mention it in the comment.

Given that the result of this function depends on externally provided (via OptionsFactory) options, the name "default" is confusing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateDefaultConfig() is part of the extension.Factory interface in the Otel collector. I agree that "default" is confusing here, we should ideally apply user supplied options once the default config is created

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that "default" is confusing here, we should ideally apply user supplied options once the default config is created

OTEL service applies config file changes to the returned instance. We cannot override them by config supplied as legacy jaeger flags. The OTEL config takes precedence over legacy jaeger configuration.

opts := f.OptionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
}
}

// CreateTraceExporter creates Jaeger Cassandra trace exporter.
// This function implements OTEL exporter.Factory interface.
func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (exporter.TraceExporter, error) {
config, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
return New(config, log)
}

// CreateMetricsExporter is not implemented.
// This function implements OTEL exporter.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (exporter.MetricsExporter, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
66 changes: 66 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"testing"

"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

func TestCreateTraceExporter(t *testing.T) {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
factory := Factory{OptionsFactory: func() *cassandra.Options {
return opts
}}
exporter, err := factory.CreateTraceExporter(zap.NewNop(), factory.CreateDefaultConfig())
require.Nil(t, exporter)
assert.EqualError(t, err, "gocql: unable to create session: control: unable to connect to initial hosts: dial tcp 127.0.0.1:9042: connect: connection refused")
}

func TestCreateTraceExporter_NilConfig(t *testing.T) {
factory := Factory{}
exporter, err := factory.CreateTraceExporter(zap.NewNop(), nil)
require.Nil(t, exporter)
assert.EqualError(t, err, "could not cast configuration to jaeger_cassandra")
}

func TestCreateDefaultConfig(t *testing.T) {
factory := Factory{OptionsFactory: DefaultOptions}
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateMetricsExporter(t *testing.T) {
f := Factory{OptionsFactory: DefaultOptions}
mReceiver, err := f.CreateMetricsExporter(zap.NewNop(), f.CreateDefaultConfig())
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
}

func TestType(t *testing.T) {
factory := Factory{}
assert.Equal(t, TypeStr, factory.Type())
}
Loading