diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ca16223768..f2e40934994 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `Processor.OnEmit` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636) - `SimpleProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log` now returns `false` if the exporter is `nil`. (#5665) +- Update the concurrency requirements of `Exporter` in `go.opentelemetry.io/otel/sdk/log`. (#5666) +- `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` synchronizes `OnEmit` calls. (#5666) ### Fixed diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index 92ff1429a0b..e4e3c5402bf 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -15,10 +15,6 @@ import ( ) // Exporter handles the delivery of log records to external receivers. -// -// Any of the Exporter's methods may be called concurrently with itself -// or with other methods. It is the responsibility of the Exporter to manage -// this concurrency. type Exporter interface { // Export transmits log records to a receiver. // @@ -34,6 +30,9 @@ type Exporter interface { // // Before modifying a Record, the implementation must use Record.Clone // to create a copy that shares no state with the original. + // + // Export should never be called concurrently with other Export calls. + // However, it may be called concurrently with other methods. Export(ctx context.Context, records []Record) error // Shutdown is called when the SDK shuts down. Any cleanup or release of @@ -44,6 +43,8 @@ type Exporter interface { // // After Shutdown is called, calls to Export, Shutdown, or ForceFlush // should perform no operation and return nil error. + // + // Shutdown may be called concurrently with itself or with other methods. Shutdown(ctx context.Context) error // ForceFlush exports log records to the configured Exporter that have not yet @@ -51,6 +52,8 @@ type Exporter interface { // // The deadline or cancellation of the passed context must be honored. An // appropriate error should be returned in these situations. + // + // ForceFlush may be called concurrently with itself or with other methods. ForceFlush(ctx context.Context) error } diff --git a/sdk/log/simple.go b/sdk/log/simple.go index cf7c315a62d..863be8df787 100644 --- a/sdk/log/simple.go +++ b/sdk/log/simple.go @@ -15,6 +15,7 @@ var _ Processor = (*SimpleProcessor)(nil) // // Use [NewSimpleProcessor] to create a SimpleProcessor. type SimpleProcessor struct { + mu sync.Mutex exporter Exporter } @@ -43,6 +44,9 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error { return nil } + s.mu.Lock() + defer s.mu.Unlock() + records := simpleProcRecordsPool.Get().(*[]Record) (*records)[0] = *r defer func() { diff --git a/sdk/log/simple_test.go b/sdk/log/simple_test.go index b20f2071e1b..cc71360f8fc 100644 --- a/sdk/log/simple_test.go +++ b/sdk/log/simple_test.go @@ -5,6 +5,8 @@ package log_test import ( "context" + "io" + "strings" "sync" "testing" @@ -70,6 +72,25 @@ func TestSimpleProcessorForceFlush(t *testing.T) { require.True(t, e.forceFlushCalled, "exporter ForceFlush not called") } +type writerExporter struct { + io.Writer +} + +func (e *writerExporter) Export(_ context.Context, records []log.Record) error { + for _, r := range records { + _, _ = io.WriteString(e.Writer, r.Body().String()) + } + return nil +} + +func (e *writerExporter) Shutdown(context.Context) error { + return nil +} + +func (e *writerExporter) ForceFlush(context.Context) error { + return nil +} + func TestSimpleProcessorEmpty(t *testing.T) { assert.NotPanics(t, func() { var s log.SimpleProcessor @@ -91,7 +112,8 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { r := new(log.Record) r.SetSeverityText("test") ctx := context.Background() - s := log.NewSimpleProcessor(nil) + e := &writerExporter{new(strings.Builder)} + s := log.NewSimpleProcessor(e) for i := 0; i < goRoutineN; i++ { go func() { defer wg.Done()