[processor/logstransform]: Fix shutdown ordering leading to panic (#31153)

**Description:**
* Re-order how we start and stop the different goroutines in the
logstransform processor

The idea is that we start the goroutines from the consumer end up to the
producer end, then shut them down in reverse order. This is similar to
how stanza itself starts and stops its operators (it starts them in
reverse topological order and stops them in topological order).
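
For reference, a minimal, self-contained sketch of that pattern (the `pipeline`/`shutdownFunc` names are illustrative stand-ins, not the processor's actual types): each stage registers a shutdown hook as it starts, and shutdown walks the hooks in reverse so the producer end stops first and downstream stages can drain.
```go
package main

import (
	"context"
	"fmt"
)

// shutdownFunc mirrors the role of component.ShutdownFunc: a hook registered
// by each stage as it starts. (Illustrative only.)
type shutdownFunc func(ctx context.Context) error

// pipeline is a hypothetical stand-in for the processor; the stage names
// follow the data flow described in the diff.
type pipeline struct {
	shutdownFns []shutdownFunc
}

// start brings stages up from the consumer end toward the producer end,
// appending a shutdown hook for each stage as it comes up.
func (p *pipeline) start() {
	stages := []string{"consumerLoop", "converter", "emitterLoop", "pipeline", "converterLoop", "fromConverter"}
	for _, stage := range stages {
		stage := stage // capture loop variable for the closure
		fmt.Println("starting", stage)
		p.shutdownFns = append(p.shutdownFns, func(ctx context.Context) error {
			fmt.Println("stopping", stage)
			return nil
		})
	}
}

// shutdown runs the hooks in reverse (LIFO) order, so the last stage started
// (the producer end) is the first one stopped.
func (p *pipeline) shutdown(ctx context.Context) error {
	for i := len(p.shutdownFns) - 1; i >= 0; i-- {
		if err := p.shutdownFns[i](ctx); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	p := &pipeline{}
	p.start()
	_ = p.shutdown(context.Background())
}
```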

**Link to tracking Issue:** Closes #31139

**Testing:**
Added a unit test. This unit test regularly fails on the code on main
(panics), but works consistently on this branch (I've run it 100 times
to makes sure).
```sh
go test -timeout 10m -count=100 -v -run '^TestProcessorShutdownWithSlowOperator$' github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor
```
BinaryFissionGames authored Feb 9, 2024
1 parent 7320350 commit a932e91
Showing 3 changed files with 190 additions and 34 deletions.
13 changes: 13 additions & 0 deletions .chloggen/fix_log-transform-shutdown.yaml
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: logstransformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix potential panic on shutdown due to incorrect shutdown order

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31139]
125 changes: 91 additions & 34 deletions processor/logstransformprocessor/processor.go
@@ -32,7 +32,6 @@ type logsTransformProcessor struct {
emitter *adapter.LogEmitter
converter *adapter.Converter
fromConverter *adapter.FromPdataConverter
wg sync.WaitGroup
shutdownFns []component.ShutdownFunc
}

@@ -64,25 +63,81 @@ func (ltp *logsTransformProcessor) Capabilities() consumer.Capabilities {
}

func (ltp *logsTransformProcessor) Shutdown(ctx context.Context) error {
for _, fn := range ltp.shutdownFns {
ltp.logger.Info("Stopping logs transform processor")
// We call the shutdown functions in reverse order, so that the last thing we started
// is stopped first.
for i := len(ltp.shutdownFns) - 1; i >= 0; i-- {
fn := ltp.shutdownFns[i]

if err := fn(ctx); err != nil {
return err
}
}
ltp.wg.Wait()

return nil
}

func (ltp *logsTransformProcessor) Start(ctx context.Context, _ component.Host) error {
// create all objects before starting them, since the loops (consumerLoop, converterLoop) depend on these converters not being nil.
ltp.converter = adapter.NewConverter(ltp.logger)

wkrCount := int(math.Max(1, float64(runtime.NumCPU())))
ltp.fromConverter = adapter.NewFromPdataConverter(wkrCount, ltp.logger)

// data flows in this order:
// ConsumeLogs: receives logs and forwards them for conversion to stanza format ->
// fromConverter: converts logs to stanza format ->
// converterLoop: forwards converted logs to the stanza pipeline ->
// pipeline: performs user configured operations on the logs ->
// emitterLoop: forwards output stanza logs for conversion to OTLP ->
// converter: converts stanza logs to OTLP ->
// consumerLoop: sends the converted OTLP logs to the next consumer
//
// We should start these components in reverse order of the data flow, then stop them in order of the data flow,
// in order to allow for pipeline draining.
ltp.startConsumerLoop(ctx)
ltp.startConverter()
ltp.startEmitterLoop(ctx)
err := ltp.startPipeline()
if err != nil {
return err
}
ltp.startConverterLoop(ctx)
ltp.startFromConverter()

return nil
}

func (ltp *logsTransformProcessor) startFromConverter() {
ltp.fromConverter.Start()

ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
ltp.fromConverter.Stop()
return nil
})
}

// startConverterLoop starts the converter loop, which reads all the logs translated by the fromConverter and then forwards
// them to pipeline
func (ltp *logsTransformProcessor) startConverterLoop(ctx context.Context) {
wg := &sync.WaitGroup{}
wg.Add(1)
go ltp.converterLoop(ctx, wg)

ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
wg.Wait()
return nil
})
}

func (ltp *logsTransformProcessor) startPipeline() error {
// There is no need for this processor to use storage
err := ltp.pipe.Start(storage.NewNopClient())
if err != nil {
return err
}

ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
ltp.logger.Info("Stopping logs transform processor")
return ltp.pipe.Stop()
})

@@ -92,40 +147,42 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, _ component.Host)
}
ltp.firstOperator = pipelineOperators[0]

wkrCount := int(math.Max(1, float64(runtime.NumCPU())))
return nil
}

ltp.converter = adapter.NewConverter(ltp.logger)
// startEmitterLoop starts the loop which reads all the logs modified by the pipeline and then forwards
// them to converter
func (ltp *logsTransformProcessor) startEmitterLoop(ctx context.Context) {
wg := &sync.WaitGroup{}
wg.Add(1)
go ltp.emitterLoop(ctx, wg)

ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
wg.Wait()
return nil
})
}

func (ltp *logsTransformProcessor) startConverter() {
ltp.converter.Start()

ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
ltp.converter.Stop()
return nil
})
}

// startConsumerLoop starts the loop which reads all the logs produced by the converter
// (aggregated by Resource) and then places them on the next consumer
func (ltp *logsTransformProcessor) startConsumerLoop(ctx context.Context) {
wg := &sync.WaitGroup{}
wg.Add(1)
go ltp.consumerLoop(ctx, wg)

ltp.fromConverter = adapter.NewFromPdataConverter(wkrCount, ltp.logger)
ltp.fromConverter.Start()
ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
ltp.fromConverter.Stop()
wg.Wait()
return nil
})
// Below we're starting 3 loops:
// * first which reads all the logs translated by the fromConverter and then forwards
// them to pipeline
// ...
ltp.wg.Add(1)
go ltp.converterLoop(ctx)

// * second which reads all the logs modified by the pipeline and then forwards
// them to converter
// ...
ltp.wg.Add(1)
go ltp.emitterLoop(ctx)

// ...
// * third which reads all the logs produced by the converter
// (aggregated by Resource) and then places them on the next consumer
ltp.wg.Add(1)
go ltp.consumerLoop(ctx)
return nil
}

func (ltp *logsTransformProcessor) ConsumeLogs(_ context.Context, ld plog.Logs) error {
@@ -135,8 +192,8 @@ func (ltp *logsTransformProcessor) ConsumeLogs(_ context.Context, ld plog.Logs)

// converterLoop reads the log entries produced by the fromConverter and sends them
// into the pipeline
func (ltp *logsTransformProcessor) converterLoop(ctx context.Context) {
defer ltp.wg.Done()
func (ltp *logsTransformProcessor) converterLoop(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

for {
select {
@@ -163,8 +220,8 @@ func (ltp *logsTransformProcessor) converterLoop(ctx context.Context) {

// emitterLoop reads the log entries produced by the emitter and batches them
// in converter.
func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context) {
defer ltp.wg.Done()
func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

for {
select {
@@ -185,8 +242,8 @@ func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context) {
}

// consumerLoop reads converter log entries and calls the consumer to consume them.
func (ltp *logsTransformProcessor) consumerLoop(ctx context.Context) {
defer ltp.wg.Done()
func (ltp *logsTransformProcessor) consumerLoop(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

for {
select {
86 changes: 86 additions & 0 deletions processor/logstransformprocessor/processor_test.go
@@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor/processortest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
@@ -195,3 +196,88 @@ func generateLogData(messages []testLogMessage) plog.Logs {

return ld
}

// laggyOperator is a test operator that simulates heavy processing that takes a large amount of time.
// The heavy processing only occurs for every 100th log.
type laggyOperator struct {
helper.WriterOperator
logsCount int
}

func (t *laggyOperator) Process(ctx context.Context, e *entry.Entry) error {

// Wait for a large amount of time every 100 logs
if t.logsCount%100 == 0 {
time.Sleep(100 * time.Millisecond)
}

t.logsCount++

t.Write(ctx, e)
return nil
}

func (t *laggyOperator) CanProcess() bool {
return true
}

type laggyOperatorConfig struct {
helper.WriterConfig
}

func (l *laggyOperatorConfig) Build(s *zap.SugaredLogger) (operator.Operator, error) {
wo, err := l.WriterConfig.Build(s)
if err != nil {
return nil, err
}

return &laggyOperator{
WriterOperator: wo,
}, nil
}

func TestProcessorShutdownWithSlowOperator(t *testing.T) {
operator.Register("laggy", func() operator.Builder { return &laggyOperatorConfig{} })

config := &Config{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{
{
Builder: func() *laggyOperatorConfig {
l := &laggyOperatorConfig{}
l.OperatorType = "laggy"
return l
}(),
},
},
},
}

tln := new(consumertest.LogsSink)
factory := NewFactory()
ltp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), config, tln)
require.NoError(t, err)
assert.True(t, ltp.Capabilities().MutatesData)

err = ltp.Start(context.Background(), nil)
require.NoError(t, err)

testLog := plog.NewLogs()
scopeLogs := testLog.ResourceLogs().AppendEmpty().
ScopeLogs().AppendEmpty()

for i := 0; i < 500; i++ {
lr := scopeLogs.LogRecords().AppendEmpty()
lr.Body().SetStr("Test message")
}

// The idea is to check that shutdown, when there are a lot of entries, doesn't try to write logs to
// a closed channel, since that'll cause a panic.
// In order to test, we send a lot of logs to be consumed, then shutdown immediately.

err = ltp.ConsumeLogs(context.Background(), testLog)
require.NoError(t, err)

err = ltp.Shutdown(context.Background())
require.NoError(t, err)
}
