Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/transform] Wire up logs processing #9368

Merged
merged 3 commits into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
### 💡 Enhancements 💡
- `pkg/translator/prometheusremotewrite`: Allow to disable sanitize metric labels (#8270)
- `basicauthextension`: Implement `configauth.ClientAuthenticator` so that the extension can also be used as HTTP client basic authenticator.(#8847)
- `processor/transform`: Add transformation of logs (#9368)

### 🧰 Bug fixes 🧰

Expand Down
21 changes: 19 additions & 2 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Transform Processor

Supported pipeline types: traces
Supported pipeline types: logs, traces

The transform processor modifies telemetry based on configuration using the Telemetry Query Language.
It takes a list of queries which are performed in the order specified in the config.
Expand Down Expand Up @@ -38,20 +38,37 @@ exporters:

processors:
transform:
logs:
queries:
- set(severity_text, "FAIL") where body == "request failed"
- keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region")
- set(body, attributes["http.route"])
traces:
queries:
- set(status.code, 1) where attributes["http.path"] == "/health"
- keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region")
- set(name, attributes["http.route"])
service:
pipelines:
logs:
receivers: [otlp]
processors: [transform]
exporters: [nop]
traces:
receivers: [otlp]
processors: [transform]
exporters: [nop]
```

This processor will perform the operations in order for all spans
This processor will perform the operations in order for

All logs

1) Set severity text to FAIL if the body contains a string text "request failed"
2) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes
3) Set `body` to the `http.route` attribute if it is set

All spans

1) Set status code to OK for all spans with a path `/health`
2) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes
Expand Down
22 changes: 20 additions & 2 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-co

import (
"go.opentelemetry.io/collector/config"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

type LogsConfig struct {
Queries []string `mapstructure:"queries"`

// The functions that have been registered in the extension for logs processing.
functions map[string]interface{} `mapstructure:"-"`
}

type TracesConfig struct {
Queries []string `mapstructure:"queries"`

Expand All @@ -31,12 +40,21 @@ type TracesConfig struct {
type Config struct {
config.ProcessorSettings `mapstructure:",squash"`

Logs LogsConfig `mapstructure:"logs"`
Traces TracesConfig `mapstructure:"traces"`
}

var _ config.Processor = (*Config)(nil)

func (c *Config) Validate() error {
_, err := common.ParseQueries(c.Traces.Queries, c.Traces.functions, traces.ParsePath)
return err
var errors error
_, err := common.ParseQueries(c.Logs.Queries, c.Logs.functions, logs.ParsePath)
if err != nil {
errors = multierr.Append(errors, err)
}
_, err = common.ParseQueries(c.Traces.Queries, c.Traces.functions, traces.ParsePath)
if err != nil {
errors = multierr.Append(errors, err)
}
return errors
}
25 changes: 21 additions & 4 deletions processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

func TestLoadingConfig(t *testing.T) {
Expand All @@ -40,13 +41,21 @@ func TestLoadingConfig(t *testing.T) {
p0 := cfg.Processors[config.NewComponentID(typeStr)]
assert.Equal(t, p0, &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Logs: LogsConfig{
Queries: []string{
`set(body, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: logs.DefaultFunctions(),
},
Traces: TracesConfig{
Queries: []string{
`set(name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: common.DefaultFunctions(),
functions: traces.DefaultFunctions(),
},
})
}
Expand All @@ -58,11 +67,19 @@ func TestLoadInvalidConfig(t *testing.T) {
factory := NewFactory()
factories.Processors[typeStr] = factory

cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax.yaml"), factories)
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_log.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_log.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_trace.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function.yaml"), factories)
cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_trace.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)
}
26 changes: 26 additions & 0 deletions processor/transformprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

Expand All @@ -36,13 +37,19 @@ func NewFactory() component.ProcessorFactory {
return component.NewProcessorFactory(
typeStr,
createDefaultConfig,
component.WithLogsProcessor(createLogsProcessor),
component.WithTracesProcessor(createTracesProcessor),
)
}

func createDefaultConfig() config.Processor {
return &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Logs: LogsConfig{
Queries: []string{},

functions: logs.DefaultFunctions(),
},
Traces: TracesConfig{
Queries: []string{},

Expand All @@ -51,6 +58,25 @@ func createDefaultConfig() config.Processor {
}
}

func createLogsProcessor(
_ context.Context,
settings component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Logs,
) (component.LogsProcessor, error) {
oCfg := cfg.(*Config)

proc, err := logs.NewProcessor(oCfg.Logs.Queries, oCfg.Logs.functions, settings)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
return processorhelper.NewLogsProcessor(
cfg,
nextConsumer,
proc.ProcessLogs,
processorhelper.WithCapabilities(processorCapabilities))
}

func createTracesProcessor(
_ context.Context,
settings component.ProcessorCreateSettings,
Expand Down
36 changes: 34 additions & 2 deletions processor/transformprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

func TestFactory_Type(t *testing.T) {
Expand All @@ -38,10 +40,15 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.Equal(t, cfg, &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Logs: LogsConfig{
Queries: []string{},

functions: logs.DefaultFunctions(),
},
Traces: TracesConfig{
Queries: []string{},

functions: common.DefaultFunctions(),
functions: traces.DefaultFunctions(),
},
})
assert.NoError(t, configtest.CheckConfigStruct(cfg))
Expand All @@ -64,6 +71,31 @@ func TestFactoryCreateTracesProcessor_InvalidActions(t *testing.T) {
assert.Nil(t, ap)
}

func TestFactoryCreateLogsProcessor(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
oCfg.Logs.Queries = []string{`set(attributes["test"], "pass") where body == "operationA"`}

lp, err := factory.CreateLogsProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
assert.NotNil(t, lp)
assert.NoError(t, err)

ld := plog.NewLogs()
log := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
log.Body().SetStringVal("operationA")

_, ok := log.Attributes().Get("test")
assert.False(t, ok)

err = lp.ConsumeLogs(context.Background(), ld)
assert.NoError(t, err)

val, ok := log.Attributes().Get("test")
assert.True(t, ok)
assert.Equal(t, "pass", val.StringVal())
}

func TestFactoryCreateTracesProcessor(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
Expand Down
24 changes: 24 additions & 0 deletions processor/transformprocessor/internal/logs/functions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logs // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func DefaultFunctions() map[string]interface{} {
// No logs-only functions yet.
return common.DefaultFunctions()
}
Loading