From 26970079e060fd4167adf4e7abf09500502fcfbc Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 22 Jul 2016 19:09:10 +0200 Subject: [PATCH] Make format string thread safe Do not store evaluation context in formatter, but allocate context via shared sync.Pool. --- libbeat/common/fmtstr/formatevents.go | 86 +++++++++++++++------- libbeat/common/fmtstr/formatstring.go | 24 +++--- libbeat/common/fmtstr/formatstring_test.go | 2 +- 3 files changed, 73 insertions(+), 39 deletions(-) diff --git a/libbeat/common/fmtstr/formatevents.go b/libbeat/common/fmtstr/formatevents.go index a3c7932c38d..ae80dff6f3a 100644 --- a/libbeat/common/fmtstr/formatevents.go +++ b/libbeat/common/fmtstr/formatevents.go @@ -7,6 +7,7 @@ import ( "reflect" "strconv" "strings" + "sync" "time" "github.com/elastic/beats/libbeat/common" @@ -25,29 +26,24 @@ import ( // `%{[field.name]:default value}`. type EventFormatString struct { formatter StringFormatter - ctx *eventEvalContext fields []fieldInfo timestamp bool } type eventFieldEvaler struct { - ctx *eventEvalContext index int } type defaultEventFieldEvaler struct { - ctx *eventEvalContext index int defaultValue string } type eventTimestampEvaler struct { - ctx *eventEvalContext formatter *dtfmt.Formatter } type eventFieldCompiler struct { - ctx *eventEvalContext keys map[string]keyInfo timestamp bool index int @@ -66,6 +62,7 @@ type keyInfo struct { type eventEvalContext struct { keys []string ts time.Time + buf *bytes.Buffer } var ( @@ -73,6 +70,25 @@ var ( errConvertString = errors.New("can not convert to string") ) +var eventCtxPool = &sync.Pool{ + New: func() interface{} { return &eventEvalContext{} }, +} + +func newEventCtx(sz int) *eventEvalContext { + ctx := eventCtxPool.Get().(*eventEvalContext) + if ctx.keys == nil || cap(ctx.keys) < sz { + ctx.keys = make([]string, 0, sz) + } else { + ctx.keys = ctx.keys[:0] + } + + return ctx +} + +func releaseCtx(c *eventEvalContext) { + eventCtxPool.Put(c) +} + // MustCompileEvent copmiles an event format string into an runnable // EventFormatString. Generates a panic if compilation fails. func MustCompileEvent(in string) *EventFormatString { @@ -88,7 +104,6 @@ func MustCompileEvent(in string) *EventFormatString { func CompileEvent(in string) (*EventFormatString, error) { ctx := &eventEvalContext{} efComp := &eventFieldCompiler{ - ctx: ctx, keys: map[string]keyInfo{}, index: 0, timestamp: false, @@ -110,7 +125,6 @@ func CompileEvent(in string) (*EventFormatString, error) { ctx.keys = make([]string, len(keys)) efs := &EventFormatString{ formatter: sf, - ctx: ctx, fields: keys, timestamp: efComp.timestamp, } @@ -157,24 +171,43 @@ func (fs *EventFormatString) Fields() []string { // Run executes the format string returning a new expanded string or an error // if execution or event field expansion fails. func (fs *EventFormatString) Run(event common.MapStr) (string, error) { - if err := fs.collectFields(event); err != nil { + ctx := newEventCtx(len(fs.fields)) + defer releaseCtx(ctx) + + if ctx.buf == nil { + ctx.buf = bytes.NewBuffer(nil) + } else { + ctx.buf.Reset() + } + + if err := fs.collectFields(ctx, event); err != nil { + return "", err + } + err := fs.formatter.Eval(ctx, ctx.buf) + if err != nil { return "", err } - return fs.formatter.Run() + return ctx.buf.String(), nil } // Eval executes the format string, writing the resulting string into the provided output buffer. Returns error if execution or event field expansion fails. func (fs *EventFormatString) Eval(out *bytes.Buffer, event common.MapStr) error { - if err := fs.collectFields(event); err != nil { + ctx := newEventCtx(len(fs.fields)) + defer releaseCtx(ctx) + + if err := fs.collectFields(ctx, event); err != nil { return err } - return fs.formatter.Eval(out) + return fs.formatter.Eval(ctx, out) } // collectFields tries to extract and convert all required fields into an array // of strings. -func (fs *EventFormatString) collectFields(event common.MapStr) error { - for i, fi := range fs.fields { +func (fs *EventFormatString) collectFields( + ctx *eventEvalContext, + event common.MapStr, +) error { + for _, fi := range fs.fields { s, err := fieldString(event, fi.path) if err != nil { if fi.required { @@ -183,7 +216,7 @@ func (fs *EventFormatString) collectFields(event common.MapStr) error { s = "" } - fs.ctx.keys[i] = s + ctx.keys = append(ctx.keys, s) } if fs.timestamp { @@ -194,9 +227,9 @@ func (fs *EventFormatString) collectFields(event common.MapStr) error { switch t := timestamp.(type) { case common.Time: - fs.ctx.ts = time.Time(t) + ctx.ts = time.Time(t) case time.Time: - fs.ctx.ts = t + ctx.ts = t default: return errors.New("unknown timestamp type") } @@ -261,10 +294,10 @@ func (e *eventFieldCompiler) compileEventField( idx := info.index if len(ops) == 0 { - return &eventFieldEvaler{e.ctx, idx}, nil + return &eventFieldEvaler{idx}, nil } - return &defaultEventFieldEvaler{e.ctx, idx, defaultValue}, nil + return &defaultEventFieldEvaler{idx, defaultValue}, nil } func (e *eventFieldCompiler) compileTimestamp( @@ -281,25 +314,27 @@ func (e *eventFieldCompiler) compileTimestamp( } e.timestamp = true - return &eventTimestampEvaler{e.ctx, formatter}, nil + return &eventTimestampEvaler{formatter}, nil } -func (e *eventFieldEvaler) Eval(out *bytes.Buffer) error { +func (e *eventFieldEvaler) Eval(c interface{}, out *bytes.Buffer) error { type stringer interface { String() string } - s := e.ctx.keys[e.index] + ctx := c.(*eventEvalContext) + s := ctx.keys[e.index] _, err := out.WriteString(s) return err } -func (e *defaultEventFieldEvaler) Eval(out *bytes.Buffer) error { +func (e *defaultEventFieldEvaler) Eval(c interface{}, out *bytes.Buffer) error { type stringer interface { String() string } - s := e.ctx.keys[e.index] + ctx := c.(*eventEvalContext) + s := ctx.keys[e.index] if s == "" { s = e.defaultValue } @@ -307,8 +342,9 @@ func (e *defaultEventFieldEvaler) Eval(out *bytes.Buffer) error { return err } -func (e *eventTimestampEvaler) Eval(out *bytes.Buffer) error { - _, err := e.formatter.Write(out, e.ctx.ts) +func (e *eventTimestampEvaler) Eval(c interface{}, out *bytes.Buffer) error { + ctx := c.(*eventEvalContext) + _, err := e.formatter.Write(out, ctx.ts) return err } diff --git a/libbeat/common/fmtstr/formatstring.go b/libbeat/common/fmtstr/formatstring.go index 2a6512151ce..ee46a057375 100644 --- a/libbeat/common/fmtstr/formatstring.go +++ b/libbeat/common/fmtstr/formatstring.go @@ -11,7 +11,7 @@ import ( type FormatEvaler interface { // Eval will execute the format and writes the results into // the provided output buffer. Returns error on failure. - Eval(out *bytes.Buffer) error + Eval(ctx interface{}, out *bytes.Buffer) error } // StringFormatter interface extends FormatEvaler adding support for querying @@ -20,7 +20,7 @@ type StringFormatter interface { FormatEvaler // Run execute the formatter returning the generated string. - Run() (string, error) + Run(ctx interface{}) (string, error) // IsConst returns true, if execution of formatter will always return the // same constant string. @@ -42,7 +42,6 @@ type constStringFormatter struct { } type execStringFormatter struct { - buf *bytes.Buffer evalers []FormatEvaler } @@ -145,7 +144,6 @@ func compile(ctx *compileCtx, in string) (StringFormatter, error) { // create executable string formatter fmt := execStringFormatter{ evalers: evalers, - buf: bytes.NewBuffer(nil), } return fmt, nil } @@ -184,12 +182,12 @@ func optimize(in []FormatEvaler) []FormatEvaler { return out } -func (f constStringFormatter) Eval(out *bytes.Buffer) error { +func (f constStringFormatter) Eval(_ interface{}, out *bytes.Buffer) error { _, err := out.WriteString(f.s) return err } -func (f constStringFormatter) Run() (string, error) { +func (f constStringFormatter) Run(_ interface{}) (string, error) { return f.s, nil } @@ -197,21 +195,21 @@ func (f constStringFormatter) IsConst() bool { return true } -func (f execStringFormatter) Eval(out *bytes.Buffer) error { +func (f execStringFormatter) Eval(ctx interface{}, out *bytes.Buffer) error { for _, evaler := range f.evalers { - if err := evaler.Eval(out); err != nil { + if err := evaler.Eval(ctx, out); err != nil { return err } } return nil } -func (f execStringFormatter) Run() (string, error) { - f.buf.Reset() - if err := f.Eval(f.buf); err != nil { +func (f execStringFormatter) Run(ctx interface{}) (string, error) { + buf := bytes.NewBuffer(nil) + if err := f.Eval(ctx, buf); err != nil { return "", err } - return f.buf.String(), nil + return buf.String(), nil } func (f execStringFormatter) IsConst() bool { @@ -224,7 +222,7 @@ func (e StringElement) compile(ctx *compileCtx) (FormatEvaler, error) { // Eval write the string elements constant string value into // output buffer. -func (e StringElement) Eval(out *bytes.Buffer) error { +func (e StringElement) Eval(_ interface{}, out *bytes.Buffer) error { _, err := out.WriteString(e.s) return err } diff --git a/libbeat/common/fmtstr/formatstring_test.go b/libbeat/common/fmtstr/formatstring_test.go index 83c9bbe0870..1b1da63649a 100644 --- a/libbeat/common/fmtstr/formatstring_test.go +++ b/libbeat/common/fmtstr/formatstring_test.go @@ -123,7 +123,7 @@ func TestFormatString(t *testing.T) { } // run string formatter - actual, err := sf.Run() + actual, err := sf.Run(nil) // test validation if test.dyn == nil {