Skip to content

Commit

Permalink
Winlogbeat: Use bookmarks to persist last published event (elastic#6150)
Browse files Browse the repository at this point in the history
*  Winlogbeat: Use bookmarks to persist last published event

When using the Windows Event Log API, a bookmark is used to persist the
last position read in the stream. This is a first step to support XML
queries (potentially involving multiple channels). Also fixes a problem
when using Forwarded Events.

* Winlogbeat: Fix tests on computers belonging to a domain

The hostname reported by beats is fully-qualified wheter platform.node()
might only include the host name.

* Add system test to validate resuming from registry
  • Loading branch information
adriansr authored and andrewkroh committed Jan 30, 2018
1 parent d9b1237 commit e3f055f
Show file tree
Hide file tree
Showing 13 changed files with 183 additions and 52 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di

*Winlogbeat*

- Use bookmarks to persist the last published event. {pull}6150[6150]

==== Deprecated

*Affecting all Beats*
Expand Down
2 changes: 1 addition & 1 deletion winlogbeat/beater/eventlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (e *eventLogger) run(
client.Close()
}()

err = api.Open(state.RecordNumber)
err = api.Open(state)
if err != nil {
logp.Warn("EventLog[%s] Open() error. No events will be read from "+
"this source. %v", api.Name(), err)
Expand Down
4 changes: 3 additions & 1 deletion winlogbeat/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type EventLogState struct {
Name string `yaml:"name"`
RecordNumber uint64 `yaml:"record_number"`
Timestamp time.Time `yaml:"timestamp"`
Bookmark string `yaml:"bookmark,omitempty"`
}

// NewCheckpoint creates and returns a new Checkpoint. This method loads state
Expand Down Expand Up @@ -156,11 +157,12 @@ func (c *Checkpoint) States() map[string]EventLogState {
}

// Persist queues the given event log state information to be written to disk.
func (c *Checkpoint) Persist(name string, recordNumber uint64, ts time.Time) {
func (c *Checkpoint) Persist(name string, recordNumber uint64, ts time.Time, bookmark string) {
c.PersistState(EventLogState{
Name: name,
RecordNumber: recordNumber,
Timestamp: ts,
Bookmark: bookmark,
})
}

Expand Down
6 changes: 3 additions & 3 deletions winlogbeat/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestWriteMaxUpdates(t *testing.T) {
defer cp.Shutdown()

// Send update - it's not written to disk but it's in memory.
cp.Persist("App", 1, time.Now())
cp.Persist("App", 1, time.Now(), "")
time.Sleep(500 * time.Millisecond)
_, found := cp.States()["App"]
assert.True(t, found)
Expand All @@ -50,7 +50,7 @@ func TestWriteMaxUpdates(t *testing.T) {
assert.Len(t, ps.States, 0)

// Send update - it is written to disk.
cp.Persist("App", 2, time.Now())
cp.Persist("App", 2, time.Now(), "")
time.Sleep(750 * time.Millisecond)
ps, err = cp.read()
if err != nil {
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestWriteTimedFlush(t *testing.T) {

// Send update then wait longer than the flush interval and it should be
// on disk.
cp.Persist("App", 1, time.Now())
cp.Persist("App", 1, time.Now(), "")
time.Sleep(1500 * time.Millisecond)
ps, err := cp.read()
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion winlogbeat/eventlog/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/winlogbeat/checkpoint"
)

type factory func(*common.Config) (EventLog, error)
Expand All @@ -25,6 +26,8 @@ func newTestEventLog(t *testing.T, factory factory, options map[string]interface

func setupEventLog(t *testing.T, factory factory, recordID uint64, options map[string]interface{}) (EventLog, teardown) {
eventLog := newTestEventLog(t, factory, options)
fatalErr(t, eventLog.Open(recordID))
fatalErr(t, eventLog.Open(checkpoint.EventLogState{
RecordNumber: recordID,
}))
return eventLog, func() { fatalErr(t, eventLog.Close()) }
}
19 changes: 8 additions & 11 deletions winlogbeat/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ var (

// EventLog is an interface to a Windows Event Log.
type EventLog interface {
// Open the event log. recordNumber is the last successfully read event log
// record number. Read will resume from recordNumber + 1. To start reading
// from the first event specify a recordNumber of 0.
Open(recordNumber uint64) error
// Open the event log. state points to the last successfully read event
// in this event log. Read will resume from the next record. To start reading
// from the first event specify a zero-valued EventLogState.
Open(state checkpoint.EventLogState) error

// Read records from the event log.
Read() ([]Record, error)
Expand All @@ -56,8 +56,9 @@ type EventLog interface {
// Record represents a single event from the log.
type Record struct {
sys.Event
API string // The event log API type used to read the record.
XML string // XML representation of the event.
API string // The event log API type used to read the record.
XML string // XML representation of the event.
Offset checkpoint.EventLogState // Position of the record within its source stream.
}

// ToMapStr returns a new MapStr containing the data from this Record.
Expand Down Expand Up @@ -112,11 +113,7 @@ func (e Record) ToEvent() beat.Event {
return beat.Event{
Timestamp: e.TimeCreated.SystemTime,
Fields: m,
Private: checkpoint.EventLogState{
Name: e.Channel,
RecordNumber: e.RecordID,
Timestamp: e.TimeCreated.SystemTime,
},
Private: e.Offset,
}
}

Expand Down
17 changes: 13 additions & 4 deletions winlogbeat/eventlog/eventlogging.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/winlogbeat/checkpoint"
"github.com/elastic/beats/winlogbeat/sys"
win "github.com/elastic/beats/winlogbeat/sys/eventlogging"
)
Expand Down Expand Up @@ -76,9 +77,9 @@ func (l eventLogging) Name() string {
return l.name
}

func (l *eventLogging) Open(recordNumber uint64) error {
func (l *eventLogging) Open(state checkpoint.EventLogState) error {
detailf("%s Open(recordNumber=%d) calling OpenEventLog(uncServerPath=, "+
"providerName=%s)", l.logPrefix, recordNumber, l.name)
"providerName=%s)", l.logPrefix, state.RecordNumber, l.name)
handle, err := win.OpenEventLog("", l.name)
if err != nil {
return err
Expand All @@ -91,7 +92,7 @@ func (l *eventLogging) Open(recordNumber uint64) error {

var oldestRecord, newestRecord uint32
if numRecords > 0 {
l.recordNumber = uint32(recordNumber)
l.recordNumber = uint32(state.RecordNumber)
l.seek = true
l.ignoreFirst = true

Expand Down Expand Up @@ -169,6 +170,11 @@ func (l *eventLogging) Read() ([]Record, error) {
records = append(records, Record{
API: eventLoggingAPIName,
Event: e,
Offset: checkpoint.EventLogState{
Name: l.name,
RecordNumber: e.RecordID,
Timestamp: e.TimeCreated.SystemTime,
},
})
}

Expand Down Expand Up @@ -208,7 +214,10 @@ func (l *eventLogging) readRetryErrorHandler(err error) error {

if reopen {
l.Close()
return l.Open(uint64(l.recordNumber))
return l.Open(checkpoint.EventLogState{
Name: l.name,
RecordNumber: uint64(l.recordNumber),
})
}
}
return err
Expand Down
3 changes: 2 additions & 1 deletion winlogbeat/eventlog/eventlogging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/winlogbeat/checkpoint"
"github.com/elastic/beats/winlogbeat/sys/eventlogging"
)

Expand Down Expand Up @@ -389,7 +390,7 @@ func TestOpenInvalidProvider(t *testing.T) {
configureLogp()

el := newTestEventLogging(t, map[string]interface{}{"name": "nonExistentProvider"})
assert.NoError(t, el.Open(0), "Calling Open() on an unknown provider "+
assert.NoError(t, el.Open(checkpoint.EventLogState{}), "Calling Open() on an unknown provider "+
"should automatically open Application.")
_, err := el.Read()
assert.NoError(t, err)
Expand Down
41 changes: 34 additions & 7 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/winlogbeat/checkpoint"
"github.com/elastic/beats/winlogbeat/sys"
win "github.com/elastic/beats/winlogbeat/sys/wineventlog"
)
Expand Down Expand Up @@ -71,10 +72,10 @@ var _ EventLog = &winEventLog{}
type winEventLog struct {
config winEventLogConfig
query string
channelName string // Name of the channel from which to read.
subscription win.EvtHandle // Handle to the subscription.
maxRead int // Maximum number returned in one Read.
lastRead uint64 // Record number of the last read event.
channelName string // Name of the channel from which to read.
subscription win.EvtHandle // Handle to the subscription.
maxRead int // Maximum number returned in one Read.
lastRead checkpoint.EventLogState // Record number of the last read event.

render func(event win.EvtHandle, out io.Writer) error // Function for rendering the event to XML.
renderBuf []byte // Buffer used for rendering event.
Expand All @@ -89,8 +90,14 @@ func (l *winEventLog) Name() string {
return l.channelName
}

func (l *winEventLog) Open(recordNumber uint64) error {
bookmark, err := win.CreateBookmark(l.channelName, recordNumber)
func (l *winEventLog) Open(state checkpoint.EventLogState) error {
var bookmark win.EvtHandle
var err error
if len(state.Bookmark) > 0 {
bookmark, err = win.CreateBookmarkFromXML(state.Bookmark)
} else {
bookmark, err = win.CreateBookmarkFromRecordID(l.channelName, state.RecordNumber)
}
if err != nil {
return err
}
Expand Down Expand Up @@ -154,8 +161,17 @@ func (l *winEventLog) Read() ([]Record, error) {
incrementMetric(dropReasons, err)
continue
}

r.Offset = checkpoint.EventLogState{
Name: l.channelName,
RecordNumber: r.RecordID,
Timestamp: r.TimeCreated.SystemTime,
}
if r.Offset.Bookmark, err = l.createBookmarkFromEvent(h); err != nil {
logp.Warn("%s failed creating bookmark: %v", l.logPrefix, err)
}
records = append(records, r)
l.lastRead = r.RecordID
l.lastRead = r.Offset
}

debugf("%s Read() is returning %d records", l.logPrefix, len(records))
Expand Down Expand Up @@ -300,6 +316,17 @@ func newWinEventLog(options *common.Config) (EventLog, error) {
return l, nil
}

func (l *winEventLog) createBookmarkFromEvent(evtHandle win.EvtHandle) (string, error) {
bmHandle, err := win.CreateBookmarkFromEvent(evtHandle)
if err != nil {
return "", err
}
l.outputBuf.Reset()
err = win.RenderBookmarkXML(bmHandle, l.renderBuf, l.outputBuf)
win.Close(bmHandle)
return string(l.outputBuf.Bytes()), err
}

func init() {
// Register wineventlog API if it is available.
available, _ := win.IsAvailable()
Expand Down
70 changes: 52 additions & 18 deletions winlogbeat/sys/wineventlog/wineventlog_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,27 +198,18 @@ func RenderEvent(
// XML will not include the message, and in this case RenderEvent should be
// used.
func RenderEventXML(eventHandle EvtHandle, renderBuf []byte, out io.Writer) error {
var bufferUsed, propertyCount uint32
err := _EvtRender(0, eventHandle, EvtRenderEventXml, uint32(len(renderBuf)),
&renderBuf[0], &bufferUsed, &propertyCount)
if err == ERROR_INSUFFICIENT_BUFFER {
return sys.InsufficientBufferError{err, int(bufferUsed)}
}
if err != nil {
return err
}
return renderXML(eventHandle, EvtRenderEventXml, renderBuf, out)
}

if int(bufferUsed) > len(renderBuf) {
return fmt.Errorf("Windows EvtRender reported that wrote %d bytes "+
"to the buffer, but the buffer can only hold %d bytes",
bufferUsed, len(renderBuf))
}
return sys.UTF16ToUTF8Bytes(renderBuf[:bufferUsed], out)
// RenderBookmarkXML renders a bookmark as XML.
func RenderBookmarkXML(bookmarkHandle EvtHandle, renderBuf []byte, out io.Writer) error {
return renderXML(bookmarkHandle, EvtRenderBookmark, renderBuf, out)
}

// CreateBookmark creates a new handle to a bookmark. Close must be called on
// returned EvtHandle when finished with the handle.
func CreateBookmark(channel string, recordID uint64) (EvtHandle, error) {
// CreateBookmarkFromRecordID creates a new bookmark pointing to the given recordID
// within the supplied channel. Close must be called on returned EvtHandle when
// finished with the handle.
func CreateBookmarkFromRecordID(channel string, recordID uint64) (EvtHandle, error) {
xml := fmt.Sprintf(bookmarkTemplate, channel, recordID)
p, err := syscall.UTF16PtrFromString(xml)
if err != nil {
Expand All @@ -233,6 +224,30 @@ func CreateBookmark(channel string, recordID uint64) (EvtHandle, error) {
return h, nil
}

// CreateBookmarkFromEvent creates a new bookmark pointing to the given event.
// Close must be called on returned EvtHandle when finished with the handle.
func CreateBookmarkFromEvent(handle EvtHandle) (EvtHandle, error) {
h, err := _EvtCreateBookmark(nil)
if err != nil {
return 0, err
}
if err = _EvtUpdateBookmark(h, handle); err != nil {
return 0, err
}
return h, nil
}

// CreateBookmarkFromXML creates a new bookmark from the serialised representation
// of an existing bookmark. Close must be called on returned EvtHandle when
// finished with the handle.
func CreateBookmarkFromXML(bookmarkXML string) (EvtHandle, error) {
xml, err := syscall.UTF16PtrFromString(bookmarkXML)
if err != nil {
return 0, err
}
return _EvtCreateBookmark(xml)
}

// CreateRenderContext creates a render context. Close must be called on
// returned EvtHandle when finished with the handle.
func CreateRenderContext(valuePaths []string, flag EvtRenderContextFlag) (EvtHandle, error) {
Expand Down Expand Up @@ -412,3 +427,22 @@ func evtRenderProviderName(renderBuf []byte, eventHandle EvtHandle) (string, err
reader := bytes.NewReader(renderBuf)
return readString(renderBuf, reader)
}

func renderXML(eventHandle EvtHandle, flag EvtRenderFlag, renderBuf []byte, out io.Writer) error {
var bufferUsed, propertyCount uint32
err := _EvtRender(0, eventHandle, flag, uint32(len(renderBuf)),
&renderBuf[0], &bufferUsed, &propertyCount)
if err == ERROR_INSUFFICIENT_BUFFER {
return sys.InsufficientBufferError{err, int(bufferUsed)}
}
if err != nil {
return err
}

if int(bufferUsed) > len(renderBuf) {
return fmt.Errorf("Windows EvtRender reported that wrote %d bytes "+
"to the buffer, but the buffer can only hold %d bytes",
bufferUsed, len(renderBuf))
}
return sys.UTF16ToUTF8Bytes(renderBuf[:bufferUsed], out)
}
Loading

0 comments on commit e3f055f

Please sign in to comment.