From 516a307ba7ba2ad6ef74bbbfc2b446a79fa028b2 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Mon, 5 Dec 2016 12:29:57 -0500 Subject: [PATCH] Reuse a byte buffer for holding XML Previously the data was read into a []byte encoded as UTF16. Then that data was converted to []uint16 so that we can use utf16.Decode(). Then the []rune slice was converted to a string which did another data copy. The XML was unmarshalled from the string. This PR changes the code to convert the UTF16 []byte directly to UTF8 and puts the result into a reusable bytes.Buffer. The XML is then unmarshalled directly from the data in the buffer. ``` BenchmarkUTF16ToUTF8-4 2000000 1044 ns/op 4 B/op 1 allocs/op ``` Original ``` --- PASS: TestBenchmarkBatchReadSize (68.04s) bench_test.go:100: batch_size=10, total_events=20000, batch_time=5.682627ms, events_per_sec=1759.7494961397256, bytes_alloced_per_event=44 kB, total_allocs=4923840 bench_test.go:100: batch_size=100, total_events=30000, batch_time=53.850879ms, events_per_sec=1856.9799018508127, bytes_alloced_per_event=44 kB, total_allocs=7354285 bench_test.go:100: batch_size=500, total_events=25000, batch_time=271.118774ms, events_per_sec=1844.2101689350366, bytes_alloced_per_event=43 kB, total_allocs=6125665 bench_test.go:100: batch_size=1000, total_events=30000, batch_time=558.03918ms, events_per_sec=1791.9888707455987, bytes_alloced_per_event=43 kB, total_allocs=7350324 ``` After #3113 ``` --- PASS: TestBenchmarkBatchReadSize (71.85s) bench_test.go:100: batch_size=10, total_events=30000, batch_time=5.713873ms, events_per_sec=1750.1264028794478, bytes_alloced_per_event=25 kB, total_allocs=7385820 bench_test.go:100: batch_size=100, total_events=30000, batch_time=52.454484ms, events_per_sec=1906.4147118480853, bytes_alloced_per_event=24 kB, total_allocs=7354318 bench_test.go:100: batch_size=500, total_events=25000, batch_time=260.56659ms, events_per_sec=1918.8952812407758, bytes_alloced_per_event=24 kB, total_allocs=6125688 bench_test.go:100: 
batch_size=1000, total_events=30000, batch_time=530.468816ms, events_per_sec=1885.124949550286, bytes_alloced_per_event=24 kB, total_allocs=7350360 ``` After this PR ``` --- PASS: TestBenchmarkBatchReadSize (75.71s) bench_test.go:100: batch_size=10, total_events=20000, batch_time=5.784644ms, events_per_sec=1728.7148526339736, bytes_alloced_per_event=14 kB, total_allocs=4863853 bench_test.go:100: batch_size=100, total_events=30000, batch_time=55.51756ms, events_per_sec=1801.2318985200359, bytes_alloced_per_event=14 kB, total_allocs=7264293 bench_test.go:100: batch_size=500, total_events=25000, batch_time=276.171282ms, events_per_sec=1810.4706484289702, bytes_alloced_per_event=14 kB, total_allocs=6050671 bench_test.go:100: batch_size=1000, total_events=30000, batch_time=619.758713ms, events_per_sec=1613.5311678950125, bytes_alloced_per_event=14 kB, total_allocs=7260333 ``` --- CHANGELOG.asciidoc | 1 + winlogbeat/eventlog/wineventlog.go | 36 +++++++----- winlogbeat/sys/strings.go | 58 +++++++++++++++++++ winlogbeat/sys/strings_test.go | 23 ++++++++ .../sys/wineventlog/wineventlog_windows.go | 45 +++++++------- 5 files changed, 125 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 075c07f1f39..a215979dd8e 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v5.0.1...master[Check the HEAD diff] *Winlogbeat* - Add `event_logs.batch_read_size` configuration option. {pull}2641[2641] +- Reduced amount of memory allocated while reading event log records. 
{pull}3113[3113] {pull}3118[3118] ==== Deprecated diff --git a/winlogbeat/eventlog/wineventlog.go b/winlogbeat/eventlog/wineventlog.go index 7caab618bf1..760e714dd7a 100644 --- a/winlogbeat/eventlog/wineventlog.go +++ b/winlogbeat/eventlog/wineventlog.go @@ -3,7 +3,9 @@ package eventlog import ( + "bytes" "fmt" + "io" "syscall" "time" @@ -75,9 +77,10 @@ type winEventLog struct { maxRead int // Maximum number returned in one Read. lastRead uint64 // Record number of the last read event. - render func(event win.EvtHandle) (string, error) // Function for rendering the event to XML. - renderBuf []byte // Buffer used for rendering event. - cache *messageFilesCache // Cached mapping of source name to event message file handles. + render func(event win.EvtHandle, out io.Writer) error // Function for rendering the event to XML. + renderBuf []byte // Buffer used for rendering event. + outputBuf *bytes.Buffer // Buffer for receiving XML + cache *messageFilesCache // Cached mapping of source name to event message file handles. logPrefix string // String to prefix on log messages. eventMetadata common.EventMetadata // Field and tags to add to each event. @@ -132,20 +135,22 @@ func (l *winEventLog) Read() ([]Record, error) { var records []Record for _, h := range handles { - x, err := l.render(h) + l.outputBuf.Reset() + err := l.render(h, l.outputBuf) if bufErr, ok := err.(sys.InsufficientBufferError); ok { detailf("%s Increasing render buffer size to %d", l.logPrefix, bufErr.RequiredSize) l.renderBuf = make([]byte, bufErr.RequiredSize) - x, err = l.render(h) + l.outputBuf.Reset() + err = l.render(h, l.outputBuf) } - if err != nil && x == "" { + if err != nil && l.outputBuf.Len() == 0 { logp.Err("%s Dropping event with rendering error. %v", l.logPrefix, err) incrementMetric(dropReasons, err) continue } - r, err := l.buildRecordFromXML(x, err) + r, err := l.buildRecordFromXML(l.outputBuf.Bytes(), err) if err != nil { logp.Err("%s Dropping event. 
%v", l.logPrefix, err) incrementMetric(dropReasons, err) @@ -192,8 +197,8 @@ func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) { } } -func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record, error) { - e, err := sys.UnmarshalEventXML([]byte(x)) +func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record, error) { + e, err := sys.UnmarshalEventXML(x) if err != nil { return Record{}, fmt.Errorf("Failed to unmarshal XML='%s'. %v", x, err) } @@ -213,7 +218,7 @@ func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record, } if logp.IsDebug(detailSelector) { - detailf("%s XML=%s Event=%+v", l.logPrefix, x, e) + detailf("%s XML=%s Event=%+v", l.logPrefix, string(x), e) } r := Record{ @@ -223,7 +228,7 @@ func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record, } if l.config.IncludeXML { - r.XML = x + r.XML = string(x) } return r, nil @@ -270,6 +275,7 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) { channelName: c.Name, maxRead: c.BatchReadSize, renderBuf: make([]byte, renderBufferSize), + outputBuf: bytes.NewBuffer(make([]byte, renderBufferSize)), cache: newMessageFilesCache(c.Name, eventMetadataHandle, freeHandle), logPrefix: fmt.Sprintf("WinEventLog[%s]", c.Name), eventMetadata: c.EventMetadata, @@ -281,12 +287,12 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) { switch { case c.Forwarded == nil && c.Name == "ForwardedEvents", c.Forwarded != nil && *c.Forwarded == true: - l.render = func(event win.EvtHandle) (string, error) { - return win.RenderEventXML(event, l.renderBuf) + l.render = func(event win.EvtHandle, out io.Writer) error { + return win.RenderEventXML(event, l.renderBuf, out) } default: - l.render = func(event win.EvtHandle) (string, error) { - return win.RenderEvent(event, 0, l.renderBuf, l.cache.get) + l.render = func(event win.EvtHandle, out io.Writer) error { + return win.RenderEvent(event, 0, 
l.renderBuf, l.cache.get, out) } } diff --git a/winlogbeat/sys/strings.go b/winlogbeat/sys/strings.go index ecd71dcaaf8..ce9d4db99f4 100644 --- a/winlogbeat/sys/strings.go +++ b/winlogbeat/sys/strings.go @@ -1,11 +1,69 @@ package sys import ( + "errors" "fmt" + "io" "strings" "unicode/utf16" + "unicode/utf8" ) +// The conditions replacementChar==unicode.ReplacementChar and +// maxRune==unicode.MaxRune are verified in the tests. +// Defining them locally avoids this package depending on package unicode. + +const ( + replacementChar = '\uFFFD' // Unicode replacement character + maxRune = '\U0010FFFF' // Maximum valid Unicode code point. +) + +const ( + // 0xd800-0xdc00 encodes the high 10 bits of a pair. + // 0xdc00-0xe000 encodes the low 10 bits of a pair. + // the value is those 20 bits plus 0x10000. + surr1 = 0xd800 + surr2 = 0xdc00 + surr3 = 0xe000 + + surrSelf = 0x10000 +) + +var ErrBufferTooSmall = errors.New("buffer too small") + +func UTF16ToUTF8Bytes(in []byte, out io.Writer) error { + if len(in)%2 != 0 { + return fmt.Errorf("input buffer must have an even length (length=%d)", len(in)) + } + + var runeBuf [4]byte + var v1, v2 uint16 + for i := 0; i < len(in); i += 2 { + v1 = uint16(in[i]) | uint16(in[i+1])<<8 + + switch { + case v1 < surr1, surr3 <= v1: + n := utf8.EncodeRune(runeBuf[:], rune(v1)) + out.Write(runeBuf[:n]) + case surr1 <= v1 && v1 < surr2 && len(in) > i+2: + v2 = uint16(in[i+2]) | uint16(in[i+3])<<8 + if surr2 <= v2 && v2 < surr3 { + // valid surrogate sequence + r := utf16.DecodeRune(rune(v1), rune(v2)) + n := utf8.EncodeRune(runeBuf[:], r) + out.Write(runeBuf[:n]) + } + i += 2 + default: + // invalid surrogate sequence + n := utf8.EncodeRune(runeBuf[:], replacementChar) + out.Write(runeBuf[:n]) + } + } + + return nil +} + // UTF16BytesToString returns a string that is decoded from the UTF-16 bytes. // The byte slice must be of even length otherwise an error will be returned. 
// The integer returned is the offset to the start of the next string with diff --git a/winlogbeat/sys/strings_test.go b/winlogbeat/sys/strings_test.go index 48cae4831aa..f7e3443e49d 100644 --- a/winlogbeat/sys/strings_test.go +++ b/winlogbeat/sys/strings_test.go @@ -75,3 +75,26 @@ func BenchmarkUTF16BytesToString(b *testing.B) { } }) } + +func TestUTF16ToUTF8(t *testing.T) { + input := "abc白鵬翔\u145A6" + utf16Bytes := toUTF16Bytes(input) + + outputBuf := &bytes.Buffer{} + err := UTF16ToUTF8Bytes(utf16Bytes, outputBuf) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, []byte(input), outputBuf.Bytes()) +} + +func BenchmarkUTF16ToUTF8(b *testing.B) { + utf16Bytes := toUTF16Bytes("A logon was attempted using explicit credentials.") + outputBuf := &bytes.Buffer{} + b.ResetTimer() + + for i := 0; i < b.N; i++ { + UTF16ToUTF8Bytes(utf16Bytes, outputBuf) + outputBuf.Reset() + } +} diff --git a/winlogbeat/sys/wineventlog/wineventlog_windows.go b/winlogbeat/sys/wineventlog/wineventlog_windows.go index 138c0da5aa0..5da75e3a617 100644 --- a/winlogbeat/sys/wineventlog/wineventlog_windows.go +++ b/winlogbeat/sys/wineventlog/wineventlog_windows.go @@ -156,10 +156,11 @@ func RenderEvent( lang uint32, renderBuf []byte, pubHandleProvider func(string) sys.MessageFiles, -) (string, error) { + out io.Writer, +) error { providerName, err := evtRenderProviderName(renderBuf, eventHandle) if err != nil { - return "", err + return err } var publisherHandle uintptr @@ -173,21 +174,21 @@ func RenderEvent( } // Only a single string is returned when rendering XML. - xml, err := FormatEventString(EvtFormatMessageXml, - eventHandle, providerName, EvtHandle(publisherHandle), lang, renderBuf) + err = FormatEventString(EvtFormatMessageXml, + eventHandle, providerName, EvtHandle(publisherHandle), lang, renderBuf, out) // Recover by rendering the XML without the RenderingInfo (message string). 
if err != nil { // Do not try to recover from InsufficientBufferErrors because these // can be retried with a larger buffer. if _, ok := err.(sys.InsufficientBufferError); ok { - return "", err + return err } - xml, err = RenderEventXML(eventHandle, renderBuf) + err = RenderEventXML(eventHandle, renderBuf, out) } - return xml, err + return err } // RenderEventXML renders the event as XML. If the event is already rendered, as @@ -195,24 +196,23 @@ func RenderEvent( // include the RenderingInfo (message). If the event is not rendered then the // XML will not include the message, and in this case RenderEvent should be // used. -func RenderEventXML(eventHandle EvtHandle, renderBuf []byte) (string, error) { +func RenderEventXML(eventHandle EvtHandle, renderBuf []byte, out io.Writer) error { var bufferUsed, propertyCount uint32 err := _EvtRender(0, eventHandle, EvtRenderEventXml, uint32(len(renderBuf)), &renderBuf[0], &bufferUsed, &propertyCount) if err == ERROR_INSUFFICIENT_BUFFER { - return "", sys.InsufficientBufferError{err, int(bufferUsed)} + return sys.InsufficientBufferError{err, int(bufferUsed)} } if err != nil { - return "", err + return err } if int(bufferUsed) > len(renderBuf) { - return "", fmt.Errorf("Windows EvtRender reported that wrote %d bytes "+ + return fmt.Errorf("Windows EvtRender reported that wrote %d bytes "+ "to the buffer, but the buffer can only hold %d bytes", bufferUsed, len(renderBuf)) } - xml, _, err := sys.UTF16BytesToString(renderBuf[:bufferUsed]) - return xml, err + return sys.UTF16ToUTF8Bytes(renderBuf[:bufferUsed], out) } // CreateBookmark creates a new handle to a bookmark. Close must be called on @@ -299,24 +299,25 @@ func FormatEventString( publisherHandle EvtHandle, lang uint32, buffer []byte, -) (string, error) { + out io.Writer, +) error { // Open a publisher handle if one was not provided. 
ph := publisherHandle if ph == 0 { ph, err := OpenPublisherMetadata(0, publisher, 0) if err != nil { - return "", err + return err } defer _EvtClose(ph) } - // Create a buffer if one was not provider. + // Create a buffer if one was not provided. var bufferUsed uint32 if buffer == nil { err := _EvtFormatMessage(ph, eventHandle, 0, 0, 0, messageFlag, 0, nil, &bufferUsed) if err != nil && err != ERROR_INSUFFICIENT_BUFFER { - return "", err + return err } bufferUsed *= 2 @@ -328,16 +329,15 @@ func FormatEventString( uint32(len(buffer)/2), &buffer[0], &bufferUsed) bufferUsed *= 2 if err == ERROR_INSUFFICIENT_BUFFER { - return "", sys.InsufficientBufferError{err, int(bufferUsed)} + return sys.InsufficientBufferError{err, int(bufferUsed)} } if err != nil { - return "", err + return err } // This assumes there is only a single string value to read. This will // not work to read keys (when messageFlag == EvtFormatMessageKeyword). - value, _, err := sys.UTF16BytesToString(buffer[0:bufferUsed]) - return value, err + return sys.UTF16ToUTF8Bytes(buffer[:bufferUsed], out) } // offset reads a pointer value from the reader then calculates an offset from @@ -400,8 +400,7 @@ func readString(buffer []byte, reader io.Reader) (string, error) { func evtRenderProviderName(renderBuf []byte, eventHandle EvtHandle) (string, error) { var bufferUsed, propertyCount uint32 err := _EvtRender(providerNameContext, eventHandle, EvtRenderEventValues, - uint32(len(renderBuf)), &renderBuf[0], &bufferUsed, - &propertyCount) + uint32(len(renderBuf)), &renderBuf[0], &bufferUsed, &propertyCount) if err == ERROR_INSUFFICIENT_BUFFER { return "", sys.InsufficientBufferError{err, int(bufferUsed)} }