Skip to content

Commit

Permalink
Reuse a byte buffer for holding XML
Browse files Browse the repository at this point in the history
Previously the data was read into a []byte encoded as UTF16. Then that
data was converted to []uint16 so that we can use utf16.Decode(). Then
the []rune slice was converted to a string which did another data copy.
The XML was unmarshalled from the string.

This PR changes the code to convert the UTF16 []byte directly to UTF8 and
puts the result into a reusable bytes.Buffer. The XML is then unmarshalled
directly from the data in buffer.

```
BenchmarkUTF16ToUTF8-4   	 2000000	      1044 ns/op        4 B/op      1 allocs/op
```

Original
```
--- PASS: TestBenchmarkBatchReadSize (68.04s)
        bench_test.go100: batch_size=10, total_events=20000, batch_time=5.682627ms, events_per_sec=1759.7494961397256, bytes_alloced_per_event=44 kB, total_allocs=4923840
        bench_test.go100: batch_size=100, total_events=30000, batch_time=53.850879ms, events_per_sec=1856.9799018508127, bytes_alloced_per_event=44 kB, total_allocs=7354285
        bench_test.go100: batch_size=500, total_events=25000, batch_time=271.118774ms, events_per_sec=1844.2101689350366, bytes_alloced_per_event=43 kB, total_allocs=6125665
        bench_test.go100: batch_size=1000, total_events=30000, batch_time=558.03918ms, events_per_sec=1791.9888707455987, bytes_alloced_per_event=43 kB, total_allocs=7350324
```

After elastic#3113
```
--- PASS: TestBenchmarkBatchReadSize (71.85s)
        bench_test.go:100: batch_size=10, total_events=30000, batch_time=5.713873ms, events_per_sec=1750.1264028794478, bytes_alloced_per_event=25 kB, total_allocs=7385820
        bench_test.go:100: batch_size=100, total_events=30000, batch_time=52.454484ms, events_per_sec=1906.4147118480853, bytes_alloced_per_event=24 kB, total_allocs=7354318
        bench_test.go:100: batch_size=500, total_events=25000, batch_time=260.56659ms, events_per_sec=1918.8952812407758, bytes_alloced_per_event=24 kB, total_allocs=6125688
        bench_test.go:100: batch_size=1000, total_events=30000, batch_time=530.468816ms, events_per_sec=1885.124949550286, bytes_alloced_per_event=24 kB, total_allocs=7350360
```

After this PR
```
--- PASS: TestBenchmarkBatchReadSize (75.71s)
        bench_test.go:100: batch_size=10, total_events=20000, batch_time=5.784644ms, events_per_sec=1728.7148526339736, bytes_alloced_per_event=14 kB, total_allocs=4863853
        bench_test.go:100: batch_size=100, total_events=30000, batch_time=55.51756ms, events_per_sec=1801.2318985200359, bytes_alloced_per_event=14 kB, total_allocs=7264293
        bench_test.go:100: batch_size=500, total_events=25000, batch_time=276.171282ms, events_per_sec=1810.4706484289702, bytes_alloced_per_event=14 kB, total_allocs=6050671
        bench_test.go:100: batch_size=1000, total_events=30000, batch_time=619.758713ms, events_per_sec=1613.5311678950125, bytes_alloced_per_event=14 kB, total_allocs=7260333
```
  • Loading branch information
andrewkroh committed Dec 6, 2016
1 parent 46baa43 commit 516a307
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v5.0.1...master[Check the HEAD diff]
*Winlogbeat*

- Add `event_logs.batch_read_size` configuration option. {pull}2641[2641]
- Reduced amount of memory allocated while reading event log records. {pull}3113[3113] {pull}3118[3113]

==== Deprecated

Expand Down
36 changes: 21 additions & 15 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
package eventlog

