Skip to content

Commit

Permalink
Add missing journalbeat non breaking fixes (elastic#9106)
Browse files Browse the repository at this point in the history
Backported from master:

* refactoring of `SeekMode`
* journalbeat can be stopped when no output is available
* add dashboard

++ add deprecation warnings for options I want to remove in 7.0
  • Loading branch information
kvch authored Nov 15, 2018
1 parent a4320b0 commit f7638a5
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 38 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ https://github.com/elastic/beats/compare/v6.5.0...6.5[Check the HEAD diff]

*Journalbeat*

- Add missing journalbeat non breaking fixes. {pull}9106[9106]

*Metricbeat*

- Fix race condition when enriching events with kubernetes metadata. {issue}9055[9055] {issue}9067[9067]
Expand Down
169 changes: 169 additions & 0 deletions journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
{
"objects": [
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "process.name:systemd"
},
"version": true
}
},
"sort": [
"@timestamp",
"desc"
],
"title": "[Journalbeat] Systemd messages",
"version": 1
},
"id": "aa003e90-e2b9-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:19:28.377Z",
"version": 1
},
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"journald.kernel.subsystem",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "syslog.facility:0 AND syslog.priority:\u003c4"
},
"version": true
}
},
"sort": [
"_score",
"desc"
],
"title": "[Journalbeat] Kernel errors",
"version": 1
},
"id": "5db75310-e2ba-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:24:29.889Z",
"version": 1
},
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"process.name",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "syslog.facility:4"
},
"version": true
}
},
"sort": [
"_score",
"desc"
],
"title": "[Journalbeat] Login authorization",
"version": 1
},
"id": "82408120-e2ba-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:26:05.348Z",
"version": 2
},
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"journald.kernel.subsystem",
"journald.kernel.device_node_path",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "journald.kernel.subsystem:usb OR journald.kernel.subsystem:hid"
},
"version": true
}
},
"sort": [
"_score",
"desc"
],
"title": "[Journalbeat] USB and HID messages",
"version": 1
},
"id": "f0232670-e2ba-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:28:35.543Z",
"version": 1
},
{
"attributes": {
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"query": {
"language": "lucene",
"query": ""
}
}
},
"optionsJSON": {
"darkTheme": false,
"hidePanelTitles": false,
"useMargins": true
},
"panelsJSON": null,
"timeRestore": false,
"title": "[Journalbeat] Overview",
"version": 1
},
"id": "f2de4440-e2b9-11e8-9f52-734e93de180d",
"type": "dashboard",
"updated_at": "2018-11-07T18:30:18.083Z",
"version": 2
}
],
"version": "7.0.0-alpha1-SNAPSHOT"
}
22 changes: 19 additions & 3 deletions journalbeat/beater/journalbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import (
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/journalbeat/config"
conf "github.com/elastic/beats/journalbeat/config"
)

// Journalbeat instance
type Journalbeat struct {
inputs []*input.Input
done chan struct{}
config config.Config
config conf.Config

pipeline beat.Pipeline
checkpoint *checkpoint.Checkpoint
Expand All @@ -48,7 +48,23 @@ type Journalbeat struct {
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
cfgwarn.Experimental("Journalbeat is experimental.")

config := config.DefaultConfig
if cfg.HasField("seek") {
cfgwarn.Deprecate("7.0.0", "global seek is deprecated, Use seek on input level instead.")
}

if cfg.HasField("backoff") {
cfgwarn.Deprecate("7.0.0", "global backoff is deprecated, Use backoff on input level instead.")
}

if cfg.HasField("max_backoff") {
cfgwarn.Deprecate("7.0.0", "global max_backoff is deprecated, Use max_backoff on input level instead.")
}

if cfg.HasField("include_matches") {
cfgwarn.Deprecate("7.0.0", "global include_matches is deprecated, Use include_matches on input level instead.")
}

config := conf.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("error reading config file: %v", err)
}
Expand Down
51 changes: 45 additions & 6 deletions journalbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,64 @@
package config

import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/common"
)

