Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Mar 25, 2020
1 parent d16301a commit a362a70
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 57 deletions.
8 changes: 8 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/open-telemetry/opentelemetry-collector/defaults"
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
storageKafka "github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

Expand All @@ -30,8 +32,14 @@ func Components(v *viper.Viper) config.Factories {
opts.InitFromViper(v)
return opts
}}
cassandraExp := cassandra.Factory{OptionsFactory: func() *storageCassandra.Options {
opts := cassandra.DefaultOptions()
opts.InitFromViper(v)
return opts
}}

factories, _ := defaults.Components()
factories.Exporters[kafkaExp.Type()] = kafkaExp
factories.Exporters[cassandraExp.Type()] = cassandraExp
return factories
}
8 changes: 7 additions & 1 deletion cmd/opentelemetry-collector/app/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@ import (

"github.com/magiconair/properties/assert"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
)

func TestComponents(t *testing.T) {
v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags)
v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags, cassandra.DefaultOptions().AddFlags)
factories := Components(v)
assert.Equal(t, "jaeger_kafka", factories.Exporters[kafka.TypeStr].Type())
assert.Equal(t, "jaeger_cassandra", factories.Exporters[cassandra.TypeStr].Type())

kafkaFactory := factories.Exporters[kafka.TypeStr]
kc := kafkaFactory.CreateDefaultConfig().(*kafka.Config)
assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers)

