Skip to content

Commit

Permalink
Merge pull request #421 from ethpandaops/fix/batch-processor-channel-…
Browse files Browse the repository at this point in the history
…handling

refactor(processor): improve batch processor shutdown and channel handling
  • Loading branch information
mattevans authored Dec 3, 2024
2 parents c79f13e + 3827f91 commit 12e9090
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,21 +439,27 @@ func (bvp *BatchItemProcessor[T]) waitForBatchCompletion(ctx context.Context, it

func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) {
log := bvp.log.WithField("module", "batch_builder")

var batch []*TraceableItem[T]

for {
select {
case <-bvp.stopWorkersCh:
log.Info("Stopping batch builder")

return
case item := <-bvp.queue:
case item, ok := <-bvp.queue:
if !ok {
// Channel is closed, send any remaining items in the batch for processing
// before shutting down.
if len(batch) > 0 {
bvp.sendBatch(batch, "shutdown")
}

return
}

if item == nil {
bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1))

bvp.log.Warnf("Attempted to build a batch with a nil item. This item has been dropped. This probably shouldn't happen and is likely a bug.")

continue
}

Expand Down Expand Up @@ -505,19 +511,21 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context, number int) {
}

func (bvp *BatchItemProcessor[T]) drainQueue() {
bvp.log.Info("Draining queue: waiting for the batch builder to pull all the items from the queue")
bvp.log.Info("Draining queue: waiting for the batch builder to process remaining items")

// First wait for queue to be processed
for len(bvp.queue) > 0 {
time.Sleep(10 * time.Millisecond)
}

bvp.log.Info("Draining queue: waiting for workers to finish processing batches")

for len(bvp.queue) > 0 {
<-bvp.queue
// Then wait for any in-flight batches
for len(bvp.batchCh) > 0 {
time.Sleep(10 * time.Millisecond)
}

bvp.log.Info("Draining queue: all batches finished")
bvp.log.Info("Draining queue: all items processed")

close(bvp.queue)
}
Expand Down

0 comments on commit 12e9090

Please sign in to comment.