Skip to content

Commit

Permalink
eth/filters: add global logs cache
Browse files Browse the repository at this point in the history
This adds a cache for block logs which is shared by all filters. In
order to share the cache reference, filters now need to be created
through a 'filter system' object.
  • Loading branch information
fjl committed Aug 2, 2022
1 parent 201e998 commit 4e5a699
Show file tree
Hide file tree
Showing 19 changed files with 205 additions and 172 deletions.
31 changes: 13 additions & 18 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type SimulatedBackend struct {
pendingState *state.StateDB // Currently pending state that will be the active on request
pendingReceipts types.Receipts // Currently receipts for the pending block

events *filters.EventSystem // Event system for filtering log events live
events *filters.EventSystem // for filtering log events live
filterSystem *filters.FilterSystem // for filtering database logs

config *params.ChainConfig
}
Expand All @@ -86,7 +87,11 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis
blockchain: blockchain,
config: genesis.Config,
}
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)

filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)

backend.rollback(blockchain.CurrentBlock())
return backend
}
Expand Down Expand Up @@ -689,7 +694,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
var filter *filters.Filter
if query.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics)
filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics)
} else {
// Initialize unset filter boundaries to run from genesis to chain head
from := int64(0)
Expand All @@ -701,7 +706,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
to = query.ToBlock.Int64()
}
// Construct the range filter
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics)
filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -827,7 +832,8 @@ type filterBackend struct {
backend *SimulatedBackend
}

func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }

func (fb *filterBackend) EventMux() *event.TypeMux { panic("not supported") }

func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumber) (*types.Header, error) {
Expand All @@ -853,19 +859,8 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ
return rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config()), nil
}

func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
number := rawdb.ReadHeaderNumber(fb.db, hash)
if number == nil {
return nil, nil
}
receipts := rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config())
if receipts == nil {
return nil, nil
}
logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs
}
func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
logs := rawdb.ReadLogs(fb.db, hash, number, fb.bc.Config())
return logs, nil
}

Expand Down
20 changes: 18 additions & 2 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ import (
"github.com/ethereum/go-ethereum/accounts/usbwallet"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/flags"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/naoina/toml"
)

Expand Down Expand Up @@ -163,7 +166,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
override := ctx.Bool(utils.OverrideTerminalTotalDifficultyPassed.Name)
cfg.Eth.OverrideTerminalTotalDifficultyPassed = &override
}

backend, eth := utils.RegisterEthService(stack, &cfg.Eth)

// Warn users to migrate if they have a legacy freezer format.
if eth != nil && !ctx.IsSet(utils.IgnoreLegacyReceiptsFlag.Name) {
firstIdx := uint64(0)
Expand All @@ -181,10 +186,21 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
utils.Fatalf("Database has receipts with a legacy format. Please run `geth db freezer-migrate`.")
}
}
// Configure GraphQL if requested

// Enable filter RPC API.
filterBackend := backend.(filters.Backend)
filterSystem := filters.NewFilterSystem(filterBackend, filters.Config{})
isLightClient := cfg.Eth.SyncMode == downloader.LightSync
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, isLightClient),
}})

// Configure GraphQL if requested.
if ctx.IsSet(utils.GraphQLEnabledFlag.Name) {
utils.RegisterGraphQLService(stack, backend, cfg.Node)
utils.RegisterGraphQLService(stack, backend, filterSystem, cfg.Node)
}

// Add the Ethereum Stats daemon if requested.
if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL)
Expand Down
6 changes: 4 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
ethcatalyst "github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
Expand Down Expand Up @@ -2014,8 +2015,9 @@ func RegisterEthStatsService(stack *node.Node, backend ethapi.Backend, url strin
}

// RegisterGraphQLService is a utility function to construct a new service and register it against a node.
func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.Config) {
if err := graphql.New(stack, backend, cfg.GraphQLCors, cfg.GraphQLVirtualHosts); err != nil {
func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cfg node.Config) {
err := graphql.New(stack, backend, filterSystem, cfg.GraphQLCors, cfg.GraphQLVirtualHosts)
if err != nil {
Fatalf("Failed to register the GraphQL service: %v", err)
}
}
Expand Down
5 changes: 0 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ const (
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
logsCacheLimit = 32
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
Expand Down Expand Up @@ -199,7 +198,6 @@ type BlockChain struct {
bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
receiptsCache *lru.Cache // Cache for the most recent receipts per block
logsCache *lru.Cache // Cache for the most recent logs per block
blockCache *lru.Cache // Cache for the most recent entire blocks
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
futureBlocks *lru.Cache // future blocks are blocks added for later processing
Expand Down Expand Up @@ -227,7 +225,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
receiptsCache, _ := lru.New(receiptsCacheLimit)
logsCache, _ := lru.New(logsCacheLimit)
blockCache, _ := lru.New(blockCacheLimit)
txLookupCache, _ := lru.New(txLookupCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks)
Expand All @@ -247,7 +244,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
logsCache: logsCache,
blockCache: blockCache,
txLookupCache: txLookupCache,
futureBlocks: futureBlocks,
Expand Down Expand Up @@ -685,7 +681,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
bc.receiptsCache.Purge()
bc.logsCache.Purge()
bc.blockCache.Purge()
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()
Expand Down
17 changes: 0 additions & 17 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,6 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
return receipts
}

// GetLogsByHash retrieves the logs for all transactions in a given block.
func (bc *BlockChain) GetLogsByHash(hash common.Hash) [][]*types.Log {
if logs, ok := bc.logsCache.Get(hash); ok {
return logs.([][]*types.Log)
}
number := rawdb.ReadHeaderNumber(bc.db, hash)
if number == nil {
return nil
}
logs := rawdb.ReadLogs(bc.db, hash, *number, bc.chainConfig)
if logs == nil {
return nil
}
bc.logsCache.Add(hash, logs)
return logs
}

// GetUnclesInChain retrieves all the uncles from a given block backwards until
// a specific distance is reached.
func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {
Expand Down
4 changes: 2 additions & 2 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type
return b.eth.blockchain.GetReceiptsByHash(hash), nil
}

func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
return b.eth.blockchain.GetLogsByHash(hash), nil
func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
return rawdb.ReadLogs(b.eth.chainDb, hash, number, b.ChainConfig()), nil
}

func (b *EthAPIBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int {
Expand Down
5 changes: 0 additions & 5 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -41,7 +40,6 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
Expand Down Expand Up @@ -315,9 +313,6 @@ func (s *Ethereum) APIs() []rpc.API {
}, {
Namespace: "eth",
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux),
}, {
Namespace: "eth",
Service: filters.NewFilterAPI(s.APIBackend, false, 5*time.Minute),
}, {
Namespace: "admin",
Service: NewAdminAPI(s),
Expand Down
20 changes: 10 additions & 10 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,22 @@ type filter struct {
// FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type FilterAPI struct {
backend Backend
sys *FilterSystem
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
timeout time.Duration
}

// NewFilterAPI returns a new FilterAPI instance.
func NewFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *FilterAPI {
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
api := &FilterAPI{
backend: backend,
events: NewEventSystem(backend, lightMode),
sys: system,
events: NewEventSystem(system, lightMode),
filters: make(map[rpc.ID]*filter),
timeout: timeout,
timeout: system.cfg.Timeout,
}
go api.timeoutLoop(timeout)
go api.timeoutLoop(system.cfg.Timeout)

return api
}
Expand Down Expand Up @@ -320,7 +320,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
var filter *Filter
if crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
Expand All @@ -332,7 +332,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
end = crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -371,7 +371,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
var filter *Filter
if f.crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
Expand All @@ -383,7 +383,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
end = f.crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down
16 changes: 11 additions & 5 deletions eth/filters/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,27 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {

b.Log("Running filter benchmarks...")
start = time.Now()
var backend *testBackend

var (
backend *testBackend
sys *FilterSystem
)
for i := 0; i < benchFilterCnt; i++ {
if i%20 == 0 {
db.Close()
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "", false)
backend = &testBackend{db: db, sections: cnt}
sys = NewFilterSystem(backend, Config{})
}
var addr common.Address
addr[0] = byte(i)
addr[1] = byte(i / 256)
filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
filter := sys.NewRangeFilter(0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
if _, err := filter.Logs(context.Background()); err != nil {
b.Error("filter.Find error:", err)
b.Error("filter.Logs error:", err)
}
}

d = time.Since(start)
b.Log("Finished running filter benchmarks")
b.Log(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks")
Expand Down Expand Up @@ -171,10 +176,11 @@ func BenchmarkNoBloomBits(b *testing.B) {

clearBloomBits(db)

_, sys := newTestFilterSystem(b, db, Config{})

b.Log("Running filter benchmarks...")
start := time.Now()
backend := &testBackend{db: db}
filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil)
filter := sys.NewRangeFilter(0, int64(*headNum), []common.Address{{}}, nil)
filter.Logs(context.Background())
d := time.Since(start)
b.Log("Finished running filter benchmarks")
Expand Down
Loading

0 comments on commit 4e5a699

Please sign in to comment.