From 0b95a8df977ef7bef1eda6d0570c4926f44d4b81 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 16 Jul 2021 12:30:57 +0200 Subject: [PATCH] Add custom suffix to identifiers in filestream input when needed (#26669) (#26918) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What does this PR do? This PR lets you inject suffixes to the state identifiers based on the configuration of the filestream input. For starters, it is needed by the container parser so tracking of different streams (stdout/stderr) can be done separately. ## Why is it important? Without this, the container input cannot be substituted with filestream input with a container parser. (cherry picked from commit 2876cfb15fba9eb8928811ede697ffc6614086f1) Co-authored-by: Noémi Ványi --- filebeat/input/filestream/identifier.go | 40 ++++++++++- filebeat/input/filestream/identifier_test.go | 28 +++++++- .../input/filestream/prospector_creator.go | 6 +- libbeat/reader/parser/parser.go | 10 +++ libbeat/reader/parser/parser_test.go | 69 +++++++++++++++++++ 5 files changed, 147 insertions(+), 6 deletions(-) diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go index 7b28a1d3cba..4e2e5643fda 100644 --- a/filebeat/input/filestream/identifier.go +++ b/filebeat/input/filestream/identifier.go @@ -76,9 +76,13 @@ func (f fileSource) Name() string { } // newFileIdentifier creates a new state identifier for a log input. -func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) { +func newFileIdentifier(ns *common.ConfigNamespace, suffix string) (fileIdentifier, error) { if ns == nil { - return newINodeDeviceIdentifier(nil) + i, err := newINodeDeviceIdentifier(nil) + if err != nil { + return nil, err + } + return withSuffix(i, suffix), nil } identifierType := ns.Name() @@ -87,7 +91,11 @@ func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) { return nil, fmt.Errorf("no such file_identity generator: %s", identifierType) } - return f(ns.Config()) + i, err := f(ns.Config()) + if err != nil { + return nil, err + } + return withSuffix(i, suffix), nil } type inodeDeviceIdentifier struct { @@ -159,6 +167,32 @@ func (p *pathIdentifier) Supports(f identifierFeature) bool { return false } +type suffixIdentifier struct { + i fileIdentifier + suffix string +} + +func withSuffix(inner fileIdentifier, suffix string) fileIdentifier { + if suffix == "" { + return inner + } + return &suffixIdentifier{i: inner, suffix: suffix} +} + +func (s *suffixIdentifier) GetSource(e loginp.FSEvent) fileSource { + fs := s.i.GetSource(e) + fs.name += "-" + s.suffix + return fs +} + +func (s *suffixIdentifier) Name() string { + return s.i.Name() +} + +func (s *suffixIdentifier) Supports(f identifierFeature) bool { + return s.i.Supports(f) +} + // mockIdentifier is used for testing type MockIdentifier struct{} diff --git a/filebeat/input/filestream/identifier_test.go b/filebeat/input/filestream/identifier_test.go index f5f6296516e..8b9cb4e5f40 100644 --- a/filebeat/input/filestream/identifier_test.go +++ b/filebeat/input/filestream/identifier_test.go @@ -36,7 +36,7 @@ type testFileIdentifierConfig struct { func TestFileIdentifier(t *testing.T) { t.Run("default file identifier", func(t *testing.T) { - identifier, err := newFileIdentifier(nil) + identifier, err := newFileIdentifier(nil, "") require.NoError(t, err) assert.Equal(t, DefaultIdentifierName, identifier.Name()) @@ -59,6 +59,30 @@ func TestFileIdentifier(t *testing.T) { assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String(), src.Name()) }) + t.Run("default file identifier with suffix", func(t *testing.T) { + identifier, err := newFileIdentifier(nil, "my-suffix") + require.NoError(t, err) + assert.Equal(t, DefaultIdentifierName, identifier.Name()) + + tmpFile, err := ioutil.TempFile("", "test_file_identifier_native") + if err != nil { + t.Fatalf("cannot create temporary file for test: %v", err) + } + defer os.Remove(tmpFile.Name()) + + fi, err := tmpFile.Stat() + if err != nil { + t.Fatalf("cannot stat temporary file for test: %v", err) + } + + src := identifier.GetSource(loginp.FSEvent{ + NewPath: tmpFile.Name(), + Info: fi, + }) + + assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String()+"-my-suffix", src.Name()) + }) + t.Run("path identifier", func(t *testing.T) { c := common.MustNewConfigFrom(map[string]interface{}{ "identifier": map[string]interface{}{ @@ -69,7 +93,7 @@ func TestFileIdentifier(t *testing.T) { err := c.Unpack(&cfg) require.NoError(t, err) - identifier, err := newFileIdentifier(cfg.Identifier) + identifier, err := newFileIdentifier(cfg.Identifier, "") require.NoError(t, err) assert.Equal(t, pathName, identifier.Name()) diff --git a/filebeat/input/filestream/prospector_creator.go b/filebeat/input/filestream/prospector_creator.go index 59f86d1426a..f792b075cf3 100644 --- a/filebeat/input/filestream/prospector_creator.go +++ b/filebeat/input/filestream/prospector_creator.go @@ -43,7 +43,7 @@ func newProspector(config config) (loginp.Prospector, error) { return nil, fmt.Errorf("error while creating filewatcher %v", err) } - identifier, err := newFileIdentifier(config.FileIdentity) + identifier, err := newFileIdentifier(config.FileIdentity, getIdentifierSuffix(config)) if err != nil { return nil, fmt.Errorf("error while creating file identifier: %v", err) } @@ -104,3 +104,7 @@ func newProspector(config config) (loginp.Prospector, error) { } return nil, fmt.Errorf("no such rotation method: %s", rotationMethod) } + +func getIdentifierSuffix(config config) string { + return config.Reader.Parsers.Suffix +} diff --git a/libbeat/reader/parser/parser.go b/libbeat/reader/parser/parser.go index 151e912416a..f54c5b98dba 100644 --- a/libbeat/reader/parser/parser.go +++ b/libbeat/reader/parser/parser.go @@ -49,6 +49,8 @@ type CommonConfig struct { } type Config struct { + Suffix string + pCfg CommonConfig parsers []common.ConfigNamespace } @@ -79,6 +81,7 @@ func (c *Config) Unpack(cc *common.Config) error { } func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) { + var suffix string for _, ns := range parsers { name := ns.Name() switch name { @@ -103,12 +106,19 @@ func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, er if err != nil { return nil, fmt.Errorf("error while parsing container parser config: %+v", err) } + if config.Stream != readjson.All { + if suffix != "" { + return nil, fmt.Errorf("only one stream selection is allowed") + } + suffix = config.Stream.String() + } default: return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name) } } return &Config{ + Suffix: suffix, pCfg: pCfg, parsers: parsers, }, nil diff --git a/libbeat/reader/parser/parser_test.go b/libbeat/reader/parser/parser_test.go index 37eba5d15f9..1fdf09ef719 100644 --- a/libbeat/reader/parser/parser_test.go +++ b/libbeat/reader/parser/parser_test.go @@ -32,6 +32,75 @@ import ( "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" ) +func TestParsersConfigSuffix(t *testing.T) { + tests := map[string]struct { + parsers map[string]interface{} + expectedSuffix string + expectedError string + }{ + "parsers with no suffix config": { + parsers: map[string]interface{}{ + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "all", + }, + }, + }, + }, + }, + "parsers with correct suffix config": { + parsers: map[string]interface{}{ + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "stdout", + }, + }, + }, + }, + expectedSuffix: "stdout", + }, + "parsers with multiple suffix config": { + parsers: map[string]interface{}{ + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "stdout", + }, + }, + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "stderr", + }, + }, + }, + }, + expectedError: "only one stream selection is allowed", + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + cfg := common.MustNewConfigFrom(test.parsers) + var parsersConfig testParsersConfig + err := cfg.Unpack(&parsersConfig) + require.NoError(t, err) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + + if test.expectedError == "" { + require.NoError(t, err) + } else { + require.Contains(t, err.Error(), test.expectedError) + return + } + require.Equal(t, c.Suffix, test.expectedSuffix) + }) + } + +} + func TestParsersConfigAndReading(t *testing.T) { tests := map[string]struct { lines string