import (
"bytes"
"fmt"
"io"
"syscall"
"time"

Expand Down Expand Up @@ -75,9 +77,10 @@ type winEventLog struct {
maxRead int // Maximum number returned in one Read.
lastRead uint64 // Record number of the last read event.

render func(event win.EvtHandle) (string, error) // Function for rendering the event to XML.
renderBuf []byte // Buffer used for rendering event.
cache *messageFilesCache // Cached mapping of source name to event message file handles.
render func(event win.EvtHandle, out io.Writer) error // Function for rendering the event to XML.
renderBuf []byte // Buffer used for rendering event.
outputBuf *bytes.Buffer // Buffer for receiving XML
cache *messageFilesCache // Cached mapping of source name to event message file handles.

logPrefix string // String to prefix on log messages.
eventMetadata common.EventMetadata // Field and tags to add to each event.
Expand Down Expand Up @@ -132,20 +135,22 @@ func (l *winEventLog) Read() ([]Record, error) {

var records []Record
for _, h := range handles {
x, err := l.render(h)
l.outputBuf.Reset()
err := l.render(h, l.outputBuf)
if bufErr, ok := err.(sys.InsufficientBufferError); ok {
detailf("%s Increasing render buffer size to %d", l.logPrefix,
bufErr.RequiredSize)
l.renderBuf = make([]byte, bufErr.RequiredSize)
x, err = l.render(h)
l.outputBuf.Reset()
err = l.render(h, l.outputBuf)
}
if err != nil && x == "" {
if err != nil && l.outputBuf.Len() == 0 {
logp.Err("%s Dropping event with rendering error. %v", l.logPrefix, err)
incrementMetric(dropReasons, err)
continue
}

r, err := l.buildRecordFromXML(x, err)
r, err := l.buildRecordFromXML(l.outputBuf.Bytes(), err)
if err != nil {
logp.Err("%s Dropping event. %v", l.logPrefix, err)
incrementMetric(dropReasons, err)
Expand Down Expand Up @@ -192,8 +197,8 @@ func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) {
}
}

func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record, error) {
e, err := sys.UnmarshalEventXML([]byte(x))
func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record, error) {
e, err := sys.UnmarshalEventXML(x)
if err != nil {
return Record{}, fmt.Errorf("Failed to unmarshal XML='%s'. %v", x, err)
}
Expand All @@ -213,7 +218,7 @@ func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record,
}

if logp.IsDebug(detailSelector) {
detailf("%s XML=%s Event=%+v", l.logPrefix, x, e)
detailf("%s XML=%s Event=%+v", l.logPrefix, string(x), e)
}

r := Record{
Expand All @@ -223,7 +228,7 @@ func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record,
}

if l.config.IncludeXML {
r.XML = x
r.XML = string(x)
}

return r, nil
Expand Down Expand Up @@ -270,6 +275,7 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) {
channelName: c.Name,
maxRead: c.BatchReadSize,
renderBuf: make([]byte, renderBufferSize),
outputBuf: bytes.NewBuffer(make([]byte, renderBufferSize)),
cache: newMessageFilesCache(c.Name, eventMetadataHandle, freeHandle),
logPrefix: fmt.Sprintf("WinEventLog[%s]", c.Name),
eventMetadata: c.EventMetadata,
Expand All @@ -281,12 +287,12 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) {
switch {
case c.Forwarded == nil && c.Name == "ForwardedEvents",
c.Forwarded != nil && *c.Forwarded == true:
l.render = func(event win.EvtHandle) (string, error) {
return win.RenderEventXML(event, l.renderBuf)
l.render = func(event win.EvtHandle, out io.Writer) error {
return win.RenderEventXML(event, l.renderBuf, out)
}
default:
l.render = func(event win.EvtHandle) (string, error) {
return win.RenderEvent(event, 0, l.renderBuf, l.cache.get)
l.render = func(event win.EvtHandle, out io.Writer) error {
return win.RenderEvent(event, 0, l.renderBuf, l.cache.get, out)
}
}

Expand Down
58 changes: 58 additions & 0 deletions winlogbeat/sys/strings.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,69 @@
package sys

import (
"errors"
"fmt"
"io"
"strings"
"unicode/utf16"
"unicode/utf8"
)

// The conditions replacementChar==unicode.ReplacementChar and
// maxRune==unicode.MaxRune are verified in the tests.
// Defining them locally avoids this package depending on package unicode.

const (
replacementChar = '\uFFFD' // Unicode replacement character
maxRune = '\U0010FFFF' // Maximum valid Unicode code point.
)

const (
// 0xd800-0xdc00 encodes the high 10 bits of a pair.
// 0xdc00-0xe000 encodes the low 10 bits of a pair.
// the value is those 20 bits plus 0x10000.
surr1 = 0xd800
surr2 = 0xdc00
surr3 = 0xe000

surrSelf = 0x10000
)

var ErrBufferTooSmall = errors.New("buffer too small")

func UTF16ToUTF8Bytes(in []byte, out io.Writer) error {
if len(in)%2 != 0 {
return fmt.Errorf("input buffer must have an even length (length=%d)", len(in))
}

var runeBuf [4]byte
var v1, v2 uint16
for i := 0; i < len(in); i += 2 {
v1 = uint16(in[i]) | uint16(in[i+1])<<8

switch {
case v1 < surr1, surr3 <= v1:
n := utf8.EncodeRune(runeBuf[:], rune(v1))
out.Write(runeBuf[:n])
case surr1 <= v1 && v1 < surr2 && len(in) > i+2:
v2 = uint16(in[i+2]) | uint16(in[i+3])<<8
if surr2 <= v2 && v2 < surr3 {
// valid surrogate sequence
r := utf16.DecodeRune(rune(v1), rune(v2))
n := utf8.EncodeRune(runeBuf[:], r)
out.Write(runeBuf[:n])
}
i += 2
default:
// invalid surrogate sequence
n := utf8.EncodeRune(runeBuf[:], replacementChar)
out.Write(runeBuf[:n])
}
}

return nil
}

// UTF16BytesToString returns a string that is decoded from the UTF-16 bytes.
// The byte slice must be of even length otherwise an error will be returned.
// The integer returned is the offset to the start of the next string with
Expand Down
23 changes: 23 additions & 0 deletions winlogbeat/sys/strings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,26 @@ func BenchmarkUTF16BytesToString(b *testing.B) {
}
})
}

func TestUTF16ToUTF8(t *testing.T) {
input := "abc白鵬翔\u145A6"
utf16Bytes := toUTF16Bytes(input)

outputBuf := &bytes.Buffer{}
err := UTF16ToUTF8Bytes(utf16Bytes, outputBuf)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, []byte(input), outputBuf.Bytes())
}

func BenchmarkUTF16ToUTF8(b *testing.B) {
utf16Bytes := toUTF16Bytes("A logon was attempted using explicit credentials.")
outputBuf := &bytes.Buffer{}
b.ResetTimer()

for i := 0; i < b.N; i++ {
UTF16ToUTF8Bytes(utf16Bytes, outputBuf)
outputBuf.Reset()
}
}
45 changes: 22 additions & 23 deletions winlogbeat/sys/wineventlog/wineventlog_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,11 @@ func RenderEvent(
lang uint32,
renderBuf []byte,
pubHandleProvider func(string) sys.MessageFiles,
) (string, error) {
out io.Writer,
) error {
providerName, err := evtRenderProviderName(renderBuf, eventHandle)
if err != nil {
return "", err
return err
}

var publisherHandle uintptr
Expand All @@ -173,46 +174,45 @@ func RenderEvent(
}

// Only a single string is returned when rendering XML.
xml, err := FormatEventString(EvtFormatMessageXml,
eventHandle, providerName, EvtHandle(publisherHandle), lang, renderBuf)
err = FormatEventString(EvtFormatMessageXml,
eventHandle, providerName, EvtHandle(publisherHandle), lang, renderBuf, out)

// Recover by rendering the XML without the RenderingInfo (message string).
if err != nil {
// Do not try to recover from InsufficientBufferErrors because these
// can be retried with a larger buffer.
if _, ok := err.(sys.InsufficientBufferError); ok {
return "", err
return err
}

xml, err = RenderEventXML(eventHandle, renderBuf)
err = RenderEventXML(eventHandle, renderBuf, out)
}

return xml, err
return err
}

// RenderEventXML renders the event as XML. If the event is already rendered, as
// in a forwarded event whose content type is "RenderedText", then the XML will
// include the RenderingInfo (message). If the event is not rendered then the
// XML will not include the message, and in this case RenderEvent should be
// used.
func RenderEventXML(eventHandle EvtHandle, renderBuf []byte) (string, error) {
func RenderEventXML(eventHandle EvtHandle, renderBuf []byte, out io.Writer) error {
var bufferUsed, propertyCount uint32
err := _EvtRender(0, eventHandle, EvtRenderEventXml, uint32(len(renderBuf)),
&renderBuf[0], &bufferUsed, &propertyCount)
if err == ERROR_INSUFFICIENT_BUFFER {
return "", sys.InsufficientBufferError{err, int(bufferUsed)}
return sys.InsufficientBufferError{err, int(bufferUsed)}
}
if err != nil {
return "", err
return err
}

if int(bufferUsed) > len(renderBuf) {
return "", fmt.Errorf("Windows EvtRender reported that wrote %d bytes "+
return fmt.Errorf("Windows EvtRender reported that wrote %d bytes "+
"to the buffer, but the buffer can only hold %d bytes",
bufferUsed, len(renderBuf))
}
xml, _, err := sys.UTF16BytesToString(renderBuf[:bufferUsed])
return xml, err
return sys.UTF16ToUTF8Bytes(renderBuf[:bufferUsed], out)
}

// CreateBookmark creates a new handle to a bookmark. Close must be called on
Expand Down Expand Up @@ -299,24 +299,25 @@ func FormatEventString(
publisherHandle EvtHandle,
lang uint32,
buffer []byte,
) (string, error) {
out io.Writer,
) error {
// Open a publisher handle if one was not provided.
ph := publisherHandle
if ph == 0 {
ph, err := OpenPublisherMetadata(0, publisher, 0)
if err != nil {
return "", err
return err
}
defer _EvtClose(ph)
}

// Create a buffer if one was not provider.
// Create a buffer if one was not provided.
var bufferUsed uint32
if buffer == nil {
err := _EvtFormatMessage(ph, eventHandle, 0, 0, 0, messageFlag,
0, nil, &bufferUsed)
if err != nil && err != ERROR_INSUFFICIENT_BUFFER {
return "", err
return err
}

bufferUsed *= 2
Expand All @@ -328,16 +329,15 @@ func FormatEventString(
uint32(len(buffer)/2), &buffer[0], &bufferUsed)
bufferUsed *= 2
if err == ERROR_INSUFFICIENT_BUFFER {
return "", sys.InsufficientBufferError{err, int(bufferUsed)}
return sys.InsufficientBufferError{err, int(bufferUsed)}
}
if err != nil {
return "", err
return err
}

// This assumes there is only a single string value to read. This will
// not work to read keys (when messageFlag == EvtFormatMessageKeyword).
value, _, err := sys.UTF16BytesToString(buffer[0:bufferUsed])
return value, err
return sys.UTF16ToUTF8Bytes(buffer[:bufferUsed], out)
}

// offset reads a pointer value from the reader then calculates an offset from
Expand Down Expand Up @@ -400,8 +400,7 @@ func readString(buffer []byte, reader io.Reader) (string, error) {
func evtRenderProviderName(renderBuf []byte, eventHandle EvtHandle) (string, error) {
var bufferUsed, propertyCount uint32
err := _EvtRender(providerNameContext, eventHandle, EvtRenderEventValues,
uint32(len(renderBuf)), &renderBuf[0], &bufferUsed,
&propertyCount)
uint32(len(renderBuf)), &renderBuf[0], &bufferUsed, &propertyCount)
if err == ERROR_INSUFFICIENT_BUFFER {
return "", sys.InsufficientBufferError{err, int(bufferUsed)}
}
Expand Down

0 comments on commit 516a307

Please sign in to comment.