Skip to content

Commit

Permalink
Add batch_read_size config to Winlogbeat
Browse files Browse the repository at this point in the history
This configuration option allows users to control the number of event log records that are read, processed, and published in its event loop.
  • Loading branch information
andrewkroh committed Sep 26, 2016
1 parent f1af264 commit 0fb5ce7
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ winlogbeat.event_logs:
- name: Application
--------------------------------------------------------------------------------

===== event_logs.batch_read_size

The maximum number of event log records to read from the Windows API in a single
batch. The default batch size is 100. *{vista_and_newer}*

Winlogbeat starts a goroutine (a lightweight thread) to read from each
individual event log. The goroutine reads a batch of event log records using the
Windows API, applies any processors to the events, publishes them to the
configured outputs, and waits for an acknowledgement from the outputs before
reading additional event log records.

[[configuration-winlogbeat-options-event_logs-name]]
===== event_logs.name

Expand Down
25 changes: 13 additions & 12 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
)

const (
// defaultMaxNumRead is the maximum number of event Read will return.
defaultMaxNumRead = 100
// defaultBatchReadSize is the maximum number of event Read will return.
defaultBatchReadSize = 100

// renderBufferSize is the size in bytes of the buffer used to render events.
renderBufferSize = 1 << 14
Expand All @@ -28,15 +28,16 @@ const (
winEventLogAPIName = "wineventlog"
)

var winEventLogConfigKeys = append(commonConfigKeys, "ignore_older", "include_xml",
"event_id", "forwarded", "level", "provider")
var winEventLogConfigKeys = append(commonConfigKeys, "batch_read_size",
"ignore_older", "include_xml", "event_id", "forwarded", "level", "provider")

type winEventLogConfig struct {
ConfigCommon `config:",inline"`
IncludeXML bool `config:"include_xml"`
Forwarded *bool `config:"forwarded"`
SimpleQuery query `config:",inline"`
Raw map[string]interface{} `config:",inline"`
ConfigCommon `config:",inline"`
BatchReadSize int `config:"batch_read_size"`
IncludeXML bool `config:"include_xml"`
Forwarded *bool `config:"forwarded"`
SimpleQuery query `config:",inline"`
Raw map[string]interface{} `config:",inline"`
}

// query contains parameters used to customize the event log data that is
Expand Down Expand Up @@ -121,7 +122,7 @@ func (l *winEventLog) Read() ([]Record, error) {
return nil, nil
}
if err != nil {
logp.Warn("%s EventHandles returned error %v Errno: %d", l.logPrefix, err)
logp.Warn("%s EventHandles returned error %v", l.logPrefix, err)
return nil, err
}
defer func() {
Expand Down Expand Up @@ -218,7 +219,7 @@ func reportDrop(reason interface{}) {
// newWinEventLog creates and returns a new EventLog for reading event logs
// using the Windows Event Log.
func newWinEventLog(options map[string]interface{}) (EventLog, error) {
var c winEventLogConfig
c := winEventLogConfig{BatchReadSize: defaultBatchReadSize}
if err := readConfig(options, &c, winEventLogConfigKeys); err != nil {
return nil, err
}
Expand Down Expand Up @@ -254,7 +255,7 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) {
config: c,
query: query,
channelName: c.Name,
maxRead: defaultMaxNumRead,
maxRead: c.BatchReadSize,
renderBuf: make([]byte, renderBufferSize),
cache: newMessageFilesCache(c.Name, eventMetadataHandle, freeHandle),
logPrefix: fmt.Sprintf("WinEventLog[%s]", c.Name),
Expand Down
54 changes: 54 additions & 0 deletions winlogbeat/eventlog/wineventlog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// +build windows

package eventlog

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestWinEventLogBatchReadSize(t *testing.T) {
configureLogp()
log, err := initLog(providerName, sourceName, eventCreateMsgFile)
if err != nil {
t.Fatal(err)
}
defer func() {
err := uninstallLog(providerName, sourceName, log)
if err != nil {
t.Fatal(err)
}
}()

// Publish test messages:
for k, m := range messages {
err = log.Report(m.eventType, k, []string{m.message})
if err != nil {
t.Fatal(err)
}
}

batchReadSize := 2
eventlog, err := newWinEventLog(map[string]interface{}{"name": providerName, "batch_read_size": batchReadSize})
if err != nil {
t.Fatal(err)
}
err = eventlog.Open(0)
if err != nil {
t.Fatal(err)
}
defer func() {
err := eventlog.Close()
if err != nil {
t.Fatal(err)
}
}()

records, err := eventlog.Read()
if err != nil {
t.Fatal(err)
}

assert.Len(t, records, batchReadSize)
}

0 comments on commit 0fb5ce7

Please sign in to comment.