Skip to content

Commit

Permalink
[pipelines] Change test to not reuse same processor twice in one pipe…
Browse files Browse the repository at this point in the history
…line (#6540)

* [pipelines] Change test to not reuse same processor twice in one pipeline

* Add note to documentation about reuse of processors within a pipeline

* can -> MUST
  • Loading branch information
djaglowski authored Nov 17, 2022
1 parent 0ccafa1 commit c205595
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 4 deletions.
16 changes: 16 additions & 0 deletions .chloggen/processor-duplication-per-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Disallow duplicate references to processors within a single pipeline

# One or more tracking issues or pull requests related to the change
issues: [6540]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 2 additions & 0 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ When the Collector loads this config the result will look like this:

Note that each `batch` processor is an independent instance, although both are configured the same way, i.e. each have a `send_batch_size` of 10000.

The same name of the processor MUST NOT be referenced multiple times in the `processors` key of a single pipeline.

## <a name="opentelemetry-agent"></a>Running as an Agent

On a typical VM/container, there are user applications running in some
Expand Down
7 changes: 7 additions & 0 deletions service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,18 @@ func (cfg *Config) validateService() error {
}

// Validate pipeline processor name references.
procSet := make(map[component.ID]bool, len(cfg.Processors))
for _, ref := range pipeline.Processors {
// Check that the name referenced in the pipeline's processors exists in the top-level processors.
if cfg.Processors[ref] == nil {
return fmt.Errorf("pipeline %q references processor %q which does not exist", pipelineID, ref)
}
// Ensure no processors are duplicated within the pipeline
if _, exists := procSet[ref]; exists {

return fmt.Errorf("pipeline %q references processor %q multiple times", pipelineID, ref)
}
procSet[ref] = true
}

// Validate pipeline has at least one exporter.
Expand Down
10 changes: 10 additions & 0 deletions service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ func TestConfigValidate(t *testing.T) {
},
expected: errors.New(`pipeline "traces" references processor "nop/2" which does not exist`),
},
{
name: "duplicate-processor-reference",
cfgFn: func() *Config {
cfg := generateConfig()
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Processors = append(pipe.Processors, pipe.Processors...)
return cfg
},
expected: errors.New(`pipeline "traces" references processor "nop" multiple times`),
},
{
name: "invalid-exporter-reference",
cfgFn: func() *Config {
Expand Down
2 changes: 1 addition & 1 deletion service/internal/pipelines/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestBuild(t *testing.T) {
{
name: "pipelines_simple_multi_proc.yaml",
receiverIDs: []component.ID{component.NewID("examplereceiver")},
processorIDs: []component.ID{component.NewID("exampleprocessor"), component.NewID("exampleprocessor")},
processorIDs: []component.ID{component.NewID("exampleprocessor"), component.NewIDWithName("exampleprocessor", "1")},
exporterIDs: []component.ID{component.NewID("exampleexporter")},
expectedRequests: 1,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ receivers:

processors:
exampleprocessor:
exampleprocessor/1:

exporters:
exampleexporter:
Expand All @@ -11,15 +12,15 @@ service:
pipelines:
traces:
receivers: [examplereceiver]
processors: [exampleprocessor, exampleprocessor]
processors: [exampleprocessor, exampleprocessor/1]
exporters: [exampleexporter]

metrics:
receivers: [examplereceiver]
processors: [exampleprocessor, exampleprocessor]
processors: [exampleprocessor, exampleprocessor/1]
exporters: [exampleexporter]

logs:
receivers: [examplereceiver]
processors: [exampleprocessor, exampleprocessor]
processors: [exampleprocessor, exampleprocessor/1]
exporters: [exampleexporter]

0 comments on commit c205595

Please sign in to comment.