cassandraFactory := factories.Exporters[cassandra.TypeStr]
cc := cassandraFactory.CreateDefaultConfig().(*cassandra.Config)
assert.Equal(t, []string{"127.0.0.1"}, cc.Options.GetPrimary().Servers)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func TestDefaultConfig(t *testing.T) {
factory := &Factory{Options: func() *cassandra.Options {
factory := &Factory{OptionsFactory: func() *cassandra.Options {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestLoadConfigAndFlags(t *testing.T) {
err = flags.TryLoadConfigFile(v)
require.NoError(t, err)

factory := &Factory{Options: func() *cassandra.Options {
factory := &Factory{OptionsFactory: func() *cassandra.Options {
opts := DefaultOptions()
opts.InitFromViper(v)
require.Equal(t, []string{"bar"}, opts.GetPrimary().Servers)
Expand Down
53 changes: 11 additions & 42 deletions cmd/opentelemetry-collector/app/exporter/cassandra/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,32 @@
package cassandra

import (
"context"
"io"

"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumererror"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/open-telemetry/opentelemetry-collector/exporter/exporterhelper"
jaegertranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/jaeger"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
spanstoreInterface "github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage"
)

// New creates Cassandra exporter/storage
func New(config *Config, log *zap.Logger) (exporter.TraceExporter, error) {
fac := cassandra.NewFactory()
fac.InitFromOptions(&config.Options)
f := cassandra.NewFactory()
f.InitFromOptions(&config.Options)

err := fac.Initialize(metrics.NullFactory, log)
if err != nil {
return nil, err
}
spanWriter, err := fac.CreateSpanWriter()
err := f.Initialize(metrics.NullFactory, log)
if err != nil {
return nil, err
}
storage := storage{Writer: spanWriter}
return exporterhelper.NewTraceExporter(
config,
storage.store,
exporterhelper.WithShutdown(func() error {
closer := spanWriter.(io.Closer)
closer.Close()
return nil
}))
return create(f, config)
}

type storage struct {
Writer spanstoreInterface.Writer
}

// Store stores data into storage
func (s *storage) store(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error) {
protoBatch, err := jaegertranslator.OCProtoToJaegerProto(td)
func create(factory storage.Factory, config *Config) (exporter.TraceExporter, error) {
// ignoring error for code coverage. Kafka factory never returns an error
spanWriter, err := factory.CreateSpanWriter()
if err != nil {
return len(td.Spans), consumererror.Permanent(err)
}
dropped := 0
for _, span := range protoBatch.Spans {
span.Process = protoBatch.Process
err := s.Writer.WriteSpan(span)
// TODO should we wrap errors as we go and return?
if err != nil {
dropped++
}
return nil, err
}
return dropped, nil
return storageOtelExporter.NewSpanWriterExporter(config, spanWriter)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

func TestNew(t *testing.T) {
m := &mocStorageFactory{}
c := &Config{}
exporter, err := create(m, c)
require.Nil(t, err)
assert.NotNil(t, exporter)
m = &mocStorageFactory{err: errors.New("failed to create")}
exporter, err = create(m, c)
assert.Error(t, err, "failed to create")
assert.Nil(t, exporter)
}

type mocStorageFactory struct {
err error
}

func (m mocStorageFactory) CreateSpanWriter() (spanstore.Writer, error) {
return nil, m.err
}
func (mocStorageFactory) CreateSpanReader() (spanstore.Reader, error) {
return nil, nil
}
func (mocStorageFactory) CreateDependencyReader() (dependencystore.Reader, error) {
return nil, nil
}
func (mocStorageFactory) Initialize(metrics.Factory, *zap.Logger) error {
return nil
}
4 changes: 2 additions & 2 deletions cmd/opentelemetry-collector/app/exporter/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func DefaultOptions() *cassandra.Options {

// Factory is the factory for Jaeger Cassandra exporter.
type Factory struct {
Options OptionsFactory
OptionsFactory OptionsFactory
}

// Type gets the type of exporter.
Expand All @@ -50,7 +50,7 @@ func (Factory) Type() string {

// CreateDefaultConfig returns default configuration of Factory.
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := f.Options()
opts := f.OptionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCreateTraceExporter(t *testing.T) {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
factory := Factory{Options: func() *cassandra.Options {
factory := Factory{OptionsFactory: func() *cassandra.Options {
return opts
}}
exporter, err := factory.CreateTraceExporter(zap.NewNop(), factory.CreateDefaultConfig())
Expand All @@ -47,14 +47,14 @@ func TestCreateTraceExporter_NilConfig(t *testing.T) {
}

func TestCreateDefaultConfig(t *testing.T) {
factory := Factory{Options: DefaultOptions}
factory := Factory{OptionsFactory: DefaultOptions}
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateMetricsExporter(t *testing.T) {
f := Factory{Options: DefaultOptions}
f := Factory{OptionsFactory: DefaultOptions}
mReceiver, err := f.CreateMetricsExporter(zap.NewNop(), f.CreateDefaultConfig())
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
Expand Down
14 changes: 7 additions & 7 deletions cmd/opentelemetry-collector/app/exporter/kafka/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,30 @@ import (
)

func TestNew(t *testing.T) {
m := &mockProducerBuilder{}
m := &mockStorageFactory{}
c := &Config{}
exporter, err := create(m, c)
require.Nil(t, err)
assert.NotNil(t, exporter)
m = &mockProducerBuilder{err: errors.New("failed to create")}
m = &mockStorageFactory{err: errors.New("failed to create")}
exporter, err = create(m, c)
assert.Error(t, err, "failed to create")
assert.Nil(t, exporter)
}

type mockProducerBuilder struct {
type mockStorageFactory struct {
err error
}

func (m mockProducerBuilder) CreateSpanWriter() (spanstore.Writer, error) {
func (m mockStorageFactory) CreateSpanWriter() (spanstore.Writer, error) {
return nil, m.err
}
func (mockProducerBuilder) CreateSpanReader() (spanstore.Reader, error) {
func (mockStorageFactory) CreateSpanReader() (spanstore.Reader, error) {
return nil, nil
}
func (mockProducerBuilder) CreateDependencyReader() (dependencystore.Reader, error) {
func (mockStorageFactory) CreateDependencyReader() (dependencystore.Reader, error) {
return nil, nil
}
func (mockProducerBuilder) Initialize(metrics.Factory, *zap.Logger) error {
func (mockStorageFactory) Initialize(metrics.Factory, *zap.Logger) error {
return nil
}
2 changes: 2 additions & 0 deletions cmd/opentelemetry-collector/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/V
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/gocql/gocql v0.0.0-20200226121155-e5c8c1f505c5/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gocql/gocql v0.0.0-20200228163523-cd4b606dd2fb h1:H3tisfjQwq9FTyWqlKsZpgoYrsvn2pmTWvAiDHa5pho=
github.com/gocql/gocql v0.0.0-20200228163523-cd4b606dd2fb/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gofrs/flock v0.0.0-20190320160742-5135e617513b/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/googleapis v1.0.1-0.20180501115203-b23578765ee5/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
Expand Down Expand Up @@ -366,6 +367,7 @@ github.com/grpc-ecosystem/grpc-gateway v1.11.1 h1:/dBYI+n4xIL+Y9SKXQrjlKTmJJDwCS
github.com/grpc-ecosystem/grpc-gateway v1.11.1/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.13.0 h1:sBDQoHXrOlfPobnKw69FIKa1wg9qsLLvvQ/Y19WtFgI=
github.com/grpc-ecosystem/grpc-gateway v1.13.0/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/api v1.2.0 h1:oPsuzLp2uk7I7rojPKuncWbZ+m5TMoD4Ivs+2Rkeh4Y=
Expand Down

0 comments on commit a362a70

Please sign in to comment.