Skip to content

Commit

Permalink
[pdatautil] Add function for splitting log records while preserving c…
Browse files Browse the repository at this point in the history
…ontext
  • Loading branch information
djaglowski committed Oct 24, 2024
1 parent eb48ed7 commit ec75457
Show file tree
Hide file tree
Showing 52 changed files with 3,210 additions and 47 deletions.
27 changes: 27 additions & 0 deletions .chloggen/split-log-records.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: connector/routing

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add ability to route log records individually using OTTL log record context.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [19738]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
14 changes: 14 additions & 0 deletions connector/routingconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,27 @@ func (c *Config) Validate() error {
if len(item.Pipelines) == 0 {
return errNoPipelines
}

switch item.Context {
case "", "resource": // ok
case "log":
if !c.MatchOnce {
return errors.New("log context is not supported with match_once: false")
}
default:
return errors.New("invalid context: " + item.Context)
}
}

return nil
}

// RoutingTableItem specifies how data should be routed to the different pipelines
type RoutingTableItem struct {
// One of "resource" or "log" (other OTTL contexts will be added in the future)
// Optional. Default "resource".
Context string `mapstructure:"context"`

// Statement is a OTTL statement used for making a routing decision.
// One of 'Statement' or 'Condition' must be provided.
Statement string `mapstructure:"statement"`
Expand Down
31 changes: 31 additions & 0 deletions connector/routingconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,37 @@ func TestValidateConfig(t *testing.T) {
},
error: "invalid route: both condition and statement provided",
},
{
name: "invalid context",
config: &Config{
Table: []RoutingTableItem{
{
Context: "invalid",
Statement: `route() where attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
error: "invalid context: invalid",
},
{
name: "log context with match_once false",
config: &Config{
MatchOnce: false,
Table: []RoutingTableItem{
{
Context: "log",
Statement: `route() where attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
error: "log context is not supported with match_once: false",
},
}

for _, tt := range tests {
Expand Down
40 changes: 22 additions & 18 deletions connector/routingconnector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/plogutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource"
)

Expand Down Expand Up @@ -74,29 +75,36 @@ func (c *logsConnector) switchLogs(ctx context.Context, ld plog.Logs) error {
var errs error
for _, route := range c.router.routeSlice {
matchedLogs := plog.NewLogs()

plogutil.MoveResourcesIf(ld, matchedLogs,
func(rl plog.ResourceLogs) bool {
rtx := ottlresource.NewTransformContext(rl.Resource(), rl)
_, isMatch, err := route.statement.Execute(ctx, rtx)
errs = errors.Join(errs, err)
return isMatch
},
)

switch route.statementContext {
case "", "resource":
plogutil.MoveResourcesIf(ld, matchedLogs,
func(rl plog.ResourceLogs) bool {
rtx := ottlresource.NewTransformContext(rl.Resource(), rl)
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
errs = errors.Join(errs, err)
return isMatch
},
)
case "log":
plogutil.MoveRecordsWithContextIf(ld, matchedLogs,
func(rl plog.ResourceLogs, sl plog.ScopeLogs, lr plog.LogRecord) bool {
ltx := ottllog.NewTransformContext(lr, sl.Scope(), rl.Resource(), sl, rl)
_, isMatch, err := route.logStatement.Execute(ctx, ltx)
errs = errors.Join(errs, err)
return isMatch
},
)
}
if errs != nil {
if c.config.ErrorMode == ottl.PropagateError {
return errs
}
groupAll(groups, c.router.defaultConsumer, matchedLogs)

}
groupAll(groups, route.consumer, matchedLogs)
}

// anything left wasn't matched by any route. Send to default consumer
groupAll(groups, c.router.defaultConsumer, ld)

for consumer, group := range groups {
errs = errors.Join(errs, consumer.ConsumeLogs(ctx, group))
}
Expand All @@ -110,14 +118,12 @@ func (c *logsConnector) matchAll(ctx context.Context, ld plog.Logs) error {
// higher CPU usage.
groups := make(map[consumer.Logs]plog.Logs)
var errs error

for i := 0; i < ld.ResourceLogs().Len(); i++ {
rlogs := ld.ResourceLogs().At(i)
rtx := ottlresource.NewTransformContext(rlogs.Resource(), rlogs)

noRoutesMatch := true
for _, route := range c.router.routeSlice {
_, isMatch, err := route.statement.Execute(ctx, rtx)
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
if err != nil {
if c.config.ErrorMode == ottl.PropagateError {
return err
Expand All @@ -129,9 +135,7 @@ func (c *logsConnector) matchAll(ctx context.Context, ld plog.Logs) error {
noRoutesMatch = false
group(groups, route.consumer, rlogs)
}

}

if noRoutesMatch {
// no route conditions are matched, add resource logs to default exporters group
group(groups, c.router.defaultConsumer, rlogs)
Expand Down
10 changes: 10 additions & 0 deletions connector/routingconnector/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,16 @@ func TestLogsConnectorDetailed(t *testing.T) {
filepath.Join("testdata", "logs", "resource_context", "each_matches_one"),
filepath.Join("testdata", "logs", "resource_context", "match_none_with_default"),
filepath.Join("testdata", "logs", "resource_context", "match_none_without_default"),
filepath.Join("testdata", "logs", "log_context", "all_match_first_only"),
filepath.Join("testdata", "logs", "log_context", "all_match_last_only"),
filepath.Join("testdata", "logs", "log_context", "match_none_with_default"),
filepath.Join("testdata", "logs", "log_context", "match_none_without_default"),
filepath.Join("testdata", "logs", "log_context", "some_match_each_route"),
filepath.Join("testdata", "logs", "log_context", "with_resource_condition"),
filepath.Join("testdata", "logs", "log_context", "with_scope_condition"),
filepath.Join("testdata", "logs", "log_context", "with_resource_and_scope_conditions"),
filepath.Join("testdata", "logs", "mixed_context", "match_resource_then_logs"),
filepath.Join("testdata", "logs", "mixed_context", "match_logs_then_resource"),
}

for _, tt := range testCases {
Expand Down
2 changes: 1 addition & 1 deletion connector/routingconnector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric

noRoutesMatch := true
for _, route := range c.router.routeSlice {
_, isMatch, err := route.statement.Execute(ctx, rtx)
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
if err != nil {
if c.config.ErrorMode == ottl.PropagateError {
return err
Expand Down
96 changes: 71 additions & 25 deletions connector/routingconnector/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource"
)

Expand All @@ -28,8 +29,9 @@ type consumerProvider[C any] func(...pipeline.ID) (C, error)
// parameter C is expected to be one of: consumer.Traces, consumer.Metrics, or
// consumer.Logs.
type router[C any] struct {
logger *zap.Logger
parser ottl.Parser[ottlresource.TransformContext]
logger *zap.Logger
resourceParser ottl.Parser[ottlresource.TransformContext]
logParser ottl.Parser[ottllog.TransformContext]

table []RoutingTableItem
routes map[string]routingItem[C]
Expand All @@ -47,23 +49,17 @@ func newRouter[C any](
provider consumerProvider[C],
settings component.TelemetrySettings,
) (*router[C], error) {
parser, err := ottlresource.NewParser(
common.Functions[ottlresource.TransformContext](),
settings,
)

if err != nil {
return nil, err
}

r := &router[C]{
logger: settings.Logger,
parser: parser,
table: table,
routes: make(map[string]routingItem[C]),
consumerProvider: provider,
}

if err := r.buildParsers(table, settings); err != nil {
return nil, err
}

if err := r.registerConsumers(defaultPipelineIDs); err != nil {
return nil, err
}
Expand All @@ -72,8 +68,48 @@ func newRouter[C any](
}

type routingItem[C any] struct {
consumer C
statement *ottl.Statement[ottlresource.TransformContext]
consumer C
statementContext string

resourceStatement *ottl.Statement[ottlresource.TransformContext]
logStatement *ottl.Statement[ottllog.TransformContext]
}

func (r *router[C]) buildParsers(table []RoutingTableItem, settings component.TelemetrySettings) error {
var buildResource, buildLog bool
for _, item := range table {
switch item.Context {
case "", "resource":
buildResource = true
case "log":
buildLog = true
}
}

var errs error
if buildResource {
parser, err := ottlresource.NewParser(
common.Functions[ottlresource.TransformContext](),
settings,
)
if err == nil {
r.resourceParser = parser
} else {
errs = errors.Join(errs, err)
}
}
if buildLog {
parser, err := ottllog.NewParser(
common.Functions[ottllog.TransformContext](),
settings,
)
if err == nil {
r.logParser = parser
} else {
errs = errors.Join(errs, err)
}
}
return errs
}

func (r *router[C]) registerConsumers(defaultPipelineIDs []pipeline.ID) error {
Expand All @@ -94,8 +130,7 @@ func (r *router[C]) registerConsumers(defaultPipelineIDs []pipeline.ID) error {
return nil
}

// registerDefaultConsumer registers a consumer for the default
// pipelines configured
// registerDefaultConsumer registers a consumer for the default pipelines configured
func (r *router[C]) registerDefaultConsumer(pipelineIDs []pipeline.ID) error {
if len(pipelineIDs) == 0 {
return nil
Expand All @@ -121,18 +156,26 @@ func (r *router[C]) normalizeConditions() {
}
}

// registerRouteConsumers registers a consumer for the pipelines configured
// for each route
// registerRouteConsumers registers a consumer for the pipelines configured for each route
func (r *router[C]) registerRouteConsumers() error {
for _, item := range r.table {
statement, err := r.parser.ParseStatement(item.Statement)
if err != nil {
return err
}

route, ok := r.routes[key(item)]
if !ok {
route.statement = statement
route.statementContext = item.Context
switch item.Context {
case "", "resource":
statement, err := r.resourceParser.ParseStatement(item.Statement)
if err != nil {
return err
}
route.resourceStatement = statement
case "log":
statement, err := r.logParser.ParseStatement(item.Statement)
if err != nil {
return err
}
route.logStatement = statement
}
} else {
pipelineNames := []string{}
for _, pipeline := range item.Pipelines {
Expand All @@ -157,5 +200,8 @@ func (r *router[C]) registerRouteConsumers() error {
}

func key(entry RoutingTableItem) string {
return entry.Statement
if entry.Context == "" || entry.Context == "resource" {
return entry.Statement
}
return "[" + entry.Context + "] " + entry.Statement
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
routing:
match_once: true
default_pipelines:
- logs/default
table:
- context: log
condition: attributes["logName"] != nil
pipelines:
- logs/0
- context: log
condition: attributes["logName"] == "logY"
pipelines:
- logs/1
Loading

0 comments on commit ec75457

Please sign in to comment.