Skip to content

Commit

Permalink
Revert "Move log.Processor.Enabled to independent FilterProcessor
Browse files Browse the repository at this point in the history
… interfaced type (open-telemetry#5692)"

This reverts commit 002c0a4.
  • Loading branch information
pellared committed Sep 13, 2024
1 parent 23f7b41 commit 91010b0
Show file tree
Hide file tree
Showing 14 changed files with 72 additions and 193 deletions.
2 changes: 0 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ The next release will require at least [Go 1.22].
- `SimpleProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log` now returns `false` if the exporter is `nil`. (#5665)
- Update the concurrency requirements of `Exporter` in `go.opentelemetry.io/otel/sdk/log`. (#5666)
- `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` synchronizes `OnEmit` calls. (#5666)
- The `Processor` interface in `go.opentelemetry.io/otel/sdk/log` no longer includes the `Enabled` method.
See the `FilterProcessor` interface type added in `go.opentelemetry.io/otel/sdk/log/internal/x` to continue providing this functionality. (#5692)
- The `SimpleProcessor` type in `go.opentelemetry.io/otel/sdk/log` is no longer comparable. (#5693)
- The `BatchProcessor` type in `go.opentelemetry.io/otel/sdk/log` is no longer comparable. (#5693)

Expand Down
5 changes: 5 additions & 0 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
return nil
}

// Enabled returns if b is enabled.
func (b *BatchProcessor) Enabled(context.Context, Record) bool {
return !b.stopped.Load() && b.q != nil
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchProcessor) Shutdown(ctx context.Context) error {
if b.stopped.Swap(true) || b.q == nil {
Expand Down
9 changes: 9 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestEmptyBatchConfig(t *testing.T) {
ctx := context.Background()
record := new(Record)
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, *record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
})
Expand Down Expand Up @@ -269,6 +270,14 @@ func TestBatchProcessor(t *testing.T) {
assert.Equal(t, 3, e.ExportN())
})

t.Run("Enabled", func(t *testing.T) {
b := NewBatchProcessor(defaultNoopExporter)
assert.True(t, b.Enabled(ctx, Record{}))

_ = b.Shutdown(ctx)
assert.False(t, b.Enabled(ctx, Record{}))
})

t.Run("Shutdown", func(t *testing.T) {
t.Run("Error", func(t *testing.T) {
e := newTestExporter(assert.AnError)
Expand Down
3 changes: 0 additions & 3 deletions sdk/log/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,5 @@ at a single endpoint their origin is decipherable.
See [go.opentelemetry.io/otel/log] for more information about
the OpenTelemetry Logs Bridge API.
See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about the
experimental features.
*/
package log // import "go.opentelemetry.io/otel/sdk/log"
25 changes: 7 additions & 18 deletions sdk/log/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"strings"
"sync"

logapi "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/global"
Expand Down Expand Up @@ -59,7 +58,7 @@ func ExampleProcessor_filtering() {
// Wrap the processor so that it ignores processing log records
// when a context deriving from WithIgnoreLogs is passed
// to the logging methods.
processor = &ContextFilterProcessor{Processor: processor}
processor = &ContextFilterProcessor{processor}

// The created processor can then be registered with
// the OpenTelemetry Logs SDK using the WithProcessor option.
Expand All @@ -82,15 +81,6 @@ func WithIgnoreLogs(ctx context.Context) context.Context {
// [WithIgnoreLogs] is passed to its methods.
type ContextFilterProcessor struct {
log.Processor

lazyFilter sync.Once
// Use the experimental FilterProcessor interface
// (go.opentelemetry.io/otel/sdk/log/internal/x).
filter filter
}

type filter interface {
Enabled(ctx context.Context, param logapi.EnabledParameters) bool
}

func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
Expand All @@ -100,13 +90,8 @@ func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record)
return p.Processor.OnEmit(ctx, record)
}

func (p *ContextFilterProcessor) Enabled(ctx context.Context, param logapi.EnabledParameters) bool {
p.lazyFilter.Do(func() {
if f, ok := p.Processor.(filter); ok {
p.filter = f
}
})
return !ignoreLogs(ctx) && (p.filter == nil || p.filter.Enabled(ctx, param))
func (p *ContextFilterProcessor) Enabled(ctx context.Context, record log.Record) bool {
return !ignoreLogs(ctx) && p.Processor.Enabled(ctx, record)
}

func ignoreLogs(ctx context.Context) bool {
Expand Down Expand Up @@ -135,6 +120,10 @@ func ExampleProcessor_redact() {
// from attributes containing "token" in the key.
type RedactTokensProcessor struct{}

func (p *RedactTokensProcessor) Enabled(ctx context.Context, record log.Record) bool {
return true
}

// OnEmit redacts values from attributes containing "token" in the key
// by replacing them with a REDACTED value.
func (p *RedactTokensProcessor) OnEmit(ctx context.Context, record *log.Record) error {
Expand Down
35 changes: 0 additions & 35 deletions sdk/log/internal/x/README.md

This file was deleted.

47 changes: 0 additions & 47 deletions sdk/log/internal/x/x.go

This file was deleted.

32 changes: 8 additions & 24 deletions sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -43,29 +42,14 @@ func (l *logger) Emit(ctx context.Context, r log.Record) {
}
}

// Enabled returns true if at least one Processor held by the LoggerProvider
// that created the logger will process param for the provided context and param.
//
// If it is not possible to definitively determine the param will be
// processed, true will be returned by default. A value of false will only be
// returned if it can be positively verified that no Processor will process.
func (l *logger) Enabled(ctx context.Context, param log.EnabledParameters) bool {
fltrs := l.provider.filterProcessors()
// If there are more Processors than FilterProcessors we cannot be sure
// that all Processors will drop the record. Therefore, return true.
//
// If all Processors are FilterProcessors, check if any is enabled.
return len(l.provider.processors) > len(fltrs) || anyEnabled(ctx, param, fltrs)
}

func anyEnabled(ctx context.Context, param log.EnabledParameters, fltrs []x.FilterProcessor) bool {
for _, f := range fltrs {
if f.Enabled(ctx, param) {
// At least one Processor will process the Record.
return true
}
}
// No Processor will process the record
func (l *logger) Enabled(ctx context.Context, r log.EnabledParameters) bool {
// TODO
// newRecord := l.newRecord(ctx, r)
// for _, p := range l.provider.processors {
// if enabled := p.Enabled(ctx, newRecord); enabled {
// return true
// }
// }
return false
}

Expand Down
26 changes: 2 additions & 24 deletions sdk/log/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,8 @@ func TestLoggerEmit(t *testing.T) {
}

func TestLoggerEnabled(t *testing.T) {
p0 := newFltrProcessor("0", true)
p1 := newFltrProcessor("1", true)
p2WithDisabled := newFltrProcessor("2", false)
p0, p1, p2WithDisabled := newProcessor("0"), newProcessor("1"), newProcessor("2")
p2WithDisabled.enabled = false

testCases := []struct {
name string
Expand Down Expand Up @@ -274,24 +273,3 @@ func TestLoggerEnabled(t *testing.T) {
})
}
}

func BenchmarkLoggerEnabled(b *testing.B) {
provider := NewLoggerProvider(
WithProcessor(newFltrProcessor("0", false)),
WithProcessor(newFltrProcessor("1", true)),
)
logger := provider.Logger("BenchmarkLoggerEnabled")
ctx, param := context.Background(), log.EnabledParameters{}
param.SetSeverity(log.SeverityDebug)

var enabled bool

b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
enabled = logger.Enabled(ctx, param)
}

_ = enabled
}
24 changes: 21 additions & 3 deletions sdk/log/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ import (
// Any of the Processor's methods may be called concurrently with itself
// or with other methods. It is the responsibility of the Processor to manage
// this concurrency.
//
// See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about how
// a Processor can be extended to support experimental features.
type Processor interface {
// OnEmit is called when a Record is emitted.
//
Expand All @@ -38,6 +35,27 @@ type Processor interface {
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record *Record) error

// Enabled returns whether the Processor will process for the given context
// and record.
//
// The passed record is likely to be a partial record with only the
// bridge-relevant information being provided (e.g a record with only the
// Severity set). If a Logger needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Processor will process for the
// provided context and record, and will be false if the Processor will not
// process. The returned value may be true or false in an indeterminate
// state. An implementation should default to returning true for an
// indeterminate state, but may return false if valid reasons in particular
// circumstances exist (e.g. performance, correctness).
//
// The SDK invokes the processors sequentially in the same order as
// they were registered using [WithProcessor] until any processor returns true.
//
// Implementations should not modify the record.
Enabled(ctx context.Context, record Record) bool

// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
//
Expand Down
15 changes: 0 additions & 15 deletions sdk/log/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/log/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand Down Expand Up @@ -67,9 +66,6 @@ type LoggerProvider struct {
attributeCountLimit int
attributeValueLengthLimit int

fltrProcessorsOnce sync.Once
fltrProcessors []x.FilterProcessor

loggersMu sync.Mutex
loggers map[instrumentation.Scope]*logger

Expand Down Expand Up @@ -97,17 +93,6 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider {
}
}

func (p *LoggerProvider) filterProcessors() []x.FilterProcessor {
p.fltrProcessorsOnce.Do(func() {
for _, proc := range p.processors {
if f, ok := proc.(x.FilterProcessor); ok {
p.fltrProcessors = append(p.fltrProcessors, f)
}
}
})
return p.fltrProcessors
}

// Logger returns a new [log.Logger] with the provided name and configuration.
//
// If p is shut down, a [noop.Logger] instance is returned.
Expand Down
Loading

0 comments on commit 91010b0

Please sign in to comment.