Skip to content

Commit

Permalink
fix(processor): Fix panic from nil items (#413)
Browse files Browse the repository at this point in the history
* fix(mimicry): Cap per-peer workers to 1

* fix(processor): Fix panic from nil items

* fix(processor): Fix panic from nil items

* style: Remove extra line in ExportTransactions function

* refactor: Update item creation in batch_test.go

* test: improve error messages in tests and add thread-safe getters

* test: Add test for blocking exporter in batch processor

---------

Co-authored-by: Matty Evans <mattevansnz@gmail.com>
  • Loading branch information
samcm and mattevans authored Nov 29, 2024
1 parent 15dfba3 commit a280136
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 22 deletions.
21 changes: 17 additions & 4 deletions pkg/mimicry/p2p/execution/event_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,31 @@ func (t TransactionExporter) Shutdown(ctx context.Context) error {
}

func (p *Peer) ExportTransactions(ctx context.Context, items []*TransactionHashItem) error {
if len(items) == 0 {
return nil
}

go func() {
hashes := make([]common.Hash, len(items))
seenMap := map[common.Hash]time.Time{}
var hashes []common.Hash

seenMap := make(map[common.Hash]time.Time, len(items))

for _, item := range items {
if item == nil {
continue
}

for i, item := range items {
exists := p.sharedCache.Transaction.Get(item.Hash.String())
if exists == nil {
hashes[i] = item.Hash
hashes = append(hashes, item.Hash)
seenMap[item.Hash] = item.Seen
}
}

if len(hashes) == 0 {
return
}

txs, err := p.client.GetPooledTransactions(ctx, hashes)
if err != nil {
p.log.WithError(err).Warn("Failed to get pooled transactions")
Expand Down
32 changes: 29 additions & 3 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,14 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
prepared := []*TraceableItem[T]{}

for _, i := range s[start:end] {
if i == nil {
bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1))

bvp.log.Warnf("Attempted to write a nil item. This item has been dropped. This probably shouldn't happen and is likely a bug.")

continue
}

item := &TraceableItem[T]{
item: i,
}
Expand Down Expand Up @@ -284,8 +292,18 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa
defer cancel()
}

// Since the batch processor filters out nil items upstream,
// we can optimize by pre-allocating the full slice size.
// Worst case is a few wasted allocations if any nil items slip through.
items := make([]*T, len(itemsBatch))

for i, item := range itemsBatch {
if item == nil {
bvp.log.Warnf("Attempted to export a nil item. This item has been dropped. This probably shouldn't happen and is likely a bug.")

continue
}

items[i] = item.item
}

Expand All @@ -298,10 +316,10 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa
bvp.metrics.ObserveExportDuration(bvp.name, duration)

if err != nil {
bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch)))
bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(items)))
} else {
bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch)))
bvp.metrics.ObserveBatchSize(bvp.name, float64(len(itemsBatch)))
bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(items)))
bvp.metrics.ObserveBatchSize(bvp.name, float64(len(items)))
}

for _, item := range itemsBatch {
Expand Down Expand Up @@ -427,6 +445,14 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) {

return
case item := <-bvp.queue:
if item == nil {
bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1))

bvp.log.Warnf("Attempted to build a batch with a nil item. This item has been dropped. This probably shouldn't happen and is likely a bug.")

continue
}

batch = append(batch, item)

if len(batch) >= bvp.o.MaxExportBatchSize {
Expand Down
Loading

0 comments on commit a280136

Please sign in to comment.