// SeekMode is specifies how a journal is read
type SeekMode uint8

// Config stores the configuration of Journalbeat
type Config struct {
Inputs []*common.Config `config:"inputs"`
RegistryFile string `config:"registry_file"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
Seek string `config:"seek"`
Seek SeekMode `config:"seek"`
Matches []string `config:"include_matches"`
}

const (
// SeekInvalid is an invalid value for seek
SeekInvalid SeekMode = iota
// SeekHead option seeks to the head of a journal
SeekHead
// SeekTail option seeks to the tail of a journal
SeekTail
// SeekCursor option seeks to the position specified in the cursor
SeekCursor

seekHeadStr = "head"
seekTailStr = "tail"
seekCursorStr = "cursor"
)

// DefaultConfig are the defaults of a Journalbeat instance
var DefaultConfig = Config{
RegistryFile: "registry",
Backoff: 1 * time.Second,
MaxBackoff: 60 * time.Second,
Seek: "cursor",
var (
DefaultConfig = Config{
RegistryFile: "registry",
Backoff: 1 * time.Second,
MaxBackoff: 60 * time.Second,
Seek: SeekCursor,
}

seekModes = map[string]SeekMode{
seekHeadStr: SeekHead,
seekTailStr: SeekTail,
seekCursorStr: SeekCursor,
}
)

// Unpack validates and unpack "seek" config option
func (m *SeekMode) Unpack(value string) error {
mode, ok := seekModes[value]
if !ok {
return fmt.Errorf("invalid seek mode '%s'", value)
}

*m = mode

return nil
}
22 changes: 3 additions & 19 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package input

import (
"fmt"
"time"

"github.com/elastic/beats/journalbeat/config"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)
Expand All @@ -37,7 +37,7 @@ type Config struct {
// BackoffFactor is the multiplier of Backoff.
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
// Seek is the method to read from journals.
Seek string `config:"seek"`
Seek config.SeekMode `config:"seek"`
// Matches store the key value pairs to match entries.
Matches []string `config:"include_matches"`

Expand All @@ -53,22 +53,6 @@ var (
Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 60 * time.Second,
Seek: "cursor",
Seek: config.SeekCursor,
}
)

// Validate check the configuration of the input.
func (c *Config) Validate() error {
correctSeek := false
for _, s := range []string{"cursor", "head", "tail"} {
if c.Seek == s {
correctSeek = true
}
}

if !correctSeek {
return fmt.Errorf("incorrect value for seek: %s. possible values: cursor, head, tail", c.Seek)
}

return nil
}
12 changes: 7 additions & 5 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Input struct {
done chan struct{}
config Config
pipeline beat.Pipeline
client beat.Client
states map[string]checkpoint.JournalState
id uuid.UUID
logger *logp.Logger
Expand Down Expand Up @@ -120,7 +121,8 @@ func New(
// Run connects to the output, collects entries from the readers
// and then publishes the events.
func (i *Input) Run() {
client, err := i.pipeline.ConnectWith(beat.ClientConfig{
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: i.eventMeta,
Meta: nil,
Expand All @@ -133,13 +135,12 @@ func (i *Input) Run() {
i.logger.Error("Error connecting to output: %v", err)
return
}
defer client.Close()

i.publishAll(client)
i.publishAll()
}

// publishAll reads events from all readers and publishes them.
func (i *Input) publishAll(client beat.Client) {
func (i *Input) publishAll() {
out := make(chan *beat.Event)
defer close(out)

Expand Down Expand Up @@ -179,13 +180,14 @@ func (i *Input) publishAll(client beat.Client) {
case <-i.done:
return
case e := <-out:
client.Publish(*e)
i.client.Publish(*e)
}
}
}

// Stop stops all readers of the input.
func (i *Input) Stop() {
i.client.Close()
for _, r := range i.readers {
r.Close()
}
Expand Down
Loading

0 comments on commit f7638a5

Please sign in to comment.