Skip to content

Commit

Permalink
Add Pod object to K8s meta (#710)
Browse files Browse the repository at this point in the history
* move k8s gatherer to meta

* k8s pod info in meta

* fix get meta from previous value

* use graph dependency in meta templater

* add k8s meta pod example

* use dominikbraun/graph for TopologicalSort

* fix after review

* fix in templater when single value from templated

* add comments in templater

* error on template execute to meta

* set default value in meta templater

* K8sMetaWaitTimeout 120s

* check default value in templater

* table TestTemplaterRender

* catch in templater nested variables

* add logger to meta templater

* tests in templater for nil objects

* use cache in meta template render

* add meta_cache_size setting

* dont calculate cache key for objects in meta templater

* add request_uuid to http meta
  • Loading branch information
DmitryRomanov authored Dec 3, 2024
1 parent 8e654a3 commit 67a1249
Show file tree
Hide file tree
Showing 23 changed files with 659 additions and 248 deletions.
8 changes: 4 additions & 4 deletions cmd/file.d/file.d_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/rename"
_ "github.com/ozontech/file.d/plugin/action/throttle"
_ "github.com/ozontech/file.d/plugin/input/fake"
k8s2 "github.com/ozontech/file.d/plugin/input/k8s"
"github.com/ozontech/file.d/plugin/input/k8s/meta"
_ "github.com/ozontech/file.d/plugin/output/devnull"
_ "github.com/ozontech/file.d/plugin/output/kafka"
uuid "github.com/satori/go.uuid"
Expand Down Expand Up @@ -78,9 +78,9 @@ func TestEndToEnd(t *testing.T) {
rand.Seed(0)

// disable k8s environment
k8s2.DisableMetaUpdates = true
k8s2.MetaWaitTimeout = time.Millisecond
k8s2.MaintenanceInterval = time.Millisecond * 100
meta.DisableMetaUpdates = true
meta.MetaWaitTimeout = time.Millisecond
meta.MaintenanceInterval = time.Millisecond * 100

filesDir := t.TempDir()
offsetsDir := t.TempDir()
Expand Down
7 changes: 7 additions & 0 deletions fd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
isStrict := pipeline.DefaultIsStrict
eventTimeout := pipeline.DefaultEventTimeout
metricHoldDuration := pipeline.DefaultMetricHoldDuration
metaCacheSize := pipeline.DefaultMetaCacheSize

if settings != nil {
val := settings.Get("capacity").MustInt()
if val != 0 {
capacity = val
}

val = settings.Get("meta_cache_size").MustInt()
if val != 0 {
metaCacheSize = val
}

val = settings.Get("avg_log_size").MustInt()
if val != 0 {
avgInputEventSize = val
Expand Down Expand Up @@ -117,6 +123,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
Decoder: decoder,
DecoderParams: decoderParams,
Capacity: capacity,
MetaCacheSize: metaCacheSize,
AvgEventSize: avgInputEventSize,
MaxEventSize: maxInputEventSize,
CutOffEventByLimit: cutOffEventByLimit,
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ require (
github.com/bufbuild/protocompile v0.13.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cespare/xxhash/v2 v2.2.0
github.com/dominikbraun/graph v0.23.0
github.com/elliotchance/orderedmap/v2 v2.4.0
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/go-faster/jx v1.1.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/hashicorp/vault/api v1.9.2
github.com/jackc/pgconn v1.14.1
github.com/jackc/pgproto3/v2 v2.3.2
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ github.com/dmarkham/enumer v1.5.8/go.mod h1:d10o8R3t/gROm2p3BXqTkMt2+HMuxEmWCXzo
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo=
github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc=
github.com/elliotchance/orderedmap/v2 v2.4.0 h1:6tUmMwD9F998FNpwFxA5E6NQvSpk2PVw7RKsVq3+2Cw=
github.com/elliotchance/orderedmap/v2 v2.4.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q=
github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE=
github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -176,6 +180,8 @@ github.com/hashicorp/go-sockaddr v1.0.2 h1:ztczhD1jLxIRjVejw8gFomI1BQZOe2WoVOu0S
github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/vault/api v1.9.2 h1:YjkZLJ7K3inKgMZ0wzCU9OHqc+UqMQyXsPXnf3Cl2as=
Expand Down
198 changes: 184 additions & 14 deletions pipeline/metadata/templater.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,145 @@ import (
"bytes"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"text/template"

"github.com/dominikbraun/graph"
"github.com/elliotchance/orderedmap/v2"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/ozontech/file.d/cfg"
"go.uber.org/zap"
)

type MetaData map[string]string

type ValueType string

const (
// when template for a sinlge value (e.g., "{{ .key }}")
SingleValueType ValueType = "single"

// when template is complex and we have to use template.Execute (e.g., "value_{{ .key }}")
TemplateValueType ValueType = "template"
)

// MetaTemplate holds the template and its original string
type MetaTemplate struct {
Template *template.Template
Source string
}

// NewMetaTemplate creates a new TemplateWrapper with default function
func NewMetaTemplate(source string) *MetaTemplate {
tmpl := template.Must(template.New("").Funcs(template.FuncMap{
"default": func(defaultValue string, value interface{}) interface{} {
if value == nil || value == "" {
return defaultValue
}
return value
},
}).Parse(source))
return &MetaTemplate{Template: tmpl, Source: source}
}

type MetaTemplater struct {
templates map[string]*template.Template
templates map[string]*MetaTemplate
singleValues map[string]string
valueTypes *orderedmap.OrderedMap[string, ValueType]
poolBuffer sync.Pool
logger *zap.Logger
cache *lru.Cache[string, MetaData]
}

func NewMetaTemplater(templates cfg.MetaTemplates) *MetaTemplater {
compiledTemplates := make(map[string]*template.Template)
func NewMetaTemplater(templates cfg.MetaTemplates, logger *zap.Logger, cacheSize int) *MetaTemplater {
// Regular expression to find ALL keys in the template strings (e.g., {{ .key }})
re := regexp.MustCompile(`{{\s*([^}]+)\s*}}`)

// Graph to manage dependencies between templates
g := graph.New(graph.StringHash, graph.Directed(), graph.PreventCycles())

// Build a dependency graph based on the templates
for name, template := range templates {
matches := re.FindAllStringSubmatch(template, -1)
_ = g.AddVertex(name)

// Iterate over all matches found in the template
for _, match := range matches {
if len(match) <= 1 {
continue
}
expression := strings.TrimSpace(match[1])
components := strings.Fields(expression)
for _, component := range components {
// catch all variables
if !strings.HasPrefix(component, ".") {
continue
}

parts := strings.Split(component, ".")
if len(parts) == 0 {
continue
}

// extract top-nested variable (e.g., .headers.sub_header.sub_sub_header => .headers)
topNestedVariable := parts[1]
if _, exists := templates[topNestedVariable]; !exists {
continue
}

if _, err := g.Vertex(topNestedVariable); err != nil {
// The key vertex has not been added before
_ = g.AddVertex(topNestedVariable)
}
// for variable name we need get topNestedVariable
_ = g.AddEdge(topNestedVariable, name)
}
}
}

// Topological sort on the graph to determine the order of template processing
orderedParams, _ := graph.TopologicalSort(g)

compiledTemplates := make(map[string]*MetaTemplate)
singleValues := make(map[string]string)

// Regular expression to match single value templates (e.g., {{ .key }})
singleValueRegex := regexp.MustCompile(`^\{\{\ +\.(\w+)\ +\}\}$`)

for k, v := range templates {
// Ordered map to keep track of value types (single or template)
valueTypes := orderedmap.NewOrderedMap[string, ValueType]()

for i := 0; i <= len(orderedParams)-1; i++ {
k := orderedParams[i]
v := templates[k]
vals := singleValueRegex.FindStringSubmatch(v)
if len(vals) > 1 {
// "{{ .key }}" - signle value template
singleValues[k] = vals[1]
valueTypes.Set(k, SingleValueType)
} else {
compiledTemplates[k] = template.Must(template.New("").Parse(v))
// "value_{{ .key }}" - more complex template
compiledTemplates[k] = NewMetaTemplate(v)
valueTypes.Set(k, TemplateValueType)
}
}

cache, err := lru.New[string, MetaData](cacheSize)
if err != nil {
panic(err)
}

meta := MetaTemplater{
templates: compiledTemplates,
singleValues: singleValues,
valueTypes: valueTypes,
logger: logger,
poolBuffer: sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
},
cache: cache,
}

return &meta
Expand All @@ -48,29 +153,94 @@ type Data interface {
}

func (m *MetaTemplater) Render(data Data) (MetaData, error) {
values := data.GetData()
initValues := data.GetData()
meta := MetaData{}

// Create a unique cache key based on the input data
cacheKey := generateCacheKey(initValues)

// Check if the result is already cached
if cachedMeta, found := m.cache.Get(cacheKey); found {
return cachedMeta, nil
}

// for hold values
values := make(map[string]any, len(initValues)+m.valueTypes.Len())
for key, value := range initValues {
values[key] = value
}

var tplOutput *bytes.Buffer
if len(m.templates) > 0 {
tplOutput := m.poolBuffer.Get().(*bytes.Buffer)
tplOutput = m.poolBuffer.Get().(*bytes.Buffer)
defer m.poolBuffer.Put(tplOutput)
}

for k, tmpl := range m.templates {
// Iterate over the keys in valueTypes to process each template or single value
for _, k := range m.valueTypes.Keys() {
v, _ := m.valueTypes.Get(k)
if v == SingleValueType {
tmpl := m.singleValues[k]
if val, ok := values[tmpl]; ok {
meta[k] = fmt.Sprintf("%v", val)
values[k] = meta[k]
} else {
m.logger.Error(
fmt.Sprintf("cannot render meta field %s: no value {{ .%s }}", k, tmpl),
)
}
} else if v == TemplateValueType {
tmpl := m.templates[k]
tplOutput.Reset()
err := tmpl.Execute(tplOutput, values)
err := tmpl.Template.Execute(tplOutput, values)
if err != nil {
return meta, err
m.logger.Error(
fmt.Sprintf("cannot render meta field %s", k),
zap.String("template", tmpl.Source),
zap.Error(err),
)
meta[k] = err.Error()
} else {
meta[k] = tplOutput.String()
values[k] = meta[k]
}
}
}

for k, tmpl := range m.singleValues {
if val, ok := values[tmpl]; ok {
meta[k] = fmt.Sprintf("%v", val)
m.cache.Add(cacheKey, meta)

return meta, nil
}

func generateCacheKey(data map[string]any) string {
var builder strings.Builder
builder.Grow(len(data) * 16) // Preallocate memory for the builder (estimate)

for k, v := range data {
switch v := v.(type) {
case string:
// Write the key and string value to the builder
builder.WriteString(k)
builder.WriteString(":")
builder.WriteString(v)
builder.WriteString("|")
case int:
// Write the key and integer value to the builder
builder.WriteString(k)
builder.WriteString(":")
builder.WriteString(strconv.Itoa(v))
builder.WriteString("|")
}
// If the value is not a string or int, skip it
}

return meta, nil
// Convert the builder to a string
key := builder.String()

// Remove the last "|" character if needed
if key != "" {
key = key[:len(key)-1] // Slice to remove the last character
}

return key
}
Loading

0 comments on commit 67a1249

Please sign in to comment.