filter logs partially and then update the filter in a single call
lucasmenendez committed Jun 23, 2024
1 parent 09f8392 commit 4c4b318
Showing 9 changed files with 109 additions and 37 deletions.
27 changes: 21 additions & 6 deletions db/treedb/treedb.go
@@ -99,8 +99,23 @@ func (tdb *TreeDB) Add(key, value []byte) error {
return wTx.Commit()
}

// Test checks if a key is in the tree.
func (tdb *TreeDB) Test(key []byte) (bool, error) {
// AddKey adds a key to the tree with a nil value. It accepts variadic keys.
func (tdb *TreeDB) AddKey(key ...[]byte) error {
if tdb.tree == nil {
return ErrNotInitialized
}
wTx := tdb.tree.DB().WriteTx()
defer wTx.Discard()
for _, k := range key {
if err := tdb.tree.Add(wTx, k, nil); err != nil {
return err
}
}
return wTx.Commit()
}

// TestKey checks if a key is in the tree.
func (tdb *TreeDB) TestKey(key []byte) (bool, error) {
if tdb.tree == nil {
return false, ErrNotInitialized
}
@@ -114,15 +129,15 @@ func (tdb *TreeDB) Test(key []byte) (bool, error) {
return true, nil
}

// TestAndAdd checks if a key is in the tree, if not, add it to the tree. It
// TestAndAddKey checks if a key is in the tree, if not, add it to the tree. It
// is the combination of TestKey and conditional AddKey.
func (tdb *TreeDB) TestAndAdd(key, value []byte) (bool, error) {
exists, err := tdb.Test(key)
func (tdb *TreeDB) TestAndAddKey(key []byte) (bool, error) {
exists, err := tdb.TestKey(key)
if err != nil {
return false, err
}
if exists {
return true, nil
}
return false, tdb.Add(key, value)
return false, tdb.AddKey(key)
}
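
For context, a minimal usage sketch of the renamed key-only TreeDB API. The import path and the `markProcessed`/`seen` helpers are assumptions for illustration and are not part of this commit:

```go
package example

import "github.com/vocdoni/census3/db/treedb"

// markProcessed persists several IDs at once: AddKey is variadic, so all the
// keys are written inside a single write transaction.
func markProcessed(filter *treedb.TreeDB, ids [][]byte) error {
	return filter.AddKey(ids...)
}

// seen reports whether an ID is already in the tree without modifying it;
// TestAndAddKey would additionally insert the key when it is missing.
func seen(filter *treedb.TreeDB, id []byte) (bool, error) {
	return filter.TestKey(id)
}
```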
6 changes: 3 additions & 3 deletions scanner/providers/holders_provider.go
@@ -26,9 +26,9 @@ type BlocksDelta struct {
// for example, if a token is rescanned. It allows implementing different
// filters, such as in-memory, disk, merkle tree, etc.
type Filter interface {
Add(key, value []byte) error
Test(key []byte) (bool, error)
TestAndAdd(key, value []byte) (bool, error)
AddKey(key ...[]byte) error
TestKey(key []byte) (bool, error)
TestAndAddKey(key []byte) (bool, error)
}

// HolderProvider is the interface that wraps the basic methods to interact with
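For reference, a minimal in-memory implementation of the updated `Filter` interface. This is an illustrative sketch (useful for tests, for example) and is not part of this commit:

```go
package example

import "sync"

// memFilter is a thread-safe, in-memory Filter backed by a map.
type memFilter struct {
	mu   sync.Mutex
	keys map[string]struct{}
}

func newMemFilter() *memFilter {
	return &memFilter{keys: make(map[string]struct{})}
}

// AddKey stores every provided key.
func (f *memFilter) AddKey(key ...[]byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, k := range key {
		f.keys[string(k)] = struct{}{}
	}
	return nil
}

// TestKey reports whether the key was previously added.
func (f *memFilter) TestKey(key []byte) (bool, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	_, ok := f.keys[string(key)]
	return ok, nil
}

// TestAndAddKey checks for the key and adds it when missing.
func (f *memFilter) TestAndAddKey(key []byte) (bool, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, ok := f.keys[string(key)]; ok {
		return true, nil
	}
	f.keys[string(key)] = struct{}{}
	return false, nil
}
```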
27 changes: 24 additions & 3 deletions scanner/providers/web3/erc20_provider.go
@@ -1,6 +1,7 @@
package web3

import (
"bytes"
"context"
"crypto/sha256"
"errors"
@@ -160,6 +161,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro
balances := make(map[common.Address]*big.Int)
// iterate the logs and update the balances
log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs))
processedLogs := &partialProcessedLogs{}
for _, currentLog := range logs {
// skip the log if it has been removed
if currentLog.Removed {
@@ -179,7 +181,7 @@
}
// check if the log has been already processed and add it to the filter
// if it is not already included
processed, err := p.isLogAlreadyProcessed(currentLog)
processed, err := p.isLogAlreadyProcessed(currentLog, processedLogs)
if err != nil {
return nil, &providers.BlocksDelta{
Block: lastBlock,
Expand Down Expand Up @@ -209,6 +211,9 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro
balances[logData.From] = new(big.Int).Neg(logData.Value)
}
}
if err := p.filter.AddKey(processedLogs.ids...); err != nil {
return nil, nil, errors.Join(ErrAddingProcessedLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err))
}
log.Infow("logs parsed",
"count", len(balances),
"new_logs", newTransfers,
@@ -391,7 +396,7 @@ func (p *ERC20HolderProvider) CensusKeys(data map[common.Address]*big.Int) (map[
// number and log index. It returns true if the log has been already processed
// or false if it has not been processed yet. If some error occurs, it returns
// false and the error.
func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error) {
func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialProcessedLogs) (bool, error) {
// if the filter is not defined, return false
if p.filter == nil {
return false, nil
@@ -403,6 +408,22 @@ func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error) {
if _, err := hashFn.Write([]byte(transferID)); err != nil {
return false, err
}
// check if the hash is in the filter
hID := hashFn.Sum(nil)[:8]
return p.filter.TestAndAdd(hID, nil)
exists, err := p.filter.TestKey(hID)
if err != nil {
return false, err
}
if exists {
return true, nil
}
// if the hash is not in the filter, check if it is in the partial filter
for _, id := range pl.ids {
if bytes.Equal(id, hID) {
return true, nil
}
}
// the hash has not been processed yet, so add it to the partial filter and return false
pl.ids = append(pl.ids, hID)
return false, nil
}
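
The per-log flow above can be summarized with the following sketch. It is simplified and uses a hypothetical `filter` interface and `dedupLogIDs` helper: unseen log IDs are first checked against the persistent filter, then against the IDs collected so far in this call, and everything new is pushed to the filter with a single `AddKey` at the end:

```go
package example

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// filter is a reduced view of the scanner's Filter interface.
type filter interface {
	AddKey(key ...[]byte) error
	TestKey(key []byte) (bool, error)
}

// dedupLogIDs returns the IDs that were not seen before and persists them to
// the filter in one call instead of one write per log.
func dedupLogIDs(f filter, rawIDs []string) ([][]byte, error) {
	var pending [][]byte
	for _, raw := range rawIDs {
		sum := sha256.Sum256([]byte(raw))
		id := sum[:8] // same 8-byte truncation used by the providers
		// stage 1: the persistent filter
		seen, err := f.TestKey(id)
		if err != nil {
			return nil, err
		}
		if seen {
			continue
		}
		// stage 2: IDs already collected during this call
		dup := false
		for _, p := range pending {
			if bytes.Equal(p, id) {
				dup = true
				break
			}
		}
		if !dup {
			pending = append(pending, id)
		}
	}
	// single write to the filter instead of one per log
	if err := f.AddKey(pending...); err != nil {
		return nil, fmt.Errorf("adding processed logs: %w", err)
	}
	return pending, nil
}
```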
30 changes: 26 additions & 4 deletions scanner/providers/web3/erc721_provider.go
@@ -1,6 +1,7 @@
package web3

import (
"bytes"
"context"
"crypto/sha256"
"errors"
@@ -157,6 +158,8 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr
alreadyProcessedLogs := uint64(0)
balances := make(map[common.Address]*big.Int)
// iterate the logs and update the balances
log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs))
processedLogs := &partialProcessedLogs{}
for _, currentLog := range logs {
// skip the log if it has been removed
if currentLog.Removed {
@@ -176,7 +179,7 @@
}
// check if the log has been already processed and add it to the filter
// if it is not already included
processed, err := p.isLogAlreadyProcessed(currentLog)
processed, err := p.isLogAlreadyProcessed(currentLog, processedLogs)
if err != nil {
return nil, &providers.BlocksDelta{
Block: lastBlock,
@@ -206,7 +209,10 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr
balances[logData.From] = big.NewInt(-1)
}
}
log.Infow("saving blocks",
if err := p.filter.AddKey(processedLogs.ids...); err != nil {
return nil, nil, errors.Join(ErrAddingProcessedLogs, fmt.Errorf("[ERC721] %s: %w", p.address, err))
}
log.Infow("logs parsed",
"count", len(balances),
"new_logs", newTransfers,
"already_processed_logs", alreadyProcessedLogs,
@@ -386,7 +392,7 @@ func (p *ERC721HolderProvider) CensusKeys(data map[common.Address]*big.Int) (map
// number and log index. It returns true if the log has been already processed
// or false if it has not been processed yet. If some error occurs, it returns
// false and the error.
func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error) {
func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialProcessedLogs) (bool, error) {
// if the filter is not defined, return false
if p.filter == nil {
return false, nil
@@ -398,6 +404,22 @@ func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error)
if _, err := hashFn.Write([]byte(transferID)); err != nil {
return false, err
}
// check if the hash is in the filter
hID := hashFn.Sum(nil)[:8]
return p.filter.TestAndAdd(hID, nil)
exists, err := p.filter.TestKey(hID)
if err != nil {
return false, err
}
if exists {
return true, nil
}
// if the hash is not in the filter, check if it is in the partial filter
for _, id := range pl.ids {
if bytes.Equal(id, hID) {
return true, nil
}
}
// the hash has not been processed yet, so add it to the partial filter and return false
pl.ids = append(pl.ids, hID)
return false, nil
}
30 changes: 26 additions & 4 deletions scanner/providers/web3/erc777_provider.go
@@ -1,6 +1,7 @@
package web3

import (
"bytes"
"context"
"crypto/sha256"
"errors"
@@ -157,6 +158,8 @@ func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr
alreadyProcessedLogs := uint64(0)
balances := make(map[common.Address]*big.Int)
// iterate the logs and update the balances
log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs))
processedLogs := &partialProcessedLogs{}
for _, currentLog := range logs {
// skip the log if it has been removed
if currentLog.Removed {
@@ -176,7 +179,7 @@
}
// check if the log has been already processed and add it to the filter
// if it is not already included
processed, err := p.isLogAlreadyProcessed(currentLog)
processed, err := p.isLogAlreadyProcessed(currentLog, processedLogs)
if err != nil {
return nil, &providers.BlocksDelta{
Block: lastBlock,
@@ -206,7 +209,10 @@ func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr
balances[logData.From] = big.NewInt(-1)
}
}
log.Infow("saving blocks",
if err := p.filter.AddKey(processedLogs.ids...); err != nil {
return nil, nil, errors.Join(ErrAddingProcessedLogs, fmt.Errorf("[ERC777] %s: %w", p.address, err))
}
log.Infow("logs parsed",
"count", len(balances),
"new_logs", newTransfers,
"already_processed_logs", alreadyProcessedLogs,
@@ -386,7 +392,7 @@ func (p *ERC777HolderProvider) CensusKeys(data map[common.Address]*big.Int) (map
// number and log index. It returns true if the log has been already processed
// or false if it has not been processed yet. If some error occurs, it returns
// false and the error.
func (p *ERC777HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error) {
func (p *ERC777HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialProcessedLogs) (bool, error) {
// if the filter is not defined, return false
if p.filter == nil {
return false, nil
@@ -398,6 +404,22 @@ func (p *ERC777HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error)
if _, err := hashFn.Write([]byte(transferID)); err != nil {
return false, err
}
// check if the hash is in the filter
hID := hashFn.Sum(nil)[:8]
return p.filter.TestAndAdd(hID, nil)
exists, err := p.filter.TestKey(hID)
if err != nil {
return false, err
}
if exists {
return true, nil
}
// if the hash is not in the filter, check if it is in the partial filter
for _, id := range pl.ids {
if bytes.Equal(id, hID) {
return true, nil
}
}
// the hash has not been processed yet, so add it to the partial filter and return false
pl.ids = append(pl.ids, hID)
return false, nil
}
1 change: 1 addition & 0 deletions scanner/providers/web3/errors.go
@@ -11,4 +11,5 @@ var (
ErrParsingTokenLogs = fmt.Errorf("error parsing token logs")
ErrCheckingProcessedLogs = fmt.Errorf("error checking processed logs")
ErrGettingTotalSupply = fmt.Errorf("error getting total supply")
ErrAddingProcessedLogs = fmt.Errorf("error adding processed logs to the filter")
)
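The providers wrap this new sentinel with `errors.Join`, so callers can still match it with `errors.Is`. A small illustrative sketch (the `addKeys` helper and the values in it are made up):

```go
package main

import (
	"errors"
	"fmt"
)

var errAddingProcessedLogs = errors.New("error adding processed logs to the filter")

// addKeys mimics the providers' error wrapping: the sentinel is joined with a
// provider-specific, formatted error.
func addKeys() error {
	cause := errors.New("db closed")
	return errors.Join(errAddingProcessedLogs, fmt.Errorf("[ERC20] 0x1234: %w", cause))
}

func main() {
	if err := addKeys(); errors.Is(err, errAddingProcessedLogs) {
		// errors.Is traverses joined errors, so the sentinel is still matched
		fmt.Println("filter update failed:", err)
	}
}
```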
8 changes: 8 additions & 0 deletions scanner/providers/web3/web3_provider.go
@@ -30,6 +30,14 @@ type Web3ProviderConfig struct {
DB *db.Database
}

// partialProcessedLogs stores the IDs of the logs already processed by the
// provider during the current call. It is used to avoid processing the same
// logs multiple times if the provider is rescanned, and to add all the
// processed logs to the token filter in a single call.
type partialProcessedLogs struct {
ids [][]byte
}

// creationBlock function returns the block number of the creation of a contract
// address. It uses the `eth_getCode` method to get the contract code at the
// block number provided. If the method is not supported, it returns 0 and nil.
16 changes: 0 additions & 16 deletions scanner/scanner.go
@@ -122,16 +122,6 @@ func (s *Scanner) Start(ctx context.Context) {
// get the status of the token in the updater queue
status := s.updater.RequestStatus(reqID, true)
if status != nil {
log.Infow("token status in the updater queue",
"address", token.Address.Hex(),
"chainID", token.ChainID,
"externalID", token.ExternalID,
"lastBlock", status.LastBlock,
"lastTotalSupply", status.LastTotalSupply,
"totalNewLogs", status.TotalNewLogs,
"totalAlreadyProcessedLogs", status.TotalAlreadyProcessedLogs,
"totalLogs", status.TotalLogs,
"done", status.Done)
// if the token is in the updater queue, update the
// internal token status and continue to the next token
// only if the token is done
@@ -159,12 +149,6 @@ func (s *Scanner) Start(ctx context.Context) {
log.Warnw("error enqueuing token", "error", err)
continue
}
log.Infow("token enqueued from the scanner",
"address", token.Address.Hex(),
"chainID", token.ChainID,
"externalID", token.ExternalID,
"from", token.LastBlock,
"to", lastNetworkBlock)
}
}
}
1 change: 0 additions & 1 deletion scanner/updater.go
@@ -92,7 +92,6 @@ func (u *Updater) Start(ctx context.Context, concurrentTokens int) {
default:
req, id := u.next()
if req == nil {
log.Info("no more requests to process, sleeping...")
time.Sleep(u.coolDown)
continue
}
