Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #18769 to 7.8: Guard against empty stream.datasource and namespace #18806

Merged
merged 2 commits into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- Clean action store after enrolling to new configuration {pull}18656[18656]
- Avoid watching monitor logs {pull}18723[18723]
- Correctly report platform and family. {issue}18665[18665]
- Guard against empty stream.datasource and namespace {pull}18769[18769]

==== New features

Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ filebeat:
paths:
- /var/log/hello1.log
- /var/log/hello2.log
dataset: generic
index: logs-generic-default
processors:
- add_fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ filebeat:
paths:
- /var/log/hello1.log
- /var/log/hello2.log
dataset: generic
index: logs-generic-default
processors:
- add_fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ filebeat:
paths:
- /var/log/hello1.log
- /var/log/hello2.log
dataset: generic
index: logs-generic-default
processors:
- add_fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
dataset: generic
vars:
var: value
processors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ metricbeat:
type: metrics
dataset: docker.status
namespace: default
- module: docker
metricsets: [info]
index: metrics-generic-default
hosts: ["http://127.0.0.1:8080"]
processors:
- add_fields:
target: "stream"
fields:
type: metrics
dataset: generic
namespace: default
- module: apache
metricsets: [info]
index: metrics-generic-testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ datasources:
streams:
- metricset: status
dataset: docker.status
- metricset: info
dataset: ""
hosts: ["http://127.0.0.1:8080"]
- type: logs
streams:
Expand Down
115 changes: 110 additions & 5 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func (r *RuleList) MarshalYAML() (interface{}, error) {
name = "make_array"
case *RemoveKeyRule:
name = "remove_key"

case *FixStreamRule:
name = "fix_stream"
default:
return nil, fmt.Errorf("unknown rule of type %T", rule)
}
Expand Down Expand Up @@ -153,6 +154,8 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error {
r = &MakeArrayRule{}
case "remove_key":
r = &RemoveKeyRule{}
case "fix_stream":
r = &FixStreamRule{}
default:
return fmt.Errorf("unknown rule of type %s", name)
}
Expand Down Expand Up @@ -345,6 +348,100 @@ func CopyAllToList(to, onMerge string, except ...string) *CopyAllToListRule {
}
}

// FixStreamRule fixes streams to contain default values
// in case no value or invalid value are provided
type FixStreamRule struct {
}

// Apply stream fixes.
func (r *FixStreamRule) Apply(ast *AST) error {
const defaultNamespace = "default"
const defaultDataset = "generic"

datasourcesNode, found := Lookup(ast, "datasources")
if !found {
return nil
}

datasourcesList, ok := datasourcesNode.Value().(*List)
if !ok {
return nil
}

for _, datasourceNode := range datasourcesList.value {
nsNode, found := datasourceNode.Find("namespace")
if found {
nsKey, ok := nsNode.(*Key)
if ok {
if newNamespace := nsKey.value.String(); newNamespace == "" {
nsKey.value = &StrVal{value: defaultNamespace}
}
}
} else {
datasourceMap, ok := datasourceNode.(*Dict)
if !ok {
continue
}
datasourceMap.value = append(datasourceMap.value, &Key{
name: "namespace",
value: &StrVal{value: defaultNamespace},
})
}

// get input
inputNode, found := datasourceNode.Find("inputs")
if !found {
continue
}

inputsList, ok := inputNode.Value().(*List)
if !ok {
continue
}

for _, inputNode := range inputsList.value {
streamsNode, ok := inputNode.Find("streams")
if !ok {
continue
}

streamsList, ok := streamsNode.Value().(*List)
if !ok {
continue
}

for _, streamNode := range streamsList.value {
streamMap, ok := streamNode.(*Dict)
if !ok {
continue
}

dsNode, found := streamNode.Find("dataset")
if found {
dsKey, ok := dsNode.(*Key)
if ok {
if newDataset := dsKey.value.String(); newDataset == "" {
dsKey.value = &StrVal{value: defaultDataset}
}
}
} else {
streamMap.value = append(streamMap.value, &Key{
name: "dataset",
value: &StrVal{value: defaultDataset},
})
}
}
}
}

return nil
}

