Skip to content

Commit

Permalink
Config management improvements in filebeat
Browse files Browse the repository at this point in the history
* Code cleanup
* Add humanize for buffersize
  • Loading branch information
ruflin committed Jun 27, 2016
1 parent f2c36ee commit b7aad0b
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 34 deletions.
5 changes: 4 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ type Filebeat struct {

// New creates a new Filebeat pointer instance.
func New() *Filebeat {
return &Filebeat{}
config := cfg.DefaultConfig
return &Filebeat{
config: &config,
}
}

// Config setups up the filebeat configuration by fetch all additional config files
Expand Down
19 changes: 13 additions & 6 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,17 @@ import (

// Defaults for config variables which are not set
const (
DefaultRegistryFile string = "registry"
DefaultSpoolSize uint64 = 2048
DefaultIdleTimeout time.Duration = 5 * time.Second
DefaultInputType = "log"
DefaultInputType = "log"
)

var (
DefaultConfig = Config{
FilebeatConfig{
RegistryFile: "registry",
SpoolSize: 2048,
IdleTimeout: 5 * time.Second,
},
}
)

type Config struct {
Expand All @@ -26,9 +33,9 @@ type Config struct {

type FilebeatConfig struct {
Prospectors []*common.Config `config:"prospectors"`
SpoolSize uint64 `config:"spool_size"`
SpoolSize uint64 `config:"spool_size" validate:"min=1"`
PublishAsync bool `config:"publish_async"`
IdleTimeout time.Duration `config:"idle_timeout"`
IdleTimeout time.Duration `config:"idle_timeout" validate:"nonzero,min=0s"`
RegistryFile string `config:"registry_file"`
ConfigDir string `config:"config_dir"`
}
Expand Down
4 changes: 3 additions & 1 deletion filebeat/harvester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/processor"
"github.com/elastic/beats/libbeat/common"

"github.com/dustin/go-humanize"
)

var (
defaultConfig = harvesterConfig{
BufferSize: 16 << 10, // 16384
BufferSize: 16 * humanize.KiByte,
DocumentType: "log",
InputType: cfg.DefaultInputType,
TailFiles: false,
Expand Down
2 changes: 1 addition & 1 deletion filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (h *Harvester) openFile() (encoding.Encoding, error) {
f, err := file.ReadOpen(h.Path)
if err == nil {
// Check we are not following a rabbit hole (symlinks, etc.)
if !file.IsRegularFile(f) {
if !file.IsRegular(f) {
return nil, errors.New("Given file is not a regular file.")
}

Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type File struct {
}

// Check that the file isn't a symlink, mode is regular or file is nil
func (f *File) IsRegularFile() bool {
func (f *File) IsRegular() bool {
if f.File == nil {
logp.Critical("Harvester: BUG: f arg is nil")
return false
Expand Down Expand Up @@ -50,7 +50,7 @@ func IsSameFile(path string, info os.FileInfo) bool {
return os.SameFile(fileInfo, info)
}

func IsRegularFile(file *os.File) bool {
func IsRegular(file *os.File) bool {
f := &File{File: file}
return f.IsRegularFile()
return f.IsRegular()
}
5 changes: 0 additions & 5 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ func New(registryFile string) (*Registrar, error) {
// Init sets up the Registrar and make sure the registry file is setup correctly
func (r *Registrar) Init() error {

// Set to default in case it is not set
if r.registryFile == "" {
r.registryFile = cfg.DefaultRegistryFile
}

// The registry file is opened in the data path
r.registryFile = paths.Resolve(paths.Data, r.registryFile)

Expand Down
14 changes: 2 additions & 12 deletions filebeat/spooler/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,13 @@ func New(
publisher chan<- []*input.FileEvent,
) (*Spooler, error) {
spoolSize := config.SpoolSize
if spoolSize <= 0 {
spoolSize = cfg.DefaultSpoolSize
debugf("Spooler will use the default spool_size of %d", spoolSize)
}

idleTimeout := config.IdleTimeout
if idleTimeout <= 0 {
idleTimeout = cfg.DefaultIdleTimeout
debugf("Spooler will use the default idle_timeout of %s", idleTimeout)
}

return &Spooler{
Channel: make(chan *input.FileEvent, channelSize),
idleTimeout: idleTimeout,
idleTimeout: config.IdleTimeout,
spoolSize: spoolSize,
exit: make(chan struct{}),
nextFlushTime: time.Now().Add(idleTimeout),
nextFlushTime: time.Now().Add(config.IdleTimeout),
publisher: publisher,
spool: make([]*input.FileEvent, 0, spoolSize),
}, nil
Expand Down
10 changes: 5 additions & 5 deletions filebeat/spooler/spooler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ func load(t *testing.T, in string) cfg.FilebeatConfig {
t.Fatalf("Failed to parse config input: %v", err)
}

var config cfg.FilebeatConfig
config := cfg.DefaultConfig
err = yaml.Unpack(&config)
if err != nil {
t.Fatalf("Failed to unpack config: %v", err)
}

return config
return config.Filebeat
}

func TestNewSpoolerDefaultConfig(t *testing.T) {
Expand All @@ -33,8 +33,8 @@ func TestNewSpoolerDefaultConfig(t *testing.T) {
spooler, err := New(config, nil)

assert.NoError(t, err)
assert.Equal(t, cfg.DefaultSpoolSize, spooler.spoolSize)
assert.Equal(t, cfg.DefaultIdleTimeout, spooler.idleTimeout)
assert.Equal(t, cfg.DefaultConfig.Filebeat.SpoolSize, spooler.spoolSize)
assert.Equal(t, cfg.DefaultConfig.Filebeat.IdleTimeout, spooler.idleTimeout)
}

func TestNewSpoolerSpoolSize(t *testing.T) {
Expand All @@ -47,7 +47,7 @@ func TestNewSpoolerSpoolSize(t *testing.T) {
}

func TestNewSpoolerIdleTimeout(t *testing.T) {
config := load(t, "idle_timeout: 10s")
config := load(t, "filebeat.idle_timeout: 10s")
spooler, err := New(config, nil)

assert.NoError(t, err)
Expand Down

0 comments on commit b7aad0b

Please sign in to comment.