Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce locks during decision logging #6797

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 61 additions & 80 deletions plugins/logs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,21 +415,21 @@ func (c *Config) validateAndInjectDefaults(services []string, pluginsList []stri

// Plugin implements decision log buffering and uploading.
type Plugin struct {
manager *plugins.Manager
config Config
buffer *logBuffer
enc *chunkEncoder
mtx sync.Mutex
stop chan chan struct{}
reconfig chan reconfigure
mask *rego.PreparedEvalQuery
maskMutex sync.Mutex
drop *rego.PreparedEvalQuery
dropMutex sync.Mutex
limiter *rate.Limiter
metrics metrics.Metrics
logger logging.Logger
status *lstat.Status
manager *plugins.Manager
config Config
buffer *logBuffer
enc *chunkEncoder
mtx sync.Mutex
stop chan chan struct{}
reconfig chan reconfigure
mask *rego.PreparedEvalQuery
maskOnce *sync.Once
drop *rego.PreparedEvalQuery
dropOnce *sync.Once
limiter *rate.Limiter
metrics metrics.Metrics
logger logging.Logger
status *lstat.Status
}

type reconfigure struct {
Expand Down Expand Up @@ -521,6 +521,8 @@ func New(parsedConfig *Config, manager *plugins.Manager) *Plugin {
reconfig: make(chan reconfigure),
logger: manager.Logger().WithFields(map[string]interface{}{"plugin": Name}),
status: &lstat.Status{},
dropOnce: new(sync.Once),
maskOnce: new(sync.Once),
}

if parsedConfig.Reporting.MaxDecisionsPerSecond != nil {
Expand Down Expand Up @@ -713,13 +715,8 @@ func (p *Plugin) Reconfigure(_ context.Context, config interface{}) {
done := make(chan struct{})
p.reconfig <- reconfigure{config: config, done: done}

p.maskMutex.Lock()
defer p.maskMutex.Unlock()
p.mask = nil

p.dropMutex.Lock()
defer p.dropMutex.Unlock()
p.drop = nil
p.maskOnce = new(sync.Once)
p.dropOnce = new(sync.Once)

<-done
}
Expand Down Expand Up @@ -753,13 +750,8 @@ func (p *Plugin) Trigger(ctx context.Context) error {
// fires. This indicates a new compiler instance is available. The decision
// logger needs to prepare a new masking query.
func (p *Plugin) compilerUpdated(storage.Transaction) {
p.maskMutex.Lock()
defer p.maskMutex.Unlock()
p.mask = nil

p.dropMutex.Lock()
defer p.dropMutex.Unlock()
p.drop = nil
p.maskOnce = new(sync.Once)
p.dropOnce = new(sync.Once)
}

func (p *Plugin) loop() {
Expand Down Expand Up @@ -975,42 +967,36 @@ func (p *Plugin) bufferChunk(buffer *logBuffer, bs []byte) {
}

func (p *Plugin) maskEvent(ctx context.Context, txn storage.Transaction, input ast.Value, event *EventV1) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A benefit of the old mutex-based approach was that on error, the PrepareForEval() call would be retried on subsequent mask/drop calls. Now, if the first (and only) call to PrepareForEval() fails for the current configuration, we'll end up with a broken PreparedEvalQuery that'll cause a panic when used.

I've made a commit to your branch with a proposed fix and tests asserting the behavior. The fix simply re-emits the error for subsequent calls. I think it's unlikely that subsequent PrepareForEval() calls would have a different outcome, so we don't need to retain the old behavior where we retry.

Thoughts?
This change shouldn't have an impact on performance, but if you're running separate tests on your end, please let us know how this fares.

var err error

mask, err := func() (rego.PreparedEvalQuery, error) {

p.maskMutex.Lock()
defer p.maskMutex.Unlock()

if p.mask == nil {

query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.maskDecisionRef)))
p.maskOnce.Do(func() {
var pq rego.PreparedEvalQuery

r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)
query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.maskDecisionRef)))

pq, err := r.PrepareForEval(context.Background())
if err != nil {
return rego.PreparedEvalQuery{}, err
}
r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)

p.mask = &pq
pq, err = r.PrepareForEval(context.Background())
if err != nil {
pq = rego.PreparedEvalQuery{}
}

return *p.mask, nil
}()
p.mask = &pq
})

if err != nil {
return err
}

rs, err := mask.Eval(
rs, err := p.mask.Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(txn),
Expand Down Expand Up @@ -1038,40 +1024,35 @@ func (p *Plugin) maskEvent(ctx context.Context, txn storage.Transaction, input a
}

func (p *Plugin) dropEvent(ctx context.Context, txn storage.Transaction, input ast.Value) (bool, error) {
var err error

drop, err := func() (rego.PreparedEvalQuery, error) {

p.dropMutex.Lock()
defer p.dropMutex.Unlock()

if p.drop == nil {
query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.dropDecisionRef)))
r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)

pq, err := r.PrepareForEval(context.Background())
if err != nil {
return rego.PreparedEvalQuery{}, err
}

p.drop = &pq
p.dropOnce.Do(func() {
var pq rego.PreparedEvalQuery

query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.dropDecisionRef)))
r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)

pq, err = r.PrepareForEval(context.Background())
if err != nil {
pq = rego.PreparedEvalQuery{}
}

return *p.drop, nil
}()
p.drop = &pq
})

if err != nil {
return false, err
}

rs, err := drop.Eval(
rs, err := p.drop.Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(txn),
Expand Down