// FixStream creates a FixStreamRule
func FixStream() *FixStreamRule {
return &FixStreamRule{}
}

// InjectIndexRule injects index to each input.
// Index is in form {type}-{namespace}-{dataset-type}
// type: is provided to the rule.
Expand Down Expand Up @@ -375,7 +472,9 @@ func (r *InjectIndexRule) Apply(ast *AST) error {
if found {
nsKey, ok := nsNode.(*Key)
if ok {
namespace = nsKey.value.String()
if newNamespace := nsKey.value.String(); newNamespace != "" {
namespace = newNamespace
}
}
}

Expand Down Expand Up @@ -413,7 +512,9 @@ func (r *InjectIndexRule) Apply(ast *AST) error {
if found {
dsKey, ok := dsNode.(*Key)
if ok {
dataset = dsKey.value.String()
if newDataset := dsKey.value.String(); newDataset != "" {
dataset = newDataset
}
}

}
Expand Down Expand Up @@ -464,7 +565,9 @@ func (r *InjectStreamProcessorRule) Apply(ast *AST) error {
if found {
nsKey, ok := nsNode.(*Key)
if ok {
namespace = nsKey.value.String()
if newNamespace := nsKey.value.String(); newNamespace != "" {
namespace = newNamespace
}
}
}

Expand Down Expand Up @@ -502,7 +605,9 @@ func (r *InjectStreamProcessorRule) Apply(ast *AST) error {
if found {
dsKey, ok := dsNode.(*Key)
if ok {
dataset = dsKey.value.String()
if newDataset := dsKey.value.String(); newDataset != "" {
dataset = newDataset
}
}
}

Expand Down
97 changes: 97 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,86 @@ func TestRules(t *testing.T) {
expectedYAML string
rule Rule
}{
"fix streams": {
givenYAML: `
datasources:
- name: All default
inputs:
- type: file
streams:
- paths: /var/log/mysql/error.log
- name: Specified namespace
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
- name: Specified dataset
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified with empty strings
namespace: ""
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: ""
`,
expectedYAML: `
datasources:
- name: All default
namespace: default
inputs:
- type: file
streams:
- paths: /var/log/mysql/error.log
dataset: generic
- name: Specified namespace
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: generic
- name: Specified dataset
namespace: default
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified with empty strings
namespace: default
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: generic
`,
rule: &RuleList{
Rules: []Rule{
FixStream(),
},
},
},

"inject index": {
givenYAML: `
datasources:
Expand Down Expand Up @@ -49,6 +129,13 @@ datasources:
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified with empty strings
namespace: ""
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: ""
`,
expectedYAML: `
datasources:
Expand Down Expand Up @@ -80,6 +167,14 @@ datasources:
- paths: /var/log/mysql/access.log
dataset: dsds
index: mytype-dsds-nsns
- name: All specified with empty strings
namespace: ""
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: ""
index: mytype-generic-default
`,
rule: &RuleList{
Rules: []Rule{
Expand Down Expand Up @@ -564,6 +659,7 @@ func TestSerialization(t *testing.T) {
InjectStreamProcessor("insert_after", "index-type"),
CopyToList("t1", "t2", "insert_after"),
CopyAllToList("t2", "insert_before", "a", "b"),
FixStream(),
)

y := `- rename:
Expand Down Expand Up @@ -623,6 +719,7 @@ func TestSerialization(t *testing.T) {
- a
- b
on_conflict: insert_before
- fix_stream: {}
`

t.Run("serialize_rules", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/spec/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ cmd: filebeat
args: ["-E", "setup.ilm.enabled=false", "-E", "setup.template.enabled=false", "-E", "management.mode=x-pack-fleet", "-E", "management.enabled=true", "-E", "logging.level=debug"]
configurable: grpc
rules:
- fix_stream: {}
- inject_index:
type: logs

Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/spec/metricbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ post_install:
path: "modules.d/system.yml"
target: "modules.d/system.yml.disabled"
rules:
- fix_stream: {}
- inject_index:
type: metrics

Expand Down