Skip to content

Commit

Permalink
Fix for errno 1734 when calling EvtNext
Browse files Browse the repository at this point in the history
When reading a batch of large event log records the Windows function
EvtNext returns errno 1734 (0x6C6) which is RPC_S_INVALID_BOUND ("The
array bounds are invalid."). This seems to be a bug in Windows because
there is no documentation about this behavior.

This fix handles the error by resetting the event log subscription
handle (so events are not lost) and then retries the EvtNext call
with maxHandles/2.

Fixes elastic#3076
  • Loading branch information
andrewkroh committed Dec 5, 2016
1 parent 1744740 commit 226eb10
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v5.0.1...master[Check the HEAD diff]
- Fix registry cleanup issue when files falling under ignore_older after restart. {issue}2818[2818]

*Winlogbeat*
- Fix for "The array bounds are invalid" error when reading large events. {issue}3076[3076]

==== Added

Expand Down
26 changes: 23 additions & 3 deletions winlogbeat/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"strconv"
"syscall"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand All @@ -23,9 +24,14 @@ var (
detailf = logp.MakeDebug(detailSelector)
)

// dropReasons contains counters for the number of dropped events for each
// reason.
var dropReasons = expvar.NewMap("drop_reasons")
var (
// dropReasons contains counters for the number of dropped events for each
// reason.
dropReasons = expvar.NewMap("drop_reasons")

// readErrors contains counters for the read error types that occur.
readErrors = expvar.NewMap("read_errors")
)

// EventLog is an interface to a Windows Event Log.
type EventLog interface {
Expand Down Expand Up @@ -177,3 +183,17 @@ func isZero(i interface{}) bool {
}
return false
}

// incrementMetric increments a value in the specified expvar.Map. The key
// should be a windows syscall.Errno or a string. Any other types will be
// reported under the "other" key.
func incrementMetric(v *expvar.Map, key interface{}) {
switch t := key.(type) {
default:
v.Add("other", 1)
case string:
v.Add(t, 1)
case syscall.Errno:
v.Add(strconv.Itoa(int(t)), 1)
}
}
1 change: 1 addition & 0 deletions winlogbeat/eventlog/eventlogging.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (l *eventLogging) Close() error {
// by attempting to correct the error through closing and reopening the event
// log.
func (l *eventLogging) readRetryErrorHandler(err error) error {
incrementMetric(readErrors, err)
if errno, ok := err.(syscall.Errno); ok {
var reopen bool

Expand Down
14 changes: 13 additions & 1 deletion winlogbeat/eventlog/eventlogging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package eventlog

import (
"fmt"
"os/exec"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -35,6 +37,8 @@ const (

const allLevels = elog.Success | elog.AuditFailure | elog.AuditSuccess | elog.Error | elog.Info | elog.Warning

const gigabyte = 1 << 30

// Test messages.
var messages = map[uint32]struct {
eventType uint16
Expand Down Expand Up @@ -72,7 +76,7 @@ var oneTimeLogpInit sync.Once
func configureLogp() {
oneTimeLogpInit.Do(func() {
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"eventlog", "eventlog_detail"})
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"eventlog"})
logp.Info("DEBUG enabled for eventlog.")
} else {
logp.LogInit(logp.LOG_WARNING, "", false, true, []string{})
Expand Down Expand Up @@ -143,6 +147,14 @@ func uninstallLog(provider, source string, log *elog.Log) error {
return errs.Err()
}

// setLogSize set the maximum number of bytes that an event log can hold.
func setLogSize(t testing.TB, provider string, sizeBytes int) {
output, err := exec.Command("wevtutil.exe", "sl", "/ms:"+strconv.Itoa(sizeBytes), providerName).CombinedOutput()
if err != nil {
t.Fatal("failed to set log size", err, string(output))
}
}

// Verify that all messages are read from the event log.
func TestRead(t *testing.T) {

Expand Down
61 changes: 36 additions & 25 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package eventlog

import (
"fmt"
"strconv"
"syscall"
"time"

Expand All @@ -13,6 +12,7 @@ import (
"github.com/elastic/beats/winlogbeat/sys"
win "github.com/elastic/beats/winlogbeat/sys/wineventlog"
"github.com/joeshaw/multierror"
"github.com/pkg/errors"
"golang.org/x/sys/windows"
)

Expand Down Expand Up @@ -73,6 +73,7 @@ type winEventLog struct {
channelName string // Name of the channel from which to read.
subscription win.EvtHandle // Handle to the subscription.
maxRead int // Maximum number returned in one Read.
lastRead uint64 // Record number of the last read event.

render func(event win.EvtHandle) (string, error) // Function for rendering the event to XML.
renderBuf []byte // Buffer used for rendering event.
Expand Down Expand Up @@ -118,13 +119,8 @@ func (l *winEventLog) Open(recordNumber uint64) error {
}

func (l *winEventLog) Read() ([]Record, error) {
handles, err := win.EventHandles(l.subscription, l.maxRead)
if err == win.ERROR_NO_MORE_ITEMS {
detailf("%s No more events", l.logPrefix)
return nil, nil
}
if err != nil {
logp.Warn("%s EventHandles returned error %v", l.logPrefix, err)
handles, _, err := l.eventHandles(l.maxRead)
if err != nil || len(handles) == 0 {
return nil, err
}
defer func() {
Expand All @@ -145,17 +141,18 @@ func (l *winEventLog) Read() ([]Record, error) {
}
if err != nil && x == "" {
logp.Err("%s Dropping event with rendering error. %v", l.logPrefix, err)
reportDrop(err)
incrementMetric(dropReasons, err)
continue
}

r, err := l.buildRecordFromXML(x, err)
if err != nil {
logp.Err("%s Dropping event. %v", l.logPrefix, err)
reportDrop("unmarshal")
incrementMetric(dropReasons, err)
continue
}
records = append(records, r)
l.lastRead = r.RecordID
}

debugf("%s Read() is returning %d records", l.logPrefix, len(records))
Expand All @@ -167,6 +164,34 @@ func (l *winEventLog) Close() error {
return win.Close(l.subscription)
}

func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) {
handles, err := win.EventHandles(l.subscription, maxRead)
switch err {
case nil:
if l.maxRead > maxRead {
debugf("%s Recovered from RPC_S_INVALID_BOUND error (errno 1734) "+
"by decreasing batch_read_size to %v", l.logPrefix, maxRead)
}
return handles, maxRead, nil
case win.ERROR_NO_MORE_ITEMS:
detailf("%s No more events", l.logPrefix)
return nil, maxRead, nil
case win.RPC_S_INVALID_BOUND:
incrementMetric(readErrors, err)
if err := l.Close(); err != nil {
return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND")
}
if err := l.Open(l.lastRead); err != nil {
return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND")
}
return l.eventHandles(maxRead / 2)
default:
incrementMetric(readErrors, err)
logp.Warn("%s EventHandles returned error %v", l.logPrefix, err)
return nil, 0, err
}
}

func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record, error) {
e, err := sys.UnmarshalEventXML([]byte(x))
if err != nil {
Expand Down Expand Up @@ -204,20 +229,6 @@ func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record,
return r, nil
}

// reportDrop reports a dropped event log record and the reason as an expvar
// metric. The reason should be a windows syscall.Errno or a string. Any other
// types will be reported under the "other" key.
func reportDrop(reason interface{}) {
switch t := reason.(type) {
default:
dropReasons.Add("other", 1)
case string:
dropReasons.Add(t, 1)
case syscall.Errno:
dropReasons.Add(strconv.Itoa(int(t)), 1)
}
}

// newWinEventLog creates and returns a new EventLog for reading event logs
// using the Windows Event Log.
func newWinEventLog(options map[string]interface{}) (EventLog, error) {
Expand Down Expand Up @@ -283,7 +294,7 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) {
}

func init() {
// Register eventlogging API if it is available.
// Register wineventlog API if it is available.
available, _ := win.IsAvailable()
if available {
Register(winEventLogAPIName, 0, newWinEventLog, win.Channels)
Expand Down
69 changes: 69 additions & 0 deletions winlogbeat/eventlog/wineventlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
package eventlog

import (
"expvar"
"strconv"
"testing"

elog "github.com/andrewkroh/sys/windows/svc/eventlog"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -52,3 +55,69 @@ func TestWinEventLogBatchReadSize(t *testing.T) {

assert.Len(t, records, batchReadSize)
}

// TestReadLargeBatchSize tests reading from an event log using a large
// read_batch_size parameter. When combined with large messages this causes
// EvtNext (wineventlog.EventRecords) to fail with RPC_S_INVALID_BOUND error.
func TestReadLargeBatchSize(t *testing.T) {
configureLogp()
log, err := initLog(providerName, sourceName, eventCreateMsgFile)
if err != nil {
t.Fatal(err)
}
defer func() {
err := uninstallLog(providerName, sourceName, log)
if err != nil {
t.Fatal(err)
}
}()

setLogSize(t, providerName, gigabyte)

// Publish large test messages.
totalEvents := 1000
for i := 0; i < totalEvents; i++ {
err = log.Report(elog.Info, uint32(i%1000), []string{strconv.Itoa(i) + " " + randString(31800)})
if err != nil {
t.Fatal("ReportEvent error", err)
}
}

eventlog, err := newWinEventLog(map[string]interface{}{"name": providerName, "batch_read_size": 1024})
if err != nil {
t.Fatal(err)
}
err = eventlog.Open(0)
if err != nil {
t.Fatal(err)
}
defer func() {
err := eventlog.Close()
if err != nil {
t.Fatal(err)
}
}()

var eventCount int
for eventCount < totalEvents {
records, err := eventlog.Read()
if err != nil {
t.Fatal("read error", err)
}
if len(records) == 0 {
t.Fatal("read returned 0 records")
}
eventCount += len(records)
}

t.Logf("number of records returned: %v", eventCount)

wineventlog := eventlog.(*winEventLog)
assert.Equal(t, 1024, wineventlog.maxRead)

expvar.Do(func(kv expvar.KeyValue) {
if kv.Key == "read_errors" {
t.Log(kv)
}
})
}
1 change: 1 addition & 0 deletions winlogbeat/sys/wineventlog/syscall_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
ERROR_INSUFFICIENT_BUFFER syscall.Errno = 122
ERROR_NO_MORE_ITEMS syscall.Errno = 259
ERROR_NONE_MAPPED syscall.Errno = 1332
RPC_S_INVALID_BOUND syscall.Errno = 1734
ERROR_INVALID_OPERATION syscall.Errno = 4317
ERROR_EVT_MESSAGE_NOT_FOUND syscall.Errno = 15027
ERROR_EVT_MESSAGE_ID_NOT_FOUND syscall.Errno = 15028
Expand Down
4 changes: 4 additions & 0 deletions winlogbeat/sys/wineventlog/wineventlog_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ func Subscribe(
// handles available to return. Close must be called on each returned EvtHandle
// when finished with the handle.
func EventHandles(subscription EvtHandle, maxHandles int) ([]EvtHandle, error) {
if maxHandles < 1 {
return nil, fmt.Errorf("maxHandles must be greater than 0")
}

eventHandles := make([]EvtHandle, maxHandles)
var numRead uint32

Expand Down

0 comments on commit 226eb10

Please sign in to comment.