Skip to content

Commit

Permalink
Flush data on shutdown in stanza adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
swiatekm committed Feb 8, 2024
1 parent 07e5786 commit 9747d92
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 97 deletions.
82 changes: 26 additions & 56 deletions pkg/stanza/adapter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
"runtime"
Expand Down Expand Up @@ -56,8 +55,6 @@ type Converter struct {
pLogsChan chan plog.Logs

stopOnce sync.Once
stopChan chan struct{}

// workerChan is an internal communication channel that gets the log
// entries from Batch() calls and it receives the data in workerLoop().
workerChan chan []*entry.Entry
Expand Down Expand Up @@ -95,7 +92,6 @@ func NewConverter(logger *zap.Logger, opts ...converterOption) *Converter {
workerChan: make(chan []*entry.Entry),
workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))),
pLogsChan: make(chan plog.Logs),
stopChan: make(chan struct{}),
flushChan: make(chan plog.Logs),
logger: logger,
}
Expand All @@ -113,15 +109,14 @@ func (c *Converter) Start() {
go c.workerLoop()
}

c.wg.Add(1)
go c.flushLoop()
}

func (c *Converter) Stop() {
c.stopOnce.Do(func() {
close(c.stopChan)
close(c.workerChan)
c.wg.Wait()
close(c.pLogsChan)
close(c.flushChan)
})
}

