Skip to content

Commit

Permalink
Enable batch and queued retry processors by default (#2330)
Browse files Browse the repository at this point in the history
* Enable batch and queued retry processors by default

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Add godoc

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix review comments

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay authored Jul 8, 2020
1 parent 97d2319 commit c549252
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 27 deletions.
35 changes: 21 additions & 14 deletions cmd/opentelemetry/app/defaultconfig/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configprotocol"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/processor/queuedprocessor"
"go.opentelemetry.io/collector/processor/resourceprocessor"
"go.opentelemetry.io/collector/receiver/jaegerreceiver"
"go.opentelemetry.io/collector/receiver/zipkinreceiver"
Expand Down Expand Up @@ -71,11 +73,7 @@ func (c ComponentSettings) CreateDefaultConfig() (*configmodels.Config, error) {
return nil, err
}
receivers := createReceivers(c.ComponentType, c.ZipkinHostPort, c.Factories)
processors := configmodels.Processors{}
resProcessor := c.Factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
if len(resProcessor.Labels) > 0 {
processors[resProcessor.Name()] = resProcessor
}
processors, processorNames := createProcessors(c.Factories)
hc := c.Factories.Extensions["health_check"].CreateDefaultConfig()
return &configmodels.Config{
Receivers: receivers,
Expand All @@ -88,14 +86,31 @@ func (c ComponentSettings) CreateDefaultConfig() (*configmodels.Config, error) {
string(configmodels.TracesDataType): {
InputType: configmodels.TracesDataType,
Receivers: receiverNames(receivers),
Processors: processorNames(processors),
Processors: processorNames,
Exporters: exporterNames(exporters),
},
},
},
}, nil
}

func createProcessors(factories config.Factories) (configmodels.Processors, []string) {
processors := configmodels.Processors{}
var names []string
resource := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
if len(resource.Labels) > 0 {
processors[resource.Name()] = resource
names = append(names, resource.Name())
}
batch := factories.Processors["batch"].CreateDefaultConfig().(*batchprocessor.Config)
processors[batch.Name()] = batch
names = append(names, batch.Name())
queuedRetry := factories.Processors["queued_retry"].CreateDefaultConfig().(*queuedprocessor.Config)
processors[queuedRetry.Name()] = queuedRetry
names = append(names, queuedRetry.Name())
return processors, names
}

func createReceivers(component ComponentType, zipkinHostPort string, factories config.Factories) configmodels.Receivers {
if component == Ingester {
kafkaReceiver := factories.Receivers[kafkareceiver.TypeStr].CreateDefaultConfig().(*kafkareceiver.Config)
Expand Down Expand Up @@ -188,14 +203,6 @@ func receiverNames(receivers configmodels.Receivers) []string {
return names
}

func processorNames(processors configmodels.Processors) []string {
var names []string
for _, v := range processors {
names = append(names, v.Name())
}
return names
}

func exporterNames(exporters configmodels.Exporters) []string {
var names []string
for _, v := range exporters {
Expand Down
30 changes: 17 additions & 13 deletions cmd/opentelemetry/app/defaultconfig/default_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func TestService(t *testing.T) {
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger"},
Exporters: []string{"jaeger"},
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger"},
Processors: []string{"batch", "queued_retry"},
Exporters: []string{"jaeger"},
},
},
},
Expand All @@ -70,7 +71,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger"},
Processors: []string{"resource"},
Processors: []string{"resource", "batch", "queued_retry"},
Exporters: []string{elasticsearchexporter.TypeStr, kafkaexporter.TypeStr, memoryexporter.TypeStr},
},
},
Expand All @@ -85,9 +86,10 @@ func TestService(t *testing.T) {
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Exporters: []string{elasticsearchexporter.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Processors: []string{"batch", "queued_retry"},
Exporters: []string{elasticsearchexporter.TypeStr},
},
},
},
Expand All @@ -101,9 +103,10 @@ func TestService(t *testing.T) {
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Exporters: []string{cassandraexporter.TypeStr, elasticsearchexporter.TypeStr, grpcpluginexporter.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Processors: []string{"batch", "queued_retry"},
Exporters: []string{cassandraexporter.TypeStr, elasticsearchexporter.TypeStr, grpcpluginexporter.TypeStr},
},
},
},
Expand All @@ -118,9 +121,10 @@ func TestService(t *testing.T) {
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger", "zipkin"},
Exporters: []string{elasticsearchexporter.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger", "zipkin"},
Processors: []string{"batch", "queued_retry"},
Exporters: []string{elasticsearchexporter.TypeStr},
},
},
},
Expand Down

0 comments on commit c549252

Please sign in to comment.