From 5703e3a1b21cf33f8a42752da125ec286fd4972c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Fri, 9 Nov 2018 16:09:11 +0100 Subject: [PATCH 1/2] Revert "Minor Journalbeat fixes and additions (#8973) (#9007)" This reverts commit 7c6081d243218b6ece350e2dab3489b955809a92. --- journalbeat/_meta/beat.yml | 14 +- .../6/dashboard/Journalbeat-overview.json | 169 ------------------ journalbeat/checkpoint/checkpoint.go | 3 - journalbeat/config/config.go | 53 ++---- journalbeat/input/config.go | 33 +++- journalbeat/input/input.go | 12 +- journalbeat/journalbeat.reference.yml | 14 +- journalbeat/journalbeat.yml | 14 +- journalbeat/reader/config.go | 8 +- journalbeat/reader/journal.go | 13 +- journalbeat/reader/journal_test.go | 6 +- 11 files changed, 89 insertions(+), 250 deletions(-) delete mode 100644 journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json diff --git a/journalbeat/_meta/beat.yml b/journalbeat/_meta/beat.yml index 78c28ffe36d9..c4e3c14db56c 100644 --- a/journalbeat/_meta/beat.yml +++ b/journalbeat/_meta/beat.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 20s + #max_backoff: 60s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -42,3 +42,15 @@ journalbeat.inputs: # Name of the registry file. If a relative path is used, it is considered relative to the # data path. #registry_file: registry + + # The number of seconds to wait before trying to read again from journals. + #backoff: 1s + # The maximum number of seconds to wait before attempting to read again from journals. + #max_backoff: 60s + + # Position to start reading from all journal. Possible values: head, tail, cursor + #seek: head + + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] diff --git a/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json b/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json deleted file mode 100644 index fc771e9bebd4..000000000000 --- a/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json +++ /dev/null @@ -1,169 +0,0 @@ -{ - "objects": [ - { - "attributes": { - "columns": [ - "@timestamp", - "host.name", - "message" - ], - "description": "", - "hits": 0, - "kibanaSavedObjectMeta": { - "searchSourceJSON": { - "filter": [], - "highlightAll": true, - "index": "journalbeat-*", - "query": { - "language": "lucene", - "query": "process.name:systemd" - }, - "version": true - } - }, - "sort": [ - "@timestamp", - "desc" - ], - "title": "[Journalbeat] Systemd messages", - "version": 1 - }, - "id": "aa003e90-e2b9-11e8-9f52-734e93de180d", - "type": "search", - "updated_at": "2018-11-07T18:19:28.377Z", - "version": 1 - }, - { - "attributes": { - "columns": [ - "@timestamp", - "host.name", - "journald.kernel.subsystem", - "message" - ], - "description": "", - "hits": 0, - "kibanaSavedObjectMeta": { - "searchSourceJSON": { - "filter": [], - "highlightAll": true, - "index": "journalbeat-*", - "query": { - "language": "lucene", - "query": "syslog.facility:0 AND syslog.priority:\u003c4" - }, - "version": true - } - }, - "sort": [ - "_score", - "desc" - ], - "title": "[Journalbeat] Kernel errors", - "version": 1 - }, - "id": "5db75310-e2ba-11e8-9f52-734e93de180d", - "type": "search", - "updated_at": "2018-11-07T18:24:29.889Z", - "version": 1 - }, - { - "attributes": { - "columns": [ - "@timestamp", - "host.name", - "process.name", - "message" - ], - "description": "", - "hits": 0, - "kibanaSavedObjectMeta": { - "searchSourceJSON": { - "filter": [], - "highlightAll": true, - "index": "journalbeat-*", - "query": { - "language": "lucene", - "query": "syslog.facility:4" - }, - "version": true - } - }, - "sort": [ - "_score", - "desc" - ], - "title": "[Journalbeat] Login authorization", - "version": 1 - }, - "id": "82408120-e2ba-11e8-9f52-734e93de180d", - "type": "search", - "updated_at": "2018-11-07T18:26:05.348Z", - "version": 2 - }, - { - "attributes": { - "columns": [ - "@timestamp", - "host.name", - "journald.kernel.subsystem", - "journald.kernel.device_node_path", - "message" - ], - "description": "", - "hits": 0, - "kibanaSavedObjectMeta": { - "searchSourceJSON": { - "filter": [], - "highlightAll": true, - "index": "journalbeat-*", - "query": { - "language": "lucene", - "query": "journald.kernel.subsystem:usb OR journald.kernel.subsystem:hid" - }, - "version": true - } - }, - "sort": [ - "_score", - "desc" - ], - "title": "[Journalbeat] USB and HID messages", - "version": 1 - }, - "id": "f0232670-e2ba-11e8-9f52-734e93de180d", - "type": "search", - "updated_at": "2018-11-07T18:28:35.543Z", - "version": 1 - }, - { - "attributes": { - "description": "", - "hits": 0, - "kibanaSavedObjectMeta": { - "searchSourceJSON": { - "filter": [], - "query": { - "language": "lucene", - "query": "" - } - } - }, - "optionsJSON": { - "darkTheme": false, - "hidePanelTitles": false, - "useMargins": true - }, - "panelsJSON": null, - "timeRestore": false, - "title": "[Journalbeat] Overview", - "version": 1 - }, - "id": "f2de4440-e2b9-11e8-9f52-734e93de180d", - "type": "dashboard", - "updated_at": "2018-11-07T18:30:18.083Z", - "version": 2 - } - ], - "version": "7.0.0-alpha1-SNAPSHOT" -} diff --git a/journalbeat/checkpoint/checkpoint.go b/journalbeat/checkpoint/checkpoint.go index 0f29861040b4..f2c3bfacdabc 100644 --- a/journalbeat/checkpoint/checkpoint.go +++ b/journalbeat/checkpoint/checkpoint.go @@ -32,7 +32,6 @@ import ( "gopkg.in/yaml.v2" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/paths" ) // Checkpoint persists event log state information to disk. @@ -88,8 +87,6 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp save: make(chan JournalState, 1), } - c.file = paths.Resolve(paths.Data, c.file) - // Minimum batch size. if c.maxUpdates < 1 { c.maxUpdates = 1 diff --git a/journalbeat/config/config.go b/journalbeat/config/config.go index 395bf13ec9cb..a2c5b69d9514 100644 --- a/journalbeat/config/config.go +++ b/journalbeat/config/config.go @@ -21,56 +21,25 @@ package config import ( - "fmt" + "time" "github.com/elastic/beats/libbeat/common" ) -// SeekMode is specifies how a journal is read -type SeekMode uint8 - // Config stores the configuration of Journalbeat type Config struct { Inputs []*common.Config `config:"inputs"` RegistryFile string `config:"registry_file"` + Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` + MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` + Seek string `config:"seek"` + Matches []string `config:"include_matches"` } -const ( - // SeekInvalid is an invalid value for seek - SeekInvalid SeekMode = iota - // SeekHead option seeks to the head of a journal - SeekHead - // SeekTail option seeks to the tail of a journal - SeekTail - // SeekCursor option seeks to the position specified in the cursor - SeekCursor - - seekHeadStr = "head" - seekTailStr = "tail" - seekCursorStr = "cursor" -) - -var ( - // DefaultConfig are the defaults of a Journalbeat instance - DefaultConfig = Config{ - RegistryFile: "registry", - } - - seekModes = map[string]SeekMode{ - seekHeadStr: SeekHead, - seekTailStr: SeekTail, - seekCursorStr: SeekCursor, - } -) - -// Unpack validates and unpack "seek" config option -func (m *SeekMode) Unpack(value string) error { - mode, ok := seekModes[value] - if !ok { - return fmt.Errorf("invalid seek mode '%s'", value) - } - - *m = mode - - return nil +// DefaultConfig are the defaults of a Journalbeat instance +var DefaultConfig = Config{ + RegistryFile: "registry", + Backoff: 1 * time.Second, + MaxBackoff: 60 * time.Second, + Seek: "cursor", } diff --git a/journalbeat/input/config.go b/journalbeat/input/config.go index 5bdbfcd2ec95..6383998bd1b0 100644 --- a/journalbeat/input/config.go +++ b/journalbeat/input/config.go @@ -18,9 +18,9 @@ package input import ( + "fmt" "time" - "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" ) @@ -29,13 +29,15 @@ import ( type Config struct { // Paths stores the paths to the journal files to be read. Paths []string `config:"paths"` + // MaxBackoff is the limit of the backoff time. + Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` // Backoff is the current interval to wait before // attemting to read again from the journal. - Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` - // MaxBackoff is the limit of the backoff time. + BackoffFactor int `config:"backoff_factor" validate:"min=1"` + // BackoffFactor is the multiplier of Backoff. MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` // Seek is the method to read from journals. - Seek config.SeekMode `config:"seek"` + Seek string `config:"seek"` // Matches store the key value pairs to match entries. Matches []string `config:"include_matches"` @@ -48,8 +50,25 @@ type Config struct { var ( // DefaultConfig is the defaults for an inputs DefaultConfig = Config{ - Backoff: 1 * time.Second, - MaxBackoff: 20 * time.Second, - Seek: config.SeekCursor, + Backoff: 1 * time.Second, + BackoffFactor: 2, + MaxBackoff: 60 * time.Second, + Seek: "cursor", } ) + +// Validate check the configuration of the input. +func (c *Config) Validate() error { + correctSeek := false + for _, s := range []string{"cursor", "head", "tail"} { + if c.Seek == s { + correctSeek = true + } + } + + if !correctSeek { + return fmt.Errorf("incorrect value for seek: %s. possible values: cursor, head, tail", c.Seek) + } + + return nil +} diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 094d169a4ca8..42d8a0ea394d 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -37,7 +37,6 @@ type Input struct { done chan struct{} config Config pipeline beat.Pipeline - client beat.Client states map[string]checkpoint.JournalState id uuid.UUID logger *logp.Logger @@ -121,8 +120,7 @@ func New( // Run connects to the output, collects entries from the readers // and then publishes the events. func (i *Input) Run() { - var err error - i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{ + client, err := i.pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, EventMetadata: i.eventMeta, Meta: nil, @@ -135,12 +133,13 @@ func (i *Input) Run() { i.logger.Error("Error connecting to output: %v", err) return } + defer client.Close() - i.publishAll() + i.publishAll(client) } // publishAll reads events from all readers and publishes them. -func (i *Input) publishAll() { +func (i *Input) publishAll(client beat.Client) { out := make(chan *beat.Event) defer close(out) @@ -180,14 +179,13 @@ func (i *Input) publishAll() { case <-i.done: return case e := <-out: - i.client.Publish(*e) + client.Publish(*e) } } } // Stop stops all readers of the input. func (i *Input) Stop() { - i.client.Close() for _, r := range i.readers { r.Close() } diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index 46267fa6b09c..edc5af4b179f 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 20s + #max_backoff: 60s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -43,6 +43,18 @@ journalbeat.inputs: # data path. #registry_file: registry + # The number of seconds to wait before trying to read again from journals. + #backoff: 1s + # The maximum number of seconds to wait before attempting to read again from journals. + #max_backoff: 60s + + # Position to start reading from all journal. Possible values: head, tail, cursor + #seek: head + + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] + #================================ General ====================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/journalbeat/journalbeat.yml b/journalbeat/journalbeat.yml index b2ab42fb81b7..753c6ef4f8f7 100644 --- a/journalbeat/journalbeat.yml +++ b/journalbeat/journalbeat.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 20s + #max_backoff: 60s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -43,6 +43,18 @@ journalbeat.inputs: # data path. #registry_file: registry + # The number of seconds to wait before trying to read again from journals. + #backoff: 1s + # The maximum number of seconds to wait before attempting to read again from journals. + #max_backoff: 60s + + # Position to start reading from all journal. Possible values: head, tail, cursor + #seek: head + + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] + #================================ General ===================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/journalbeat/reader/config.go b/journalbeat/reader/config.go index 7d52ff7422df..b81005ec926e 100644 --- a/journalbeat/reader/config.go +++ b/journalbeat/reader/config.go @@ -17,11 +17,7 @@ package reader -import ( - "time" - - "github.com/elastic/beats/journalbeat/config" -) +import "time" // Config stores the options of a reder. type Config struct { @@ -29,7 +25,7 @@ type Config struct { Path string // Seek specifies the seeking stategy. // Possible values: head, tail, cursor. - Seek config.SeekMode + Seek string // MaxBackoff is the limit of the backoff time. MaxBackoff time.Duration // Backoff is the current interval to wait before diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index 8df68170fcda..f7afc30a4d0c 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -32,7 +32,6 @@ import ( "github.com/elastic/beats/journalbeat/checkpoint" "github.com/elastic/beats/journalbeat/cmd/instance" - "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -143,8 +142,7 @@ func setupMatches(j *sdjournal.Journal, matches []string) error { // seek seeks to the position determined by the coniguration and cursor state. func (r *Reader) seek(cursor string) { - switch r.config.Seek { - case config.SeekCursor: + if r.config.Seek == "cursor" { if cursor == "" { r.journal.SeekHead() r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the beginning") @@ -156,15 +154,12 @@ func (r *Reader) seek(cursor string) { r.logger.Error("Error while seeking to cursor") } r.logger.Debug("Seeked to position defined in cursor") - case config.SeekTail: + } else if r.config.Seek == "tail" { r.journal.SeekTail() - r.journal.Next() r.logger.Debug("Tailing the journal file") - case config.SeekHead: + } else if r.config.Seek == "head" { r.journal.SeekHead() r.logger.Debug("Reading from the beginning of the journal file") - default: - r.logger.Error("Invalid seeking mode") } } @@ -225,7 +220,7 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event { } if len(custom) != 0 { - fields.Put("journald.custom", custom) + fields["custom"] = custom } state := checkpoint.JournalState{ diff --git a/journalbeat/reader/journal_test.go b/journalbeat/reader/journal_test.go index 8c37026f8ba3..5170afd25933 100644 --- a/journalbeat/reader/journal_test.go +++ b/journalbeat/reader/journal_test.go @@ -65,10 +65,8 @@ func TestToEvent(t *testing.T) { }, }, expectedFields: common.MapStr{ - "journald": common.MapStr{ - "custom": common.MapStr{ - "my_custom_field": "value", - }, + "custom": common.MapStr{ + "my_custom_field": "value", }, }, }, From 7b19bad82d8d018da86cd673d75f546c607ce122 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Thu, 15 Nov 2018 19:23:16 +0100 Subject: [PATCH 2/2] Add missing journalbeat non breaking fixes (#9106) Backported from master: * refactoring of `SeekMode` * journalbeat can be stopped when no output is available * add dashboard ++ add deprecation warnings for options I want to remove in 7.0 (cherry picked from commit f7638a575ed6e5525fb5cb009ce562443c6c6434) --- CHANGELOG.asciidoc | 2 + .../6/dashboard/Journalbeat-overview.json | 169 ++++++++++++++++++ journalbeat/beater/journalbeat.go | 22 ++- journalbeat/config/config.go | 51 +++++- journalbeat/input/config.go | 22 +-- journalbeat/input/input.go | 12 +- journalbeat/reader/config.go | 8 +- journalbeat/reader/journal.go | 10 +- 8 files changed, 258 insertions(+), 38 deletions(-) create mode 100644 journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 14dd1f481775..9965e1c79420 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -54,6 +54,8 @@ https://github.com/elastic/beats/compare/v6.5.0...6.x[Check the HEAD diff] *Journalbeat* +- Add missing journalbeat non breaking fixes. {pull}9106[9106] + *Metricbeat* - Add missing namespace field in http server metricset {pull}7890[7890] diff --git a/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json b/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json new file mode 100644 index 000000000000..fc771e9bebd4 --- /dev/null +++ b/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json @@ -0,0 +1,169 @@ +{ + "objects": [ + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "process.name:systemd" + }, + "version": true + } + }, + "sort": [ + "@timestamp", + "desc" + ], + "title": "[Journalbeat] Systemd messages", + "version": 1 + }, + "id": "aa003e90-e2b9-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:19:28.377Z", + "version": 1 + }, + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "journald.kernel.subsystem", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "syslog.facility:0 AND syslog.priority:\u003c4" + }, + "version": true + } + }, + "sort": [ + "_score", + "desc" + ], + "title": "[Journalbeat] Kernel errors", + "version": 1 + }, + "id": "5db75310-e2ba-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:24:29.889Z", + "version": 1 + }, + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "process.name", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "syslog.facility:4" + }, + "version": true + } + }, + "sort": [ + "_score", + "desc" + ], + "title": "[Journalbeat] Login authorization", + "version": 1 + }, + "id": "82408120-e2ba-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:26:05.348Z", + "version": 2 + }, + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "journald.kernel.subsystem", + "journald.kernel.device_node_path", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "journald.kernel.subsystem:usb OR journald.kernel.subsystem:hid" + }, + "version": true + } + }, + "sort": [ + "_score", + "desc" + ], + "title": "[Journalbeat] USB and HID messages", + "version": 1 + }, + "id": "f0232670-e2ba-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:28:35.543Z", + "version": 1 + }, + { + "attributes": { + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "query": { + "language": "lucene", + "query": "" + } + } + }, + "optionsJSON": { + "darkTheme": false, + "hidePanelTitles": false, + "useMargins": true + }, + "panelsJSON": null, + "timeRestore": false, + "title": "[Journalbeat] Overview", + "version": 1 + }, + "id": "f2de4440-e2b9-11e8-9f52-734e93de180d", + "type": "dashboard", + "updated_at": "2018-11-07T18:30:18.083Z", + "version": 2 + } + ], + "version": "7.0.0-alpha1-SNAPSHOT" +} diff --git a/journalbeat/beater/journalbeat.go b/journalbeat/beater/journalbeat.go index d6f47b61334c..0c83941f40ad 100644 --- a/journalbeat/beater/journalbeat.go +++ b/journalbeat/beater/journalbeat.go @@ -30,14 +30,14 @@ import ( "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/journalbeat/config" + conf "github.com/elastic/beats/journalbeat/config" ) // Journalbeat instance type Journalbeat struct { inputs []*input.Input done chan struct{} - config config.Config + config conf.Config pipeline beat.Pipeline checkpoint *checkpoint.Checkpoint @@ -48,7 +48,23 @@ type Journalbeat struct { func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { cfgwarn.Experimental("Journalbeat is experimental.") - config := config.DefaultConfig + if cfg.HasField("seek") { + cfgwarn.Deprecate("7.0.0", "global seek is deprecated, Use seek on input level instead.") + } + + if cfg.HasField("backoff") { + cfgwarn.Deprecate("7.0.0", "global backoff is deprecated, Use backoff on input level instead.") + } + + if cfg.HasField("max_backoff") { + cfgwarn.Deprecate("7.0.0", "global max_backoff is deprecated, Use max_backoff on input level instead.") + } + + if cfg.HasField("include_matches") { + cfgwarn.Deprecate("7.0.0", "global include_matches is deprecated, Use include_matches on input level instead.") + } + + config := conf.DefaultConfig if err := cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("error reading config file: %v", err) } diff --git a/journalbeat/config/config.go b/journalbeat/config/config.go index a2c5b69d9514..9a819affdb0b 100644 --- a/journalbeat/config/config.go +++ b/journalbeat/config/config.go @@ -21,25 +21,64 @@ package config import ( + "fmt" "time" "github.com/elastic/beats/libbeat/common" ) +// SeekMode is specifies how a journal is read +type SeekMode uint8 + // Config stores the configuration of Journalbeat type Config struct { Inputs []*common.Config `config:"inputs"` RegistryFile string `config:"registry_file"` Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` - Seek string `config:"seek"` + Seek SeekMode `config:"seek"` Matches []string `config:"include_matches"` } +const ( + // SeekInvalid is an invalid value for seek + SeekInvalid SeekMode = iota + // SeekHead option seeks to the head of a journal + SeekHead + // SeekTail option seeks to the tail of a journal + SeekTail + // SeekCursor option seeks to the position specified in the cursor + SeekCursor + + seekHeadStr = "head" + seekTailStr = "tail" + seekCursorStr = "cursor" +) + // DefaultConfig are the defaults of a Journalbeat instance -var DefaultConfig = Config{ - RegistryFile: "registry", - Backoff: 1 * time.Second, - MaxBackoff: 60 * time.Second, - Seek: "cursor", +var ( + DefaultConfig = Config{ + RegistryFile: "registry", + Backoff: 1 * time.Second, + MaxBackoff: 60 * time.Second, + Seek: SeekCursor, + } + + seekModes = map[string]SeekMode{ + seekHeadStr: SeekHead, + seekTailStr: SeekTail, + seekCursorStr: SeekCursor, + } +) + +// Unpack validates and unpack "seek" config option +func (m *SeekMode) Unpack(value string) error { + mode, ok := seekModes[value] + if !ok { + return fmt.Errorf("invalid seek mode '%s'", value) + } + + *m = mode + + return nil } diff --git a/journalbeat/input/config.go b/journalbeat/input/config.go index 6383998bd1b0..04be530e1faf 100644 --- a/journalbeat/input/config.go +++ b/journalbeat/input/config.go @@ -18,9 +18,9 @@ package input import ( - "fmt" "time" + "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" ) @@ -37,7 +37,7 @@ type Config struct { // BackoffFactor is the multiplier of Backoff. MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` // Seek is the method to read from journals. - Seek string `config:"seek"` + Seek config.SeekMode `config:"seek"` // Matches store the key value pairs to match entries. Matches []string `config:"include_matches"` @@ -53,22 +53,6 @@ var ( Backoff: 1 * time.Second, BackoffFactor: 2, MaxBackoff: 60 * time.Second, - Seek: "cursor", + Seek: config.SeekCursor, } ) - -// Validate check the configuration of the input. -func (c *Config) Validate() error { - correctSeek := false - for _, s := range []string{"cursor", "head", "tail"} { - if c.Seek == s { - correctSeek = true - } - } - - if !correctSeek { - return fmt.Errorf("incorrect value for seek: %s. possible values: cursor, head, tail", c.Seek) - } - - return nil -} diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 42d8a0ea394d..094d169a4ca8 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -37,6 +37,7 @@ type Input struct { done chan struct{} config Config pipeline beat.Pipeline + client beat.Client states map[string]checkpoint.JournalState id uuid.UUID logger *logp.Logger @@ -120,7 +121,8 @@ func New( // Run connects to the output, collects entries from the readers // and then publishes the events. func (i *Input) Run() { - client, err := i.pipeline.ConnectWith(beat.ClientConfig{ + var err error + i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, EventMetadata: i.eventMeta, Meta: nil, @@ -133,13 +135,12 @@ func (i *Input) Run() { i.logger.Error("Error connecting to output: %v", err) return } - defer client.Close() - i.publishAll(client) + i.publishAll() } // publishAll reads events from all readers and publishes them. -func (i *Input) publishAll(client beat.Client) { +func (i *Input) publishAll() { out := make(chan *beat.Event) defer close(out) @@ -179,13 +180,14 @@ func (i *Input) publishAll(client beat.Client) { case <-i.done: return case e := <-out: - client.Publish(*e) + i.client.Publish(*e) } } } // Stop stops all readers of the input. func (i *Input) Stop() { + i.client.Close() for _, r := range i.readers { r.Close() } diff --git a/journalbeat/reader/config.go b/journalbeat/reader/config.go index b81005ec926e..7d52ff7422df 100644 --- a/journalbeat/reader/config.go +++ b/journalbeat/reader/config.go @@ -17,7 +17,11 @@ package reader -import "time" +import ( + "time" + + "github.com/elastic/beats/journalbeat/config" +) // Config stores the options of a reder. type Config struct { @@ -25,7 +29,7 @@ type Config struct { Path string // Seek specifies the seeking stategy. // Possible values: head, tail, cursor. - Seek string + Seek config.SeekMode // MaxBackoff is the limit of the backoff time. MaxBackoff time.Duration // Backoff is the current interval to wait before diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index f7afc30a4d0c..f2bc5ef3fa9d 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/journalbeat/checkpoint" "github.com/elastic/beats/journalbeat/cmd/instance" + "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -142,7 +143,8 @@ func setupMatches(j *sdjournal.Journal, matches []string) error { // seek seeks to the position determined by the coniguration and cursor state. func (r *Reader) seek(cursor string) { - if r.config.Seek == "cursor" { + switch r.config.Seek { + case config.SeekCursor: if cursor == "" { r.journal.SeekHead() r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the beginning") @@ -154,12 +156,14 @@ func (r *Reader) seek(cursor string) { r.logger.Error("Error while seeking to cursor") } r.logger.Debug("Seeked to position defined in cursor") - } else if r.config.Seek == "tail" { + case config.SeekTail: r.journal.SeekTail() r.logger.Debug("Tailing the journal file") - } else if r.config.Seek == "head" { + case config.SeekHead: r.journal.SeekHead() r.logger.Debug("Reading from the beginning of the journal file") + default: + r.logger.Error("Invalid seeking mode") } }