Skip to content

Commit

Permalink
Refactor otlp exporter to upcoming auth changes (open-telemetry#3288)
Browse files Browse the repository at this point in the history
* refactor otlp exporter to upcoming auth changes

* addressed review comments

* addressed review comments

* addressed review comments

* addressed review comments

* added suggestion from the review

* added suggestion from the review

* added suggestion from the review
  • Loading branch information
pavankrish123 authored and dashpole committed Jun 14, 2021
1 parent 56e5914 commit 18ea677
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 30 deletions.
5 changes: 3 additions & 2 deletions exporter/otlpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestLoadConfig(t *testing.T) {

factory := NewFactory()
factories.Exporters[typeStr] = factory

cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

require.NoError(t, err)
Expand Down Expand Up @@ -81,12 +82,12 @@ func TestLoadConfig(t *testing.T) {
PermitWithoutStream: true,
Timeout: 30 * time.Second,
},
WriteBufferSize: 512 * 1024,
PerRPCAuth: &configgrpc.PerRPCAuthConfig{
AuthType: "bearer",
BearerToken: "some-token",
},
BalancerName: "round_robin",
WriteBufferSize: 512 * 1024,
BalancerName: "round_robin",
},
})
}
3 changes: 3 additions & 0 deletions exporter/otlpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func createTracesExporter(
exporterhelper.WithTimeout(oCfg.TimeoutSettings),
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown))
}

Expand All @@ -92,6 +93,7 @@ func createMetricsExporter(
exporterhelper.WithTimeout(oCfg.TimeoutSettings),
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown),
)
}
Expand All @@ -114,6 +116,7 @@ func createLogsExporter(
exporterhelper.WithTimeout(oCfg.TimeoutSettings),
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown),
)
}
43 changes: 24 additions & 19 deletions exporter/otlpexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configgrpc"
Expand Down Expand Up @@ -57,11 +58,11 @@ func TestCreateMetricsExporter(t *testing.T) {

func TestCreateTracesExporter(t *testing.T) {
endpoint := testutil.GetAvailableLocalAddress(t)

tests := []struct {
name string
config Config
mustFail bool
name string
config Config
mustFailOnCreate bool
mustFailOnStart bool
}{
{
name: "NoEndpoint",
Expand All @@ -71,7 +72,7 @@ func TestCreateTracesExporter(t *testing.T) {
Endpoint: "",
},
},
mustFail: true,
mustFailOnCreate: true,
},
{
name: "UseSecure",
Expand Down Expand Up @@ -140,7 +141,7 @@ func TestCreateTracesExporter(t *testing.T) {
Compression: "unknown compression",
},
},
mustFail: true,
mustFailOnStart: true,
},
{
name: "CaCert",
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestCreateTracesExporter(t *testing.T) {
},
},
},
mustFail: true,
mustFailOnStart: true,
},
}

Expand All @@ -178,19 +179,23 @@ func TestCreateTracesExporter(t *testing.T) {
factory := NewFactory()
creationParams := component.ExporterCreateParams{Logger: zap.NewNop()}
consumer, err := factory.CreateTracesExporter(context.Background(), creationParams, &tt.config)

if tt.mustFail {
if tt.mustFailOnCreate {
assert.NotNil(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, consumer)

err = consumer.Shutdown(context.Background())
if err != nil {
// Since the endpoint of OTLP exporter doesn't actually exist,
// exporter may already stop because it cannot connect.
assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
return
}
assert.NoError(t, err)
assert.NotNil(t, consumer)
err = consumer.Start(context.Background(), componenttest.NewNopHost())
if tt.mustFailOnStart {
assert.Error(t, err)
return
}
assert.NoError(t, err)
err = consumer.Shutdown(context.Background())
if err != nil {
// Since the endpoint of OTLP exporter doesn't actually exist,
// exporter may already stop because it cannot connect.
assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
})
}
Expand Down
17 changes: 9 additions & 8 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
Expand All @@ -48,14 +49,14 @@ func newExporter(cfg config.Exporter) (*exporter, error) {
return nil, errors.New("OTLP exporter config requires an Endpoint")
}

e := &exporter{}
e.config = oCfg
w, err := newGrpcSender(oCfg)
if err != nil {
return nil, err
}
e.w = w
return e, nil
return &exporter{config: oCfg}, nil
}

// start actually creates the gRPC connection. The client construction is deferred till this point as this
// is the only place we get hold of Extensions which are required to construct auth round tripper.
func (e *exporter) start(_ context.Context, _ component.Host) (err error) {
e.w, err = newGrpcSender(e.config)
return
}

func (e *exporter) shutdown(context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,12 @@ func TestSendTraces(t *testing.T) {
exp, err := factory.CreateTracesExporter(context.Background(), creationParams, cfg)
require.NoError(t, err)
require.NotNil(t, exp)

defer func() {
assert.NoError(t, exp.Shutdown(context.Background()))
}()

host := componenttest.NewNopHost()

assert.NoError(t, exp.Start(context.Background(), host))

// Ensure that initially there is no data in the receiver.
Expand Down

0 comments on commit 18ea677

Please sign in to comment.