Skip to content

Commit

Permalink
Revert "Minor Journalbeat fixes and additions (elastic#8973) (elastic…
Browse files Browse the repository at this point in the history
…#9006)" (elastic#9011)

This reverts commit e1c3e0e.
  • Loading branch information
kvch authored Nov 9, 2018
1 parent e1c3e0e commit ff5b9b3
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 250 deletions.
14 changes: 13 additions & 1 deletion journalbeat/_meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ journalbeat.inputs:
# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 20s
#max_backoff: 60s

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
Expand All @@ -42,3 +42,15 @@ journalbeat.inputs:
# Name of the registry file. If a relative path is used, it is considered relative to the
# data path.
#registry_file: registry

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 60s

# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []
169 changes: 0 additions & 169 deletions journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json

This file was deleted.

3 changes: 0 additions & 3 deletions journalbeat/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
)

// Checkpoint persists event log state information to disk.
Expand Down Expand Up @@ -88,8 +87,6 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
save: make(chan JournalState, 1),
}

c.file = paths.Resolve(paths.Data, c.file)

// Minimum batch size.
if c.maxUpdates < 1 {
c.maxUpdates = 1
Expand Down
53 changes: 11 additions & 42 deletions journalbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,25 @@
package config

import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/common"
)

// SeekMode is specifies how a journal is read
type SeekMode uint8

// Config stores the configuration of Journalbeat
type Config struct {
Inputs []*common.Config `config:"inputs"`
RegistryFile string `config:"registry_file"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
Seek string `config:"seek"`
Matches []string `config:"include_matches"`
}

const (
// SeekInvalid is an invalid value for seek
SeekInvalid SeekMode = iota
// SeekHead option seeks to the head of a journal
SeekHead
// SeekTail option seeks to the tail of a journal
SeekTail
// SeekCursor option seeks to the position specified in the cursor
SeekCursor

seekHeadStr = "head"
seekTailStr = "tail"
seekCursorStr = "cursor"
)

var (
// DefaultConfig are the defaults of a Journalbeat instance
DefaultConfig = Config{
RegistryFile: "registry",
}

seekModes = map[string]SeekMode{
seekHeadStr: SeekHead,
seekTailStr: SeekTail,
seekCursorStr: SeekCursor,
}
)

// Unpack validates and unpack "seek" config option
func (m *SeekMode) Unpack(value string) error {
mode, ok := seekModes[value]
if !ok {
return fmt.Errorf("invalid seek mode '%s'", value)
}

*m = mode

return nil
// DefaultConfig are the defaults of a Journalbeat instance
var DefaultConfig = Config{
RegistryFile: "registry",
Backoff: 1 * time.Second,
MaxBackoff: 60 * time.Second,
Seek: "cursor",
}
33 changes: 26 additions & 7 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package input

import (
"fmt"
"time"

"github.com/elastic/beats/journalbeat/config"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)
Expand All @@ -29,13 +29,15 @@ import (
type Config struct {
// Paths stores the paths to the journal files to be read.
Paths []string `config:"paths"`
// MaxBackoff is the limit of the backoff time.
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
// Backoff is the current interval to wait before
// attemting to read again from the journal.
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
// MaxBackoff is the limit of the backoff time.
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
// BackoffFactor is the multiplier of Backoff.
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
// Seek is the method to read from journals.
Seek config.SeekMode `config:"seek"`
Seek string `config:"seek"`
// Matches store the key value pairs to match entries.
Matches []string `config:"include_matches"`

Expand All @@ -48,8 +50,25 @@ type Config struct {
var (
// DefaultConfig is the defaults for an inputs
DefaultConfig = Config{
Backoff: 1 * time.Second,
MaxBackoff: 20 * time.Second,
Seek: config.SeekCursor,
Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 60 * time.Second,
Seek: "cursor",
}
)

// Validate check the configuration of the input.
func (c *Config) Validate() error {
correctSeek := false
for _, s := range []string{"cursor", "head", "tail"} {
if c.Seek == s {
correctSeek = true
}
}

if !correctSeek {
return fmt.Errorf("incorrect value for seek: %s. possible values: cursor, head, tail", c.Seek)
}

return nil
}
12 changes: 5 additions & 7 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type Input struct {
done chan struct{}
config Config
pipeline beat.Pipeline
client beat.Client
states map[string]checkpoint.JournalState
id uuid.UUID
logger *logp.Logger
Expand Down Expand Up @@ -121,8 +120,7 @@ func New(
// Run connects to the output, collects entries from the readers
// and then publishes the events.
func (i *Input) Run() {
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
client, err := i.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: i.eventMeta,
Meta: nil,
Expand All @@ -135,12 +133,13 @@ func (i *Input) Run() {
i.logger.Error("Error connecting to output: %v", err)
return
}
defer client.Close()

i.publishAll()
i.publishAll(client)
}

// publishAll reads events from all readers and publishes them.
func (i *Input) publishAll() {
func (i *Input) publishAll(client beat.Client) {
out := make(chan *beat.Event)
defer close(out)

Expand Down Expand Up @@ -180,14 +179,13 @@ func (i *Input) publishAll() {
case <-i.done:
return
case e := <-out:
i.client.Publish(*e)
client.Publish(*e)
}
}
}

// Stop stops all readers of the input.
func (i *Input) Stop() {
i.client.Close()
for _, r := range i.readers {
r.Close()
}
Expand Down
Loading

0 comments on commit ff5b9b3

Please sign in to comment.