diff --git a/CHANGELOG.md b/CHANGELOG.md index f6ad01d49fe..18083080a39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,10 @@ The next release will require at least [Go 1.22]. - Zero value of `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` no longer panics. (#5665) - Add `Walk` function to `TraceState` in `go.opentelemetry.io/otel/trace` to iterate all the key-value pairs. (#5651) - Bridge the trace state in `go.opentelemetry.io/otel/bridge/opencensus`. (#5651) +- The `FilterProcessor` interface type is added in `go.opentelemetry.io/otel/sdk/log/internal/x`. + This is an optional and experimental interface that log `Processor`s can implement to instruct the `Logger` if a `Record` will be processed or not. + It replaces the existing `Enabled` method that is removed from the `Processor` interface itself. + It does not fall within the scope of the OpenTelemetry Go versioning and stability [policy](./VERSIONING.md) and it may be changed in backwards incompatible ways or removed in feature releases. (#5692) - Support [Go 1.23]. (#5720) ### Changed @@ -30,6 +34,8 @@ The next release will require at least [Go 1.22]. - `SimpleProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log` now returns `false` if the exporter is `nil`. (#5665) - Update the concurrency requirements of `Exporter` in `go.opentelemetry.io/otel/sdk/log`. (#5666) - `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` synchronizes `OnEmit` calls. (#5666) +- The `Processor` interface in `go.opentelemetry.io/otel/sdk/log` no longer includes the `Enabled` method. + See the `FilterProcessor` interface type added in `go.opentelemetry.io/otel/sdk/log/internal/x` to continue providing this functionality. (#5692) - The `SimpleProcessor` type in `go.opentelemetry.io/otel/sdk/log` is no longer comparable. (#5693) - The `BatchProcessor` type in `go.opentelemetry.io/otel/sdk/log` is no longer comparable. (#5693) - `NewMemberRaw`, `NewKeyProperty` and `NewKeyValuePropertyRaw` in `go.opentelemetry.io/otel/baggage` allow UTF-8 string in key. (#5132) @@ -50,6 +56,11 @@ The next release will require at least [Go 1.22]. - Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` and `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#5641) - Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#5650) +### Removed + +- The `Enabled` method of the `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` is removed. (#5692) +- The `Enabled` method of the `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` is removed. (#5692) + diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 3faf4d1d4ea..197fcbad43d 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -196,11 +196,6 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { return nil } -// Enabled returns if b is enabled. -func (b *BatchProcessor) Enabled(context.Context, Record) bool { - return !b.stopped.Load() && b.q != nil -} - // Shutdown flushes queued log records and shuts down the decorated exporter. func (b *BatchProcessor) Shutdown(ctx context.Context) error { if b.stopped.Swap(true) || b.q == nil { diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 0b9ece02a3d..a4c1b5f094f 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -47,7 +47,6 @@ func TestEmptyBatchConfig(t *testing.T) { ctx := context.Background() record := new(Record) assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit") - assert.False(t, bp.Enabled(ctx, *record), "Enabled") assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush") assert.NoError(t, bp.Shutdown(ctx), "Shutdown") }) @@ -270,14 +269,6 @@ func TestBatchProcessor(t *testing.T) { assert.Equal(t, 3, e.ExportN()) }) - t.Run("Enabled", func(t *testing.T) { - b := NewBatchProcessor(defaultNoopExporter) - assert.True(t, b.Enabled(ctx, Record{})) - - _ = b.Shutdown(ctx) - assert.False(t, b.Enabled(ctx, Record{})) - }) - t.Run("Shutdown", func(t *testing.T) { t.Run("Error", func(t *testing.T) { e := newTestExporter(assert.AnError) diff --git a/sdk/log/doc.go b/sdk/log/doc.go index 6a1f1b0e915..14a581db6b6 100644 --- a/sdk/log/doc.go +++ b/sdk/log/doc.go @@ -32,5 +32,8 @@ at a single endpoint their origin is decipherable. See [go.opentelemetry.io/otel/log] for more information about the OpenTelemetry Logs Bridge API. + +See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about the +experimental features. */ package log // import "go.opentelemetry.io/otel/sdk/log" diff --git a/sdk/log/example_test.go b/sdk/log/example_test.go index 7b697db3ea7..3b6c32b8961 100644 --- a/sdk/log/example_test.go +++ b/sdk/log/example_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "strings" + "sync" logapi "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/log/global" @@ -58,7 +59,7 @@ func ExampleProcessor_filtering() { // Wrap the processor so that it ignores processing log records // when a context deriving from WithIgnoreLogs is passed // to the logging methods. - processor = &ContextFilterProcessor{processor} + processor = &ContextFilterProcessor{Processor: processor} // The created processor can then be registered with // the OpenTelemetry Logs SDK using the WithProcessor option. @@ -81,6 +82,15 @@ func WithIgnoreLogs(ctx context.Context) context.Context { // [WithIgnoreLogs] is passed to its methods. type ContextFilterProcessor struct { log.Processor + + lazyFilter sync.Once + // Use the experimental FilterProcessor interface + // (go.opentelemetry.io/otel/sdk/log/internal/x). + filter filter +} + +type filter interface { + Enabled(ctx context.Context, record log.Record) bool } func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error { @@ -91,7 +101,12 @@ func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) } func (p *ContextFilterProcessor) Enabled(ctx context.Context, record log.Record) bool { - return !ignoreLogs(ctx) && p.Processor.Enabled(ctx, record) + p.lazyFilter.Do(func() { + if f, ok := p.Processor.(filter); ok { + p.filter = f + } + }) + return !ignoreLogs(ctx) && (p.filter == nil || p.filter.Enabled(ctx, record)) } func ignoreLogs(ctx context.Context) bool { diff --git a/sdk/log/internal/x/README.md b/sdk/log/internal/x/README.md new file mode 100644 index 00000000000..73f4db626af --- /dev/null +++ b/sdk/log/internal/x/README.md @@ -0,0 +1,35 @@ +# Experimental Features + +The Logs SDK contains features that have not yet stabilized. +These features are added to the OpenTelemetry Go Logs SDK prior to stabilization so that users can start experimenting with them and provide feedback. + +These feature may change in backwards incompatible ways as feedback is applied. +See the [Compatibility and Stability](#compatibility-and-stability) section for more information. + +## Features + +- [Filter Processors](#filter-processor) + +### Filter Processor + +Users of logging libraries often want to know if a log `Record` will be processed or dropped before they perform complex operations to construct the `Record`. +The [`Logger`] in the Logs Bridge API provides the `Enabled` method for just this use-case. +In order for the Logs Bridge SDK to effectively implement this API, it needs to be known if the registered [`Processor`]s are enabled for the `Record` within a context. +A [`Processor`] that knows, and can identify, what `Record` it will process or drop when it is passed to `OnEmit` can communicate this to the SDK `Logger` by implementing the `FilterProcessor`. + +By default, the SDK `Logger.Enabled` will return true when called. +Only if all the registered [`Processor`]s implement `FilterProcessor` and they all return `false` will `Logger.Enabled` return `false`. + +See the [`minsev`] [`Processor`] for an example use-case. +It is used to filter `Record`s out that a have a `Severity` below a threshold. + +[`Logger`]: https://pkg.go.dev/go.opentelemetry.io/otel/log#Logger +[`Processor`]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#Processor +[`minsev`]: https://pkg.go.dev/go.opentelemetry.io/contrib/processors/minsev + +## Compatibility and Stability + +Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../VERSIONING.md). +These features may be removed or modified in successive version releases, including patch versions. + +When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release. diff --git a/sdk/log/internal/x/x.go b/sdk/log/internal/x/x.go new file mode 100644 index 00000000000..9b3f8b7b069 --- /dev/null +++ b/sdk/log/internal/x/x.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package x contains support for Logs SDK experimental features. +package x // import "go.opentelemetry.io/otel/sdk/log/internal/x" + +import ( + "context" + + "go.opentelemetry.io/otel/log" +) + +// FilterProcessor is a [Processor] that knows, and can identify, what +// [log.Record] it will process or drop when it is passed to OnEmit. +// +// This is useful for users of logging libraries that want to know if a [log.Record] +// will be processed or dropped before they perform complex operations to +// construct the [log.Record]. +// +// [Processor] implementations that choose to support this by satisfying this +// interface are expected to re-evaluate the [log.Record]s passed to OnEmit, it is +// not expected that the caller to OnEmit will use the functionality from this +// interface prior to calling OnEmit. +// +// This should only be implemented for [Processor]s that can make reliable +// enough determination of this prior to processing a [log.Record] and where +// the result is dynamic. +// +// [Processor]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#Processor +type FilterProcessor interface { + // Enabled returns whether the Processor will process for the given context + // and record. + // + // The passed record is likely to be a partial record with only the + // bridge-relevant information being provided (e.g a record with only the + // Severity set). If a Logger needs more information than is provided, it + // is said to be in an indeterminate state (see below). + // + // The returned value will be true when the Processor will process for the + // provided context and record, and will be false if the Processor will not + // process. An implementation should default to returning true for an + // indeterminate state. + // + // Implementations should not modify the record. + Enabled(ctx context.Context, record log.Record) bool +} diff --git a/sdk/log/logger.go b/sdk/log/logger.go index 04c44ac5bb8..db41c057005 100644 --- a/sdk/log/logger.go +++ b/sdk/log/logger.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/log/embedded" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/log/internal/x" "go.opentelemetry.io/otel/trace" ) @@ -42,13 +43,30 @@ func (l *logger) Emit(ctx context.Context, r log.Record) { } } -func (l *logger) Enabled(ctx context.Context, r log.Record) bool { - newRecord := l.newRecord(ctx, r) - for _, p := range l.provider.processors { - if enabled := p.Enabled(ctx, newRecord); enabled { +// Enabled returns true if at least one Processor held by the LoggerProvider +// that created the logger will process the record for the provided context. +// +// If it is not possible to definitively determine the record will be +// processed, true will be returned by default. A value of false will only be +// returned if it can be positively verified that no Processor will process the +// record. +func (l *logger) Enabled(ctx context.Context, record log.Record) bool { + fltrs := l.provider.filterProcessors() + // If there are more Processors than FilterProcessors we cannot be sure + // that all Processors will drop the record. Therefore, return true. + // + // If all Processors are FilterProcessors, check if any is enabled. + return len(l.provider.processors) > len(fltrs) || anyEnabled(ctx, record, fltrs) +} + +func anyEnabled(ctx context.Context, r log.Record, fltrs []x.FilterProcessor) bool { + for _, f := range fltrs { + if f.Enabled(ctx, r) { + // At least one Processor will process the Record. return true } } + // No Processor will process the record return false } diff --git a/sdk/log/logger_test.go b/sdk/log/logger_test.go index 6443bf77d71..bfa967232ec 100644 --- a/sdk/log/logger_test.go +++ b/sdk/log/logger_test.go @@ -215,8 +215,9 @@ func TestLoggerEmit(t *testing.T) { } func TestLoggerEnabled(t *testing.T) { - p0, p1, p2WithDisabled := newProcessor("0"), newProcessor("1"), newProcessor("2") - p2WithDisabled.enabled = false + p0 := newFltrProcessor("0", true) + p1 := newFltrProcessor("1", true) + p2WithDisabled := newFltrProcessor("2", false) testCases := []struct { name string @@ -273,3 +274,24 @@ func TestLoggerEnabled(t *testing.T) { }) } } + +func BenchmarkLoggerEnabled(b *testing.B) { + provider := NewLoggerProvider( + WithProcessor(newFltrProcessor("0", false)), + WithProcessor(newFltrProcessor("1", true)), + ) + logger := provider.Logger("BenchmarkLoggerEnabled") + ctx, r := context.Background(), log.Record{} + r.SetSeverityText("test") + + var enabled bool + + b.ReportAllocs() + b.ResetTimer() + + for n := 0; n < b.N; n++ { + enabled = logger.Enabled(ctx, r) + } + + _ = enabled +} diff --git a/sdk/log/processor.go b/sdk/log/processor.go index 937f69f2597..fcab34c7a48 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -12,6 +12,9 @@ import ( // Any of the Processor's methods may be called concurrently with itself // or with other methods. It is the responsibility of the Processor to manage // this concurrency. +// +// See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about how +// a Processor can be extended to support experimental features. type Processor interface { // OnEmit is called when a Record is emitted. // @@ -35,27 +38,6 @@ type Processor interface { // to create a copy that shares no state with the original. OnEmit(ctx context.Context, record *Record) error - // Enabled returns whether the Processor will process for the given context - // and record. - // - // The passed record is likely to be a partial record with only the - // bridge-relevant information being provided (e.g a record with only the - // Severity set). If a Logger needs more information than is provided, it - // is said to be in an indeterminate state (see below). - // - // The returned value will be true when the Processor will process for the - // provided context and record, and will be false if the Processor will not - // process. The returned value may be true or false in an indeterminate - // state. An implementation should default to returning true for an - // indeterminate state, but may return false if valid reasons in particular - // circumstances exist (e.g. performance, correctness). - // - // The SDK invokes the processors sequentially in the same order as - // they were registered using [WithProcessor] until any processor returns true. - // - // Implementations should not modify the record. - Enabled(ctx context.Context, record Record) bool - // Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call. // diff --git a/sdk/log/provider.go b/sdk/log/provider.go index ede1b77c18e..9d16d801898 100644 --- a/sdk/log/provider.go +++ b/sdk/log/provider.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/otel/log/embedded" "go.opentelemetry.io/otel/log/noop" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/log/internal/x" "go.opentelemetry.io/otel/sdk/resource" ) @@ -65,6 +66,9 @@ type LoggerProvider struct { attributeCountLimit int attributeValueLengthLimit int + fltrProcessorsOnce sync.Once + fltrProcessors []x.FilterProcessor + loggersMu sync.Mutex loggers map[instrumentation.Scope]*logger @@ -92,6 +96,17 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider { } } +func (p *LoggerProvider) filterProcessors() []x.FilterProcessor { + p.fltrProcessorsOnce.Do(func() { + for _, proc := range p.processors { + if f, ok := proc.(x.FilterProcessor); ok { + p.fltrProcessors = append(p.fltrProcessors, f) + } + } + }) + return p.fltrProcessors +} + // Logger returns a new [log.Logger] with the provided name and configuration. // // If p is shut down, a [noop.Logger] instance is returned. diff --git a/sdk/log/provider_test.go b/sdk/log/provider_test.go index 39071b6a98f..4dd9cf327e9 100644 --- a/sdk/log/provider_test.go +++ b/sdk/log/provider_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/log/noop" + "go.opentelemetry.io/otel/sdk/log/internal/x" "go.opentelemetry.io/otel/sdk/resource" ) @@ -31,11 +32,10 @@ type processor struct { forceFlushCalls int records []Record - enabled bool } func newProcessor(name string) *processor { - return &processor{Name: name, enabled: true} + return &processor{Name: name} } func (p *processor) OnEmit(ctx context.Context, r *Record) error { @@ -47,10 +47,6 @@ func (p *processor) OnEmit(ctx context.Context, r *Record) error { return nil } -func (p *processor) Enabled(context.Context, Record) bool { - return p.enabled -} - func (p *processor) Shutdown(context.Context) error { p.shutdownCalls++ return p.Err @@ -61,6 +57,25 @@ func (p *processor) ForceFlush(context.Context) error { return p.Err } +type fltrProcessor struct { + *processor + + enabled bool +} + +var _ x.FilterProcessor = (*fltrProcessor)(nil) + +func newFltrProcessor(name string, enabled bool) *fltrProcessor { + return &fltrProcessor{ + processor: newProcessor(name), + enabled: enabled, + } +} + +func (p *fltrProcessor) Enabled(context.Context, log.Record) bool { + return p.enabled +} + func TestNewLoggerProviderConfiguration(t *testing.T) { t.Cleanup(func(orig otel.ErrorHandler) func() { otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { diff --git a/sdk/log/simple.go b/sdk/log/simple.go index b426e414fff..1b2a68961b3 100644 --- a/sdk/log/simple.go +++ b/sdk/log/simple.go @@ -58,11 +58,6 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error { return s.exporter.Export(ctx, *records) } -// Enabled returns true if the exporter is not nil. -func (s *SimpleProcessor) Enabled(context.Context, Record) bool { - return s.exporter != nil -} - // Shutdown shuts down the expoter. func (s *SimpleProcessor) Shutdown(ctx context.Context) error { if s.exporter == nil { diff --git a/sdk/log/simple_test.go b/sdk/log/simple_test.go index cc71360f8fc..f8719bfe551 100644 --- a/sdk/log/simple_test.go +++ b/sdk/log/simple_test.go @@ -52,12 +52,6 @@ func TestSimpleProcessorOnEmit(t *testing.T) { assert.Equal(t, []log.Record{*r}, e.records) } -func TestSimpleProcessorEnabled(t *testing.T) { - e := new(exporter) - s := log.NewSimpleProcessor(e) - assert.True(t, s.Enabled(context.Background(), log.Record{})) -} - func TestSimpleProcessorShutdown(t *testing.T) { e := new(exporter) s := log.NewSimpleProcessor(e) @@ -97,7 +91,6 @@ func TestSimpleProcessorEmpty(t *testing.T) { ctx := context.Background() record := new(log.Record) assert.NoError(t, s.OnEmit(ctx, record), "OnEmit") - assert.False(t, s.Enabled(ctx, *record), "Enabled") assert.NoError(t, s.ForceFlush(ctx), "ForceFlush") assert.NoError(t, s.Shutdown(ctx), "Shutdown") }) @@ -119,7 +112,6 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { defer wg.Done() _ = s.OnEmit(ctx, r) - _ = s.Enabled(ctx, *r) _ = s.Shutdown(ctx) _ = s.ForceFlush(ctx) }()