Expand All @@ -136,62 +131,43 @@ func (c *Converter) OutChannel() <-chan plog.Logs {
func (c *Converter) workerLoop() {
defer c.wg.Done()

for {

select {
case <-c.stopChan:
return
for entries := range c.workerChan {
resourceHashToIdx := make(map[uint64]int)

case entries, ok := <-c.workerChan:
pLogs := plog.NewLogs()
var sl plog.ScopeLogs
for _, e := range entries {
resourceID := HashResource(e.Resource)
resourceIdx, ok := resourceHashToIdx[resourceID]
if !ok {
return
}

resourceHashToIdx := make(map[uint64]int)

pLogs := plog.NewLogs()
var sl plog.ScopeLogs
for _, e := range entries {
resourceID := HashResource(e.Resource)
resourceIdx, ok := resourceHashToIdx[resourceID]
if !ok {
resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()
rl := pLogs.ResourceLogs().AppendEmpty()
upsertToMap(e.Resource, rl.Resource().Attributes())
sl = rl.ScopeLogs().AppendEmpty()
} else {
sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(0)
}
convertInto(e, sl.LogRecords().AppendEmpty())
}

// Send plogs directly to flushChan
select {
case c.flushChan <- pLogs:
case <-c.stopChan:
resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()
rl := pLogs.ResourceLogs().AppendEmpty()
upsertToMap(e.Resource, rl.Resource().Attributes())
sl = rl.ScopeLogs().AppendEmpty()
} else {
sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(0)
}
convertInto(e, sl.LogRecords().AppendEmpty())
}

// Send plogs directly to flushChan
c.flushChan <- pLogs
}
}

func (c *Converter) flushLoop() {
defer c.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for {
select {
case <-c.stopChan:
return

case pLogs := <-c.flushChan:
if err := c.flush(ctx, pLogs); err != nil {
c.logger.Debug("Problem sending log entries",
zap.Error(err),
)
}
for pLogs := range c.flushChan {
if err := c.flush(ctx, pLogs); err != nil {
c.logger.Debug("Problem sending log entries",
zap.Error(err),
)
}
}

close(c.pLogsChan)
}

// flush flushes provided plog.Logs entries onto a channel.
Expand All @@ -203,10 +179,6 @@ func (c *Converter) flush(ctx context.Context, pLogs plog.Logs) error {
return fmt.Errorf("flushing log entries interrupted, err: %w", ctx.Err())

case c.pLogsChan <- pLogs:

// The converter has been stopped so bail the flush.
case <-c.stopChan:
return errors.New("logs converter has been stopped")
}

return nil
Expand All @@ -217,8 +189,6 @@ func (c *Converter) Batch(e []*entry.Entry) error {
select {
case c.workerChan <- e:
return nil
case <-c.stopChan:
return errors.New("logs converter has been stopped")
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/stanza/adapter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,17 @@ func (e *LogEmitter) flusher(ctx context.Context) {
e.flush(ctx, oldBatch)
}
case <-ctx.Done():
if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 {
e.flush(ctx, oldBatch)
}
return
}
}
}

// flush flushes the provided batch to the log channel.
func (e *LogEmitter) flush(ctx context.Context, batch []*entry.Entry) {
select {
case e.logChan <- batch:
case <-ctx.Done():
}
e.logChan <- batch
}

// makeNewBatch replaces the current batch on the log emitter with a new batch, returning the old one
Expand Down
49 changes: 12 additions & 37 deletions pkg/stanza/adapter/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,51 +83,27 @@ func (r *receiver) emitterLoop(ctx context.Context) {
defer r.wg.Done()

// Don't create done channel on every iteration.
doneChan := ctx.Done()
for {
select {
case <-doneChan:
r.logger.Debug("Receive loop stopped")
return

case e, ok := <-r.emitter.logChan:
if !ok {
continue
}

if err := r.converter.Batch(e); err != nil {
r.logger.Error("Could not add entry to batch", zap.Error(err))
}
for e := range r.emitter.logChan {
if err := r.converter.Batch(e); err != nil {
r.logger.Error("Could not add entry to batch", zap.Error(err))
}
}
r.converter.Stop()
}

// consumerLoop reads converter log entries and calls the consumer to consume them.
func (r *receiver) consumerLoop(ctx context.Context) {
defer r.wg.Done()

// Don't create done channel on every iteration.
doneChan := ctx.Done()
pLogsChan := r.converter.OutChannel()
for {
select {
case <-doneChan:
r.logger.Debug("Consumer loop stopped")
return

case pLogs, ok := <-pLogsChan:
if !ok {
r.logger.Debug("Converter channel got closed")
continue
}
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()
cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
for pLogs := range pLogsChan {
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()
cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
}
}

Expand All @@ -139,8 +115,7 @@ func (r *receiver) Shutdown(ctx context.Context) error {

r.logger.Info("Stopping stanza receiver")
pipelineErr := r.pipe.Stop()
r.converter.Stop()
r.cancel()
// r.cancel()
r.wg.Wait()

if r.storageClient != nil {
Expand Down
43 changes: 43 additions & 0 deletions pkg/stanza/adapter/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"fmt"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
Expand Down Expand Up @@ -93,6 +95,47 @@ func TestHandleConsume(t *testing.T) {
require.NoError(t, logsReceiver.Shutdown(context.Background()))
}

// TestShutdownFlush verifies that every entry handed to the emitter before
// Shutdown is delivered to the consumer, i.e. that shutting the receiver down
// flushes in-flight data instead of dropping it.
func TestShutdownFlush(t *testing.T) {
	mockConsumer := &consumertest.LogsSink{}
	factory := NewFactory(TestReceiverType{}, component.StabilityLevelDevelopment)

	logsReceiver, err := factory.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), factory.CreateDefaultConfig(), mockConsumer)
	require.NoError(t, err, "receiver should successfully build")

	err = logsReceiver.Start(context.Background(), componenttest.NewNopHost())
	require.NoError(t, err, "receiver start failed")

	// emittedLogCount counts entries successfully handed to the emitter.
	var emittedLogCount atomic.Int32
	closeCh := make(chan struct{})
	// emitErrCh carries the first Process error (if any) back to the test
	// goroutine and is closed when the producer goroutine exits. require's
	// FailNow must not be called from a goroutine other than the one running
	// the test, so the producer only reports and the test goroutine asserts.
	emitErrCh := make(chan error, 1)
	stanzaReceiver := logsReceiver.(*receiver)
	go func() {
		defer close(emitErrCh)
		for {
			select {
			case <-closeCh:
				return
			default:
			}
			if err := stanzaReceiver.emitter.Process(context.Background(), entry.New()); err != nil {
				emitErrCh <- err
				return
			}
			emittedLogCount.Add(1)
		}
	}()
	require.Eventually(t, func() bool {
		return emittedLogCount.Load() > 100
	}, 5*time.Second, 5*time.Millisecond)

	close(closeCh)

	// Wait for the producer to stop before shutting down, so that no Process
	// call races with Shutdown; a receive on the closed channel yields nil.
	require.NoError(t, <-emitErrCh, "emitting entries failed")
	require.NoError(t, logsReceiver.Shutdown(context.Background()))

	// Eventually because of the asynchronous nature of the receiver.
	require.EventuallyWithT(t,
		func(t *assert.CollectT) {
			assert.Equal(t, emittedLogCount.Load(), int32(mockConsumer.LogRecordCount()))
		},
		5*time.Second, 5*time.Millisecond,
	)
}

func TestHandleConsumeRetry(t *testing.T) {
mockConsumer := consumerretry.NewMockLogsRejecter(2)
factory := NewFactory(TestReceiverType{}, component.StabilityLevelDevelopment)
Expand Down

0 comments on commit 9747d92

Please sign in to comment.