Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Guard against empty stream.datasource and namespace #18769

Merged
merged 5 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
- Remove fleet admin from setup script {pull}18611[18611]
- Correctly report platform and family. {issue}18665[18665]
- Clean action store after enrolling to new configuration {pull}18656[18656]
[Ingest Manager] Avoid watching monitor logs {pull}18723[18723]
- Avoid watching monitor logs {pull}18723[18723]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double entry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no i'm just fixing format of previous one

- Guard against empty stream.datasource and namespace {pull}18769[18769]

==== New features

Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ filebeat:
paths:
- /var/log/hello1.log
- /var/log/hello2.log
dataset: generic
index: logs-generic-default
processors:
- add_fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ filebeat:
paths:
- /var/log/hello1.log
- /var/log/hello2.log
dataset: generic
index: logs-generic-default
processors:
- add_fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ filebeat:
paths:
- /var/log/hello1.log
- /var/log/hello2.log
dataset: generic
index: logs-generic-default
processors:
- add_fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
dataset: generic
vars:
var: value
processors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ metricbeat:
type: metrics
dataset: docker.status
namespace: default
- module: docker
metricsets: [info]
index: metrics-generic-default
hosts: ["http://127.0.0.1:8080"]
processors:
- add_fields:
target: "stream"
fields:
type: metrics
dataset: generic
namespace: default
- module: apache
metricsets: [info]
index: metrics-generic-testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ datasources:
streams:
- metricset: status
dataset: docker.status
- metricset: info
dataset: ""
hosts: ["http://127.0.0.1:8080"]
- type: logs
streams:
Expand Down
115 changes: 110 additions & 5 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func (r *RuleList) MarshalYAML() (interface{}, error) {
name = "make_array"
case *RemoveKeyRule:
name = "remove_key"

case *FixStreamRule:
name = "fix_stream"
default:
return nil, fmt.Errorf("unknown rule of type %T", rule)
}
Expand Down Expand Up @@ -153,6 +154,8 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error {
r = &MakeArrayRule{}
case "remove_key":
r = &RemoveKeyRule{}
case "fix_stream":
r = &FixStreamRule{}
default:
return fmt.Errorf("unknown rule of type %s", name)
}
Expand Down Expand Up @@ -345,6 +348,100 @@ func CopyAllToList(to, onMerge string, except ...string) *CopyAllToListRule {
}
}

// FixStreamRule fixes streams to contain default values
// in case no value or invalid value are provided
type FixStreamRule struct {
}

// Apply stream fixes.
func (r *FixStreamRule) Apply(ast *AST) error {
const defaultNamespace = "default"
const defaultDataset = "generic"

datasourcesNode, found := Lookup(ast, "datasources")
if !found {
return nil
}

datasourcesList, ok := datasourcesNode.Value().(*List)
if !ok {
return nil
}

for _, datasourceNode := range datasourcesList.value {
nsNode, found := datasourceNode.Find("namespace")
if found {
nsKey, ok := nsNode.(*Key)
if ok {
if newNamespace := nsKey.value.String(); newNamespace == "" {
nsKey.value = &StrVal{value: defaultNamespace}
}
}
} else {
datasourceMap, ok := datasourceNode.(*Dict)
if !ok {
continue
}
datasourceMap.value = append(datasourceMap.value, &Key{
name: "namespace",
value: &StrVal{value: defaultNamespace},
})
}

// get input
inputNode, found := datasourceNode.Find("inputs")
if !found {
continue
}

inputsList, ok := inputNode.Value().(*List)
if !ok {
continue
}

for _, inputNode := range inputsList.value {
streamsNode, ok := inputNode.Find("streams")
if !ok {
continue
}

streamsList, ok := streamsNode.Value().(*List)
if !ok {
continue
}

for _, streamNode := range streamsList.value {
streamMap, ok := streamNode.(*Dict)
if !ok {
continue
}

dsNode, found := streamNode.Find("dataset")
if found {
dsKey, ok := dsNode.(*Key)
if ok {
if newDataset := dsKey.value.String(); newDataset == "" {
dsKey.value = &StrVal{value: defaultDataset}
}
}
} else {
streamMap.value = append(streamMap.value, &Key{
name: "dataset",
value: &StrVal{value: defaultDataset},
})
}
}
}
}

return nil
}

// FixStream creates a FixStreamRule
func FixStream() *FixStreamRule {
return &FixStreamRule{}
}

// InjectIndexRule injects index to each input.
// Index is in form {type}-{namespace}-{dataset-type}
// type: is provided to the rule.
Expand Down Expand Up @@ -375,7 +472,9 @@ func (r *InjectIndexRule) Apply(ast *AST) error {
if found {
nsKey, ok := nsNode.(*Key)
if ok {
namespace = nsKey.value.String()
if newNamespace := nsKey.value.String(); newNamespace != "" {
namespace = newNamespace
}
}
}

Expand Down Expand Up @@ -413,7 +512,9 @@ func (r *InjectIndexRule) Apply(ast *AST) error {
if found {
dsKey, ok := dsNode.(*Key)
if ok {
dataset = dsKey.value.String()
if newDataset := dsKey.value.String(); newDataset != "" {
dataset = newDataset
}
}

}
Expand Down Expand Up @@ -464,7 +565,9 @@ func (r *InjectStreamProcessorRule) Apply(ast *AST) error {
if found {
nsKey, ok := nsNode.(*Key)
if ok {
namespace = nsKey.value.String()
if newNamespace := nsKey.value.String(); newNamespace != "" {
namespace = newNamespace
}
}
}

Expand Down Expand Up @@ -502,7 +605,9 @@ func (r *InjectStreamProcessorRule) Apply(ast *AST) error {
if found {
dsKey, ok := dsNode.(*Key)
if ok {
dataset = dsKey.value.String()
if newDataset := dsKey.value.String(); newDataset != "" {
dataset = newDataset
}
}
}

Expand Down
97 changes: 97 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,86 @@ func TestRules(t *testing.T) {
expectedYAML string
rule Rule
}{
"fix streams": {
givenYAML: `
datasources:
- name: All default
inputs:
- type: file
streams:
- paths: /var/log/mysql/error.log
- name: Specified namespace
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
- name: Specified dataset
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified with empty strings
namespace: ""
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: ""
`,
expectedYAML: `
datasources:
- name: All default
namespace: default
inputs:
- type: file
streams:
- paths: /var/log/mysql/error.log
dataset: generic
- name: Specified namespace
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: generic
- name: Specified dataset
namespace: default
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified with empty strings
namespace: default
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: generic
`,
rule: &RuleList{
Rules: []Rule{
FixStream(),
},
},
},

"inject index": {
givenYAML: `
datasources:
Expand Down Expand Up @@ -49,6 +129,13 @@ datasources:
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified with empty strings
namespace: ""
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: ""
`,
expectedYAML: `
datasources:
Expand Down Expand Up @@ -80,6 +167,14 @@ datasources:
- paths: /var/log/mysql/access.log
dataset: dsds
index: mytype-dsds-nsns
- name: All specified with empty strings
namespace: ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the expected now be default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is test for injecting index, it does not change existing values.
we either generate index based on these values or we're adding add_fields processor with correct values

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need both. values and index must match.

In a follow up we can discuss, if we should even error if it is "".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michalpristas I also thought the result should be default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we can fix it but outside of injectIndex rule, what this should do is injecting index and not modifying anything on top of that.

inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: ""
index: mytype-generic-default
`,
rule: &RuleList{
Rules: []Rule{
Expand Down Expand Up @@ -564,6 +659,7 @@ func TestSerialization(t *testing.T) {
InjectStreamProcessor("insert_after", "index-type"),
CopyToList("t1", "t2", "insert_after"),
CopyAllToList("t2", "insert_before", "a", "b"),
FixStream(),
)

y := `- rename:
Expand Down Expand Up @@ -623,6 +719,7 @@ func TestSerialization(t *testing.T) {
- a
- b
on_conflict: insert_before
- fix_stream: {}
`

t.Run("serialize_rules", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/spec/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ cmd: filebeat
args: ["-E", "setup.ilm.enabled=false", "-E", "setup.template.enabled=false", "-E", "management.mode=x-pack-fleet", "-E", "management.enabled=true", "-E", "logging.level=debug"]
configurable: grpc
rules:
- fix_stream: {}
- inject_index:
type: logs

Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/spec/metricbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ post_install:
path: "modules.d/system.yml"
target: "modules.d/system.yml.disabled"
rules:
- fix_stream: {}
- inject_index:
type: metrics

Expand Down