Skip to content

Commit

Permalink
abstraction method for processing receipt bloom
Browse files Browse the repository at this point in the history
  • Loading branch information
j75689 committed Oct 12, 2021
1 parent 6be63ca commit 81f2a3e
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 60 deletions.
2 changes: 1 addition & 1 deletion core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (b *BlockGen) AddTxWithChain(bc *BlockChain, tx *types.Transaction) {
b.SetCoinbase(common.Address{})
}
b.statedb.Prepare(tx.Hash(), common.Hash{}, len(b.txs))
receipt, err := ApplyTransaction(b.config, bc, &b.header.Coinbase, b.gasPool, b.statedb, b.header, tx, &b.header.GasUsed, vm.Config{}, nil)
receipt, err := ApplyTransaction(b.config, bc, &b.header.Coinbase, b.gasPool, b.statedb, b.header, tx, &b.header.GasUsed, vm.Config{}, NewReceiptBloomGenertor())
if err != nil {
panic(err)
}
Expand Down
65 changes: 65 additions & 0 deletions core/receipt_processer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package core

import (
"bytes"
"sync"

"github.com/ethereum/go-ethereum/core/types"
)

type ReceiptProcesser interface {
Apply(receipt *types.Receipt)
}

var (
_ ReceiptProcesser = (*ReceiptBloomGenertor)(nil)
_ ReceiptProcesser = (*AsyncReceiptBloomGenertor)(nil)
)

func NewReceiptBloomGenertor() *ReceiptBloomGenertor {
return &ReceiptBloomGenertor{}
}

type ReceiptBloomGenertor struct {
}

func (p *ReceiptBloomGenertor) Apply(receipt *types.Receipt) {
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
}

func NewAsyncReceiptBloomGenertor(length, workerSize int) *AsyncReceiptBloomGenertor {
generator := &AsyncReceiptBloomGenertor{
receipts: make(chan *types.Receipt, length),
wg: sync.WaitGroup{},
}
generator.startWorker(workerSize)
return generator
}

type AsyncReceiptBloomGenertor struct {
receipts chan *types.Receipt
wg sync.WaitGroup
}

func (p *AsyncReceiptBloomGenertor) startWorker(workerSize int) {
p.wg.Add(workerSize)
for i := 0; i < workerSize; i++ {
go func() {
defer p.wg.Done()
for receipt := range p.receipts {
if receipt != nil && bytes.Equal(receipt.Bloom[:], types.EmptyBloom[:]) {
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
}
}
}()
}
}

func (p *AsyncReceiptBloomGenertor) Apply(receipt *types.Receipt) {
p.receipts <- receipt
}

func (p *AsyncReceiptBloomGenertor) Close() {
close(p.receipts)
p.wg.Wait()
}
51 changes: 11 additions & 40 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,19 +363,6 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty
return diffLayer.Receipts, allLogs, gasUsed, nil
}

// Use for Bloom value calculation on channel
type BloomTxMap struct {
Txhash common.Hash
Bloom types.Bloom
}

func BloomGenerator(jobs <-chan *types.Receipt, results chan<- BloomTxMap) {
for receipt := range jobs {
results <- BloomTxMap{receipt.TxHash, types.CreateBloom(types.Receipts{receipt})}
}
close(results)
}

// Process processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb and applying any rewards to both
// the processor (coinbase) and any included uncles.
Expand Down Expand Up @@ -403,14 +390,13 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
blockContext := NewEVMBlockContext(header, p.bc, nil)
vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)

txNum := len(block.Transactions())
// Iterate over and process the individual transactions
posa, isPoSA := p.engine.(consensus.PoSA)
commonTxs := make([]*types.Transaction, 0, len(block.Transactions()))
commonTxs := make([]*types.Transaction, 0, txNum)

// initilise bloom workers
bloomProcessors := make(chan *types.Receipt, len(block.Transactions()))
bloomResults := make(chan BloomTxMap, cap(bloomProcessors))
go BloomGenerator(bloomProcessors, bloomResults)
// initilise bloom processors
bloomProcessors := NewAsyncReceiptBloomGenertor(txNum, gopool.Threads(txNum))

// usually do have two tx, one for validator set contract, another for system reward contract.
systemTxs := make([]*types.Transaction, 0, 2)
Expand All @@ -437,20 +423,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
commonTxs = append(commonTxs, tx)
receipts = append(receipts, receipt)
}

close(bloomProcessors)
bloomMap := make(map[common.Hash]types.Bloom, cap(bloomProcessors))
for br := range bloomResults {
if _, ok := bloomMap[br.Txhash]; !ok {
bloomMap[br.Txhash] = br.Bloom
}
}
for _, receipt := range receipts {
if (receipt.Bloom == types.Bloom{}) {
receipt.Bloom = bloomMap[receipt.TxHash]
}
}

bloomProcessors.Close()

// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
err := p.engine.Finalize(p.bc, header, statedb, &commonTxs, block.Uncles(), &receipts, &systemTxs, usedGas)
Expand All @@ -464,7 +437,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
return statedb, receipts, allLogs, *usedGas, nil
}

func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM, bloomProcessors chan *types.Receipt) (*types.Receipt, error) {
func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM, receiptProcessers ...ReceiptProcesser) (*types.Receipt, error) {
// Create a new context to be used in the EVM environment.
txContext := NewEVMTxContext(msg)
evm.Reset(txContext, statedb)
Expand Down Expand Up @@ -502,22 +475,20 @@ func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainCon

// Set the receipt logs and create the bloom filter.
receipt.Logs = statedb.GetLogs(tx.Hash())
if bloomProcessors == nil {
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
} else {
bloomProcessors <- receipt
}
receipt.BlockHash = statedb.BlockHash()
receipt.BlockNumber = header.Number
receipt.TransactionIndex = uint(statedb.TxIndex())
for _, receiptProcesser := range receiptProcessers {
receiptProcesser.Apply(receipt)
}
return receipt, err
}

// ApplyTransaction attempts to apply a transaction to the given state database
// and uses the input parameters for its environment. It returns the receipt
// for the transaction, gas used and an error if the transaction failed,
// indicating the block was invalid.
func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config, bloomProcessors chan *types.Receipt) (*types.Receipt, error) {
func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config, receiptProcessers ...ReceiptProcesser) (*types.Receipt, error) {
msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
if err != nil {
return nil, err
Expand All @@ -530,5 +501,5 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo
vm.EVMInterpreterPool.Put(ite)
vm.EvmPool.Put(vmenv)
}()
return applyTransaction(msg, config, bc, author, gp, statedb, header, tx, usedGas, vmenv, bloomProcessors)
return applyTransaction(msg, config, bc, author, gp, statedb, header, tx, usedGas, vmenv, receiptProcessers...)
}
2 changes: 2 additions & 0 deletions core/types/bloom9.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
BloomBitLength = 8 * BloomByteLength
)

var EmptyBloom = Bloom{}

// Bloom represents a 2048 bit bloom filter.
type Bloom [BloomByteLength]byte

Expand Down
25 changes: 6 additions & 19 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/consensus/parlia"
Expand Down Expand Up @@ -736,10 +737,10 @@ func (w *worker) updateSnapshot() {
w.snapshotState = w.current.state.Copy()
}

func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address, bloomProcessors chan *types.Receipt) ([]*types.Log, error) {
func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address, receiptProcessers ...core.ReceiptProcesser) ([]*types.Log, error) {
snap := w.current.state.Snapshot()

receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig(), bloomProcessors)
receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig(), receiptProcessers...)
if err != nil {
w.current.state.RevertToSnapshot(snap)
return nil, err
Expand Down Expand Up @@ -770,10 +771,8 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
defer stopTimer.Stop()
}

// initilise bloom workers
bloomProcessors := make(chan *types.Receipt, txs.CurrentSize())
bloomResults := make(chan core.BloomTxMap, cap(bloomProcessors))
go core.BloomGenerator(bloomProcessors, bloomResults)
// initilise bloom processors
bloomProcessors := core.NewAsyncReceiptBloomGenertor(txs.CurrentSize(), gopool.Threads(txs.CurrentSize()))

LOOP:
for {
Expand Down Expand Up @@ -865,19 +864,7 @@ LOOP:
txs.Shift()
}
}

close(bloomProcessors)
bloomMap := make(map[common.Hash]types.Bloom, cap(bloomProcessors))
for br := range bloomResults {
if _, ok := bloomMap[br.Txhash]; !ok {
bloomMap[br.Txhash] = br.Bloom
}
}
for _, receipt := range w.current.receipts {
if (receipt.Bloom == types.Bloom{}) {
receipt.Bloom = bloomMap[receipt.TxHash]
}
}
bloomProcessors.Close()

if !w.isRunning() && len(coalescedLogs) > 0 {
// We don't push the pendingLogsEvent while we are mining. The reason is that
Expand Down

0 comments on commit 81f2a3e

Please sign in to comment.