Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce locks during decision logging #6797

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 92 additions & 94 deletions plugins/logs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,21 +415,42 @@ func (c *Config) validateAndInjectDefaults(services []string, pluginsList []stri

// Plugin implements decision log buffering and uploading.
type Plugin struct {
manager *plugins.Manager
config Config
buffer *logBuffer
enc *chunkEncoder
mtx sync.Mutex
stop chan chan struct{}
reconfig chan reconfigure
mask *rego.PreparedEvalQuery
maskMutex sync.Mutex
drop *rego.PreparedEvalQuery
dropMutex sync.Mutex
limiter *rate.Limiter
metrics metrics.Metrics
logger logging.Logger
status *lstat.Status
manager *plugins.Manager
config Config
buffer *logBuffer
enc *chunkEncoder
mtx sync.Mutex
stop chan chan struct{}
reconfig chan reconfigure
preparedMask prepareOnce
preparedDrop prepareOnce
limiter *rate.Limiter
metrics metrics.Metrics
logger logging.Logger
status *lstat.Status
}

type prepareOnce struct {
once *sync.Once
preparedQuery *rego.PreparedEvalQuery
err error
}

func newPrepareOnce() *prepareOnce {
return &prepareOnce{
once: new(sync.Once),
}
}

func (po *prepareOnce) drop() {
po.once = new(sync.Once)
}

func (po *prepareOnce) prepareOnce(f func() (*rego.PreparedEvalQuery, error)) (*rego.PreparedEvalQuery, error) {
po.once.Do(func() {
po.preparedQuery, po.err = f()
})
return po.preparedQuery, po.err
}

type reconfigure struct {
Expand Down Expand Up @@ -513,14 +534,16 @@ func (b *ConfigBuilder) Parse() (*Config, error) {
func New(parsedConfig *Config, manager *plugins.Manager) *Plugin {

plugin := &Plugin{
manager: manager,
config: *parsedConfig,
stop: make(chan chan struct{}),
buffer: newLogBuffer(*parsedConfig.Reporting.BufferSizeLimitBytes),
enc: newChunkEncoder(*parsedConfig.Reporting.UploadSizeLimitBytes),
reconfig: make(chan reconfigure),
logger: manager.Logger().WithFields(map[string]interface{}{"plugin": Name}),
status: &lstat.Status{},
manager: manager,
config: *parsedConfig,
stop: make(chan chan struct{}),
buffer: newLogBuffer(*parsedConfig.Reporting.BufferSizeLimitBytes),
enc: newChunkEncoder(*parsedConfig.Reporting.UploadSizeLimitBytes),
reconfig: make(chan reconfigure),
logger: manager.Logger().WithFields(map[string]interface{}{"plugin": Name}),
status: &lstat.Status{},
preparedDrop: *newPrepareOnce(),
preparedMask: *newPrepareOnce(),
}

if parsedConfig.Reporting.MaxDecisionsPerSecond != nil {
Expand Down Expand Up @@ -713,13 +736,8 @@ func (p *Plugin) Reconfigure(_ context.Context, config interface{}) {
done := make(chan struct{})
p.reconfig <- reconfigure{config: config, done: done}

p.maskMutex.Lock()
defer p.maskMutex.Unlock()
p.mask = nil

p.dropMutex.Lock()
defer p.dropMutex.Unlock()
p.drop = nil
p.preparedMask.drop()
p.preparedDrop.drop()

<-done
}
Expand Down Expand Up @@ -753,13 +771,8 @@ func (p *Plugin) Trigger(ctx context.Context) error {
// fires. This indicates a new compiler instance is available. The decision
// logger needs to prepare a new masking query.
func (p *Plugin) compilerUpdated(storage.Transaction) {
p.maskMutex.Lock()
defer p.maskMutex.Unlock()
p.mask = nil

p.dropMutex.Lock()
defer p.dropMutex.Unlock()
p.drop = nil
p.preparedMask.drop()
p.preparedDrop.drop()
}

func (p *Plugin) loop() {
Expand Down Expand Up @@ -975,42 +988,33 @@ func (p *Plugin) bufferChunk(buffer *logBuffer, bs []byte) {
}

func (p *Plugin) maskEvent(ctx context.Context, txn storage.Transaction, input ast.Value, event *EventV1) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A benefit of the old mutex-based approach was that on error, the PrepareForEval() call would be retried on subsequent mask/drop calls. Now, if the first (and only) call to PrepareForEval() fails for the current configuration, we'll end up with a broken PreparedEvalQuery that'll cause a panic when used.

I've made a commit to your branch with a proposed fix and tests asserting the behavior. The fix simply re-emits the error for subsequent calls. I think it's unlikely that subsequent PrepareForEval() calls would have a different outcome, so we don't need to retain the old behavior where we retry.

Thoughts?
This change shouldn't have an impact on performance, but if you're running separate tests on your end, please let us know how this fares.


mask, err := func() (rego.PreparedEvalQuery, error) {

p.maskMutex.Lock()
defer p.maskMutex.Unlock()

if p.mask == nil {

query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.maskDecisionRef)))

r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)

pq, err := r.PrepareForEval(context.Background())
if err != nil {
return rego.PreparedEvalQuery{}, err
}

p.mask = &pq
pq, err := p.preparedMask.prepareOnce(func() (*rego.PreparedEvalQuery, error) {
var pq rego.PreparedEvalQuery

query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.maskDecisionRef)))

r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)

pq, err := r.PrepareForEval(context.Background())
if err != nil {
return nil, err
}

return *p.mask, nil
}()
return &pq, nil
})

if err != nil {
return err
}

rs, err := mask.Eval(
rs, err := pq.Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(txn),
Expand Down Expand Up @@ -1038,40 +1042,34 @@ func (p *Plugin) maskEvent(ctx context.Context, txn storage.Transaction, input a
}

func (p *Plugin) dropEvent(ctx context.Context, txn storage.Transaction, input ast.Value) (bool, error) {
var err error

drop, err := func() (rego.PreparedEvalQuery, error) {

p.dropMutex.Lock()
defer p.dropMutex.Unlock()

if p.drop == nil {
query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.dropDecisionRef)))
r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)

pq, err := r.PrepareForEval(context.Background())
if err != nil {
return rego.PreparedEvalQuery{}, err
}

p.drop = &pq
pq, err := p.preparedDrop.prepareOnce(func() (*rego.PreparedEvalQuery, error) {
var pq rego.PreparedEvalQuery

query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.dropDecisionRef)))
r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)

pq, err := r.PrepareForEval(context.Background())
if err != nil {
return nil, err
}

return *p.drop, nil
}()
return &pq, nil
})

if err != nil {
return false, err
}

rs, err := drop.Eval(
rs, err := pq.Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(txn),
Expand Down
146 changes: 146 additions & 0 deletions plugins/logs/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2350,6 +2350,152 @@ func TestPluginDrop(t *testing.T) {
}
}

func TestPluginMaskErrorHandling(t *testing.T) {
rawPolicy := []byte(`
package system.log
drop {
endswith(input.path, "bar")
}`)
event := &EventV1{Path: "foo/bar"}

// Setup fixture. Populate store with simple drop policy.
ctx := context.Background()
store := inmem.New()

//checks if raw policy is valid and stores policy in store
err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error {
if err := store.UpsertPolicy(ctx, txn, "test.rego", rawPolicy); err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}

var output []string

// Create and start manager. Start is required so that stored policies
// get compiled and made available to the plugin.
manager, err := plugins.New(
nil,
"test",
store,
plugins.EnablePrintStatements(true),
plugins.PrintHook(appendingPrintHook{printed: &output}),
)
if err != nil {
t.Fatal(err)
}
if err := manager.Start(ctx); err != nil {
t.Fatal(err)
}

// Instantiate the plugin.
cfg := &Config{Service: "svc"}
trigger := plugins.DefaultTriggerMode
cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger)

plugin := New(cfg, manager)

if err := plugin.Start(ctx); err != nil {
t.Fatal(err)
}
input, err := event.AST()
if err != nil {
t.Fatal(err)
}

type badTransaction struct {
storage.Transaction
}

expErr := "storage_invalid_txn_error: unexpected transaction type *logs.badTransaction"
err = plugin.maskEvent(ctx, &badTransaction{}, input, event)
if err.Error() != expErr {
t.Fatalf("Expected error %v got %v", expErr, err)
}

// We expect the same error on a second call, even though the mask query failed to prepare and won't be prepared again.
err = plugin.maskEvent(ctx, nil, input, event)
if err.Error() != expErr {
t.Fatalf("Expected error %v got %v", expErr, err)
}
}

func TestPluginDropErrorHandling(t *testing.T) {
rawPolicy := []byte(`
package system.log
drop {
endswith(input.path, "bar")
}`)
event := &EventV1{Path: "foo/bar"}

// Setup fixture. Populate store with simple drop policy.
ctx := context.Background()
store := inmem.New()

//checks if raw policy is valid and stores policy in store
err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error {
if err := store.UpsertPolicy(ctx, txn, "test.rego", rawPolicy); err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}

var output []string

// Create and start manager. Start is required so that stored policies
// get compiled and made available to the plugin.
manager, err := plugins.New(
nil,
"test",
store,
plugins.EnablePrintStatements(true),
plugins.PrintHook(appendingPrintHook{printed: &output}),
)
if err != nil {
t.Fatal(err)
}
if err := manager.Start(ctx); err != nil {
t.Fatal(err)
}

// Instantiate the plugin.
cfg := &Config{Service: "svc"}
trigger := plugins.DefaultTriggerMode
cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger)

plugin := New(cfg, manager)

if err := plugin.Start(ctx); err != nil {
t.Fatal(err)
}
input, err := event.AST()
if err != nil {
t.Fatal(err)
}

type badTransaction struct {
storage.Transaction
}

expErr := "storage_invalid_txn_error: unexpected transaction type *logs.badTransaction"
_, err = plugin.dropEvent(ctx, &badTransaction{}, input)
if err.Error() != expErr {
t.Fatalf("Expected error %v got %v", expErr, err)
}

// We expect the same error on a second call, even though the drop query failed to prepare and won't be prepared again.
_, err = plugin.dropEvent(ctx, nil, input)
if err.Error() != expErr {
t.Fatalf("Expected error %v got %v", expErr, err)
}
}

type testFixtureOptions struct {
ConsoleLogger *test.Logger
ReportingUploadSizeLimitBytes int64
Expand Down