diff --git a/.golangci.yml b/.golangci.yml index c3746c33..e082039c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,10 +1,11 @@ run: go: '1.20' - skip-files: - - scanner/providers/gitcoin/gitcoin_stamps.go issues: max-same-issues: 0 exclude-use-default: false + exclude-files: + - scanner/providers/gitcoin/gitcoin_stamps.go + - scripts/* linters: enable: - misspell diff --git a/api/api.go b/api/api.go index 2bf2ecd3..a76ab7e2 100644 --- a/api/api.go +++ b/api/api.go @@ -18,6 +18,7 @@ import ( queries "github.com/vocdoni/census3/db/sqlc" "github.com/vocdoni/census3/helpers/queue" "github.com/vocdoni/census3/helpers/web3" + "github.com/vocdoni/census3/scanner" "github.com/vocdoni/census3/scanner/providers" "github.com/vocdoni/census3/scanner/providers/manager" web3provider "github.com/vocdoni/census3/scanner/providers/web3" @@ -39,8 +40,10 @@ type Census3APIConf struct { Hostname string Port int DataDir string + FiltersPath string GroupKey string Web3Providers *web3.Web3Pool + TokenUpdater *scanner.Updater HolderProviders *manager.ProviderManager AdminToken string } @@ -57,6 +60,8 @@ type census3API struct { holderProviders *manager.ProviderManager cache *lru.Cache[CacheKey, any] router *httprouter.HTTProuter + tokenUpdater *scanner.Updater + filtersPath string } func Init(db *db.DB, conf Census3APIConf) (*census3API, error) { @@ -70,8 +75,10 @@ func Init(db *db.DB, conf Census3APIConf) (*census3API, error) { w3p: conf.Web3Providers, queue: queue.NewBackgroundQueue(), holderProviders: conf.HolderProviders, + tokenUpdater: conf.TokenUpdater, cache: cache, router: &httprouter.HTTProuter{}, + filtersPath: conf.FiltersPath, } // get the current chainID log.Infow("starting API", "web3Providers", conf.Web3Providers.String()) diff --git a/api/errors.go b/api/errors.go index 9d9eb5f7..78fd15d9 100644 --- a/api/errors.go +++ b/api/errors.go @@ -128,6 +128,16 @@ var ( HTTPstatus: apirest.HTTPstatusNotFound, Err: fmt.Errorf("token holder not found for the token provided"), } + ErrNoSyncedToken = apirest.APIerror{ + Code: 4024, + HTTPstatus: apirest.HTTPstatusBadRequest, + Err: fmt.Errorf("token is not synced yet"), + } + ErrMalformedRescanQueueID = apirest.APIerror{ + Code: 4025, + HTTPstatus: apirest.HTTPstatusBadRequest, + Err: fmt.Errorf("malformed queue ID"), + } ErrCantCreateToken = apirest.APIerror{ Code: 5000, HTTPstatus: apirest.HTTPstatusInternalErr, diff --git a/api/tokens.go b/api/tokens.go index 2dc6c88d..45f5e022 100644 --- a/api/tokens.go +++ b/api/tokens.go @@ -19,11 +19,13 @@ import ( queries "github.com/vocdoni/census3/db/sqlc" "github.com/vocdoni/census3/helpers/lexer" "github.com/vocdoni/census3/metrics" + "github.com/vocdoni/census3/scanner" "github.com/vocdoni/census3/scanner/providers" "github.com/vocdoni/census3/scanner/providers/web3" "go.vocdoni.io/dvote/httprouter" api "go.vocdoni.io/dvote/httprouter/apirest" "go.vocdoni.io/dvote/log" + "go.vocdoni.io/dvote/util" ) func (capi *census3API) initTokenHandlers() error { @@ -43,6 +45,14 @@ func (capi *census3API) initTokenHandlers() error { api.MethodAccessTypePublic, capi.tokenStartBlock); err != nil { return err } + if err := capi.endpoint.RegisterMethod("/tokens/update/{tokenID}", "POST", + api.MethodAccessTypeAdmin, capi.rescanToken); err != nil { + return err + } + if err := capi.endpoint.RegisterMethod("/tokens/update/queue/{queueID}", "GET", + api.MethodAccessTypeAdmin, capi.checkRescanToken); err != nil { + return err + } if err := capi.endpoint.RegisterMethod("/tokens/{tokenID}", "DELETE", api.MethodAccessTypeAdmin, 
capi.launchDeleteToken); err != nil { return err @@ -607,6 +617,100 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext) return ctx.Send(res, api.HTTPstatusOK) } +// rescanToken function handler enqueues the rescan process for the token with +// the given ID. The token is scanned from the creation block to the last block +// stored in the database. It returns a 400 error if the provided ID is wrong or +// empty, a 404 error if the token is not found, a 500 error if something fails +// or a 200 response if the process is enqueued. It returns a queue ID to track +// the status of the process. +func (capi *census3API) rescanToken(msg *api.APIdata, ctx *httprouter.HTTPContext) error { + // get the contract address from the tokenID URL param and check if + // it is provided; if not, return an error + strAddress := ctx.URLParam("tokenID") + if strAddress == "" { + return ErrMalformedToken.With("tokenID is required") + } + address := common.HexToAddress(strAddress) + // get the chainID from the query params and decode it as an integer; if + // it is not provided or is not a valid integer, return an error + strChainID := ctx.Request.URL.Query().Get("chainID") + if strChainID == "" { + return ErrMalformedChainID.With("chainID is required") + } + chainID, err := strconv.Atoi(strChainID) + if err != nil { + return ErrMalformedChainID.WithErr(err) + } else if chainID < 0 { + return ErrMalformedChainID.With("chainID must be a positive number") + } + // get token information from the database + internalCtx, cancel := context.WithTimeout(ctx.Request.Context(), getTokenTimeout) + defer cancel() + tokenData, err := capi.db.QueriesRO.GetToken(internalCtx, + queries.GetTokenParams{ + ID: address.Bytes(), + ChainID: uint64(chainID), + }) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return ErrNotFoundToken.WithErr(err) + } + return ErrCantGetToken.WithErr(err) + } + // only the tokens that are already synced can be rescanned + if !tokenData.Synced { + return ErrNoSyncedToken + } + // enqueue the rescan token process + id := util.RandomHex(4) + if err := capi.tokenUpdater.SetRequest(id, &scanner.UpdateRequest{ + Address: address, + ChainID: uint64(chainID), + Type: tokenData.TypeID, + CreationBlock: uint64(tokenData.CreationBlock), + EndBlock: uint64(tokenData.LastBlock), + }); err != nil { + return ErrEncodeQueueItem.WithErr(err) + } + // encode the result and send the response + res, err := json.Marshal(QueueResponse{id}) + if err != nil { + return ErrEncodeQueueItem.WithErr(err) + } + return ctx.Send(res, api.HTTPstatusOK) +} + +// checkRescanToken function handler returns the status of the rescan process +// with the given queue ID. It returns a 400 error if the provided ID is wrong +// or empty, a 404 error if the token is not found in the queue or a 500 error +// if something fails. The response contains the address of the token, the chain +// ID, the status of the process, the number of logs scanned, the number of new +// logs found, and the number of duplicated logs.
+func (capi *census3API) checkRescanToken(msg *api.APIdata, ctx *httprouter.HTTPContext) error { + queueID := ctx.URLParam("queueID") + if queueID == "" { + return ErrMalformedRescanQueueID + } + // get the rescan status from the updater + status := capi.tokenUpdater.RequestStatus(queueID, true) + if status == nil { + return ErrNotFoundToken.Withf("the ID %s does not exist in the queue", queueID) + } + // encode the result and send the response + response, err := json.Marshal(RescanTokenStatus{ + Address: status.Address.String(), + ChainID: status.ChainID, + Done: status.Done, + LogsScanned: status.TotalLogs, + NewLogs: status.TotalNewLogs, + DuplicatedLogs: status.TotalAlreadyProcessedLogs, + }) + if err != nil { + return ErrEncodeQueueItem.WithErr(err) + } + return ctx.Send(response, api.HTTPstatusOK) +} + func (capi *census3API) tokenStartBlock(msg *api.APIdata, ctx *httprouter.HTTPContext) error { req := Token{} if err := json.Unmarshal(msg.Data, &req); err != nil { diff --git a/api/types.go index 3501f194..2b90b64d 100644 --- a/api/types.go +++ b/api/types.go @@ -161,3 +161,17 @@ type DeleteTokenQueueResponse struct { Done bool `json:"done"` Error error `json:"error"` } + +type RescanTokenResponse struct { + ID string `json:"ID"` +} + +type RescanTokenStatus struct { + Address string `json:"address"` + ChainID uint64 `json:"chainID"` + Done bool `json:"done"` + + LogsScanned uint64 `json:"logsScanned"` + NewLogs uint64 `json:"newLogs"` + DuplicatedLogs uint64 `json:"duplicatedLogs"` +} diff --git a/cmd/census3/main.go index 2c3ed74e..5fe4a6e5 100644 --- a/cmd/census3/main.go +++ b/cmd/census3/main.go @@ -24,6 +24,8 @@ import ( "github.com/vocdoni/census3/scanner/providers/manager" "github.com/vocdoni/census3/scanner/providers/poap" web3provider "github.com/vocdoni/census3/scanner/providers/web3" + dvotedb "go.vocdoni.io/dvote/db" + "go.vocdoni.io/dvote/db/metadb" "go.vocdoni.io/dvote/log" ) @@ -39,6 +41,7 @@ type Census3Config struct { adminToken string initialTokens string farcaster bool + filtersPath string } func main() { @@ -135,6 +138,12 @@ func main() { panic(err) } config.farcaster = pviper.GetBool("farcaster") + // set the filters path into the config, create the folder if it does not + // exist yet + config.filtersPath = config.dataDir + "/filters" + if err := os.MkdirAll(config.filtersPath, os.ModePerm); err != nil { + log.Fatal(err) + } // init logger log.Init(config.logLevel, "stdout", nil) // check if the web3 providers are defined @@ -194,8 +203,15 @@ func main() { DB: farcasterDB, }) } + // init the filters database + filtersDB, err := metadb.New(dvotedb.TypePebble, config.filtersPath) + if err != nil { + log.Fatal(err) + } + // start the token updater with the database and the provider manager + updater := scanner.NewUpdater(database, w3p, pm, filtersDB, config.scannerCoolDown) // start the holder scanner with the database and the provider manager - hc := scanner.NewScanner(database, w3p, pm, config.scannerCoolDown) + hc := scanner.NewScanner(database, updater, w3p, pm, config.scannerCoolDown) // if the admin token is not defined, generate a random one if config.adminToken != "" { if _, err := uuid.Parse(config.adminToken); err != nil { @@ -216,6 +232,7 @@ func main() { GroupKey: config.connectKey, HolderProviders: pm, AdminToken: config.adminToken, + TokenUpdater: updater, }) if err != nil { log.Fatal(err) } @@ -228,7 +245,8 @@ func main() { log.Info("initial tokens created, or at least tried to") }() // start the holder scanner - go
hc.Start(ctx, config.scannerConcurrentTokens) + go hc.Start(ctx) + go updater.Start(ctx, config.scannerConcurrentTokens) metrics.NewCounter(fmt.Sprintf("census3_info{version=%q,chains=%q}", internal.Version, w3p.String())).Set(1) @@ -243,6 +261,7 @@ func main() { // closing database go func() { hc.Stop() + updater.Stop() if err := apiService.Stop(); err != nil { log.Fatal(err) } diff --git a/db/treedb/treedb.go new file mode 100644 index 00000000..4804d825 --- /dev/null +++ b/db/treedb/treedb.go @@ -0,0 +1,204 @@ +package treedb + +// The treedb package provides a wrapper around a key-value database that uses +// a merkle tree under the hood. Every tree is stored in the same database, but +// with a different prefix. + +import ( + "fmt" + + "go.vocdoni.io/dvote/db" + "go.vocdoni.io/dvote/db/prefixeddb" + "go.vocdoni.io/dvote/log" + "go.vocdoni.io/dvote/tree" + "go.vocdoni.io/dvote/tree/arbo" +) + +// filterTreeLevels is the number of levels of the tree used to store the +// filter. It is a constant to avoid re-creating the tree with a different +// number of levels. The available number of leaves is 2^filterTreeLevels. +// It also limits the size of the key to filterTreeLevels/8 bytes. +const filterTreeLevels = 64 + +// ErrNotInitialized is returned when no tree is initialized in a TreeDB +// instance, which means that LoadTree has not been called and the tree is +// not ready to be used. +var ErrNotInitialized = fmt.Errorf("tree not initialized, call LoadTree first") + +// TreeDB is a wrapper around a key-value database that stores a merkle tree under a given prefix. +type TreeDB struct { + prefix string + parentDB db.Database + tree *tree.Tree +} + +// LoadTree loads a tree from the database identified by the given prefix. If it +// does not exist, it creates a new tree with the given prefix. It also creates +// the index if it does not exist. It returns an error if the tree cannot be +// loaded or created. +func LoadTree(db db.Database, prefix string) (*TreeDB, error) { + treeDB := prefixeddb.NewPrefixedDatabase(db, []byte(prefix)) + tree, err := tree.New(nil, tree.Options{ + DB: treeDB, + MaxLevels: filterTreeLevels, + HashFunc: arbo.HashFunctionBlake2b, + }) + if err != nil { + return nil, err + } + // ensure index is created + wTx := tree.DB().WriteTx() + defer wTx.Discard() + return &TreeDB{ + prefix: prefix, + parentDB: db, + tree: tree, + }, wTx.Commit() +} + +// Close closes the tree database. If the tree is not nil, it closes the +// underlying database. If the parent database is not nil, it closes it too. +// It returns an error if any of the databases cannot be closed. +func (tdb *TreeDB) Close() error { + if tdb.tree != nil { + if err := tdb.tree.DB().Close(); err != nil { + return err + } + } + if tdb.parentDB != nil { + return tdb.parentDB.Close() + } + return nil +} + +// Purge deletes a tree from the database identified by the current prefix. It +// iterates over all the keys in the tree and deletes them. If some key cannot +// be deleted, it logs a warning and continues with the next key. It commits the +// transaction at the end. +func (tdb *TreeDB) Purge() error { + treeDB := prefixeddb.NewPrefixedDatabase(tdb.parentDB, []byte(tdb.prefix)) + wTx := treeDB.WriteTx() + if err := treeDB.Iterate(nil, func(k, _ []byte) bool { + if err := wTx.Delete(k); err != nil { + log.Warnw("error deleting key", "key", k, "err", err) + } + return true + }); err != nil { + return err + } + return wTx.Commit() } + +// Add adds a key to the tree.
If no write transaction is provided, it creates +// a new one and commits it at the end. It returns an error if the tree is not +// initialized, if there is an error adding the key-value pair or committing +// the transaction if it was created. If a transaction is provided, it does +// not commit or discard it. +func (tdb *TreeDB) Add(wtx db.WriteTx, key, value []byte) error { + if tdb.tree == nil { + return ErrNotInitialized + } + commitTx := wtx == nil + if commitTx { + wtx = tdb.tree.DB().WriteTx() + defer wtx.Discard() + } + if err := tdb.tree.Add(wtx, key, value); err != nil { + return err + } + if commitTx { + return wtx.Commit() + } + return nil +} + +// Del deletes a key from the tree. If no write transaction is provided, it +// creates a new one and commits it at the end. It returns an error if the tree +// is not initialized, if there is an error deleting the key-value pair or +// committing the transaction if it was created. If a transaction is provided, +// it does not commit or discard it. +func (tdb *TreeDB) Del(wtx db.WriteTx, key []byte) error { + if tdb.tree == nil { + return ErrNotInitialized + } + commitTx := wtx == nil + if commitTx { + wtx = tdb.tree.DB().WriteTx() + defer wtx.Discard() + } + if err := tdb.tree.Del(wtx, key); err != nil { + return err + } + if commitTx { + return wtx.Commit() + } + return nil +} + +// AddBatch adds a batch of keys and values to the tree. It is more efficient +// than calling Add for each key-value pair. It returns an error if the length +// of keys and values is different, if the tree is not initialized, if there +// is an error adding a key-value pair or committing the transaction. It uses +// a new write transaction to add all the keys and commits it at the end. If +// something goes wrong, it returns an error and discards the transaction. +func (tdb *TreeDB) AddBatch(keys, values [][]byte) error { + if tdb.tree == nil { + return ErrNotInitialized + } + if len(keys) != len(values) { + return fmt.Errorf("keys and values must have the same length") + } + wTx := tdb.tree.DB().WriteTx() + defer wTx.Discard() + for i := range keys { + if err := tdb.tree.Add(wTx, keys[i], values[i]); err != nil { + return err + } + } + return wTx.Commit() +} + +// AddKey adds a key to the tree with nil value. It accepts variadic keys. It +// uses a new write transaction to add all the keys and commits it at the end. +// If something goes wrong, it returns an error and discards the transaction. +func (tdb *TreeDB) AddKey(key ...[]byte) error { + if tdb.tree == nil { + return ErrNotInitialized + } + wtx := tdb.tree.DB().WriteTx() + defer wtx.Discard() + for _, k := range key { + if err := tdb.tree.Add(wtx, k, nil); err != nil { + return err + } + } + return wtx.Commit() +} + +// CheckKey checks if a key is in the tree. +func (tdb *TreeDB) CheckKey(key []byte) (bool, error) { + if tdb.tree == nil { + return false, ErrNotInitialized + } + _, err := tdb.tree.Get(nil, key) + if err != nil { + if err == arbo.ErrKeyNotFound { + return false, nil + } + return false, err + } + return true, nil +} + +// CheckAndAddKey checks if a key is in the tree and, if not, adds it to the tree. It +// is the combination of CheckKey and conditional AddKey.
+func (tdb *TreeDB) CheckAndAddKey(key []byte) (bool, error) { + exists, err := tdb.CheckKey(key) + if err != nil { + return false, err + } + if exists { + return true, nil + } + return false, tdb.AddKey(key) +} diff --git a/helpers/web3/web3_client.go index ca0ca6f5..0b145d64 100644 --- a/helpers/web3/web3_client.go +++ b/helpers/web3/web3_client.go @@ -16,7 +16,7 @@ const defaultRetries = 3 var ( defaultTimeout = 2 * time.Second - filterLogsTimeout = 3 * time.Second + filterLogsTimeout = 15 * time.Second retrySleep = 200 * time.Millisecond ) @@ -32,6 +32,12 @@ type Client struct { // EthClient method returns the ethclient.Client for the chainID of the Client // instance. It returns an error if the chainID is not found in the pool. func (c *Client) EthClient() (*ethclient.Client, error) { + if c == nil { + return nil, fmt.Errorf("web3 client is nil") + } + if c.w3p == nil { + return nil, fmt.Errorf("web3 pool is nil") + } endpoint, err := c.w3p.Endpoint(c.chainID) if err != nil { return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) } diff --git a/scanner/const.go index 0c46989e..4b56ab5a 100644 --- a/scanner/const.go +++ b/scanner/const.go @@ -3,12 +3,20 @@ package scanner import "time" const ( + // READ_TIMEOUT is the timeout to get sorted tokens to scan from the database READ_TIMEOUT = time.Minute - SCAN_TIMEOUT = 5 * time.Minute + // SAVE_TIMEOUT is the timeout to save the scanned tokens to the database SAVE_TIMEOUT = 5 * time.Minute + // PREPARE_TIMEOUT is the timeout to prepare the tokens to scan (calculate + // the birth block number, etc.) + PREPARE_TIMEOUT = 5 * time.Minute + // UPDATE_TIMEOUT is the timeout to update the tokens from their holders + // providers + UPDATE_TIMEOUT = 15 * time.Minute ) const ( + coolDown = 15 * time.Second // time to wait between updates scanSleepTime = time.Second * 20 // time to sleep between scans scanSleepTimeOnceSync = time.Second * 120 // time to sleep between scans, once all the tokens are synced blockNumbersCooldown = 5 * time.Minute // time to wait to update latest block numbers of every supported network diff --git a/scanner/helpers.go new file mode 100644 index 00000000..2eea3a92 --- /dev/null +++ b/scanner/helpers.go @@ -0,0 +1,134 @@ +package scanner + +import ( + "context" + "database/sql" + "errors" + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/vocdoni/census3/db" + "github.com/vocdoni/census3/db/annotations" + queries "github.com/vocdoni/census3/db/sqlc" + "go.vocdoni.io/dvote/log" +) + +// SaveHolders saves the given holders in the database. It updates the token +// synced status if it is different from the received one. Then, it creates, +// updates or deletes the token holders in the database depending on the +// calculated balance. +// WARNING: the following code could produce holders with negative balances +// in the database. This is because the scanner does not know if the token +// holder is a contract or not, so it does not know if the balance is +// correct or not. The scanner assumes that the balance is correct and +// updates it in the database: +// 1. To get the correct holders from the database you must filter the +// holders with negative balances. +// 2. To get the correct balances you must use the contract methods to get +// the balances of the holders.
+func SaveHolders(db *db.DB, ctx context.Context, token ScannerToken, + holders map[common.Address]*big.Int, newTransfers, lastBlock uint64, + synced bool, totalSupply *big.Int, +) (int, int, error) { + // create a tx to use it in the following queries + tx, err := db.RW.BeginTx(ctx, nil) + if err != nil { + return 0, 0, err + } + defer func() { + if err := tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) { + log.Errorf("error rolling back tx: %v, token=%s chainID=%d externalID=%s", + err, token.Address.Hex(), token.ChainID, token.ExternalID) + } + }() + qtx := db.QueriesRW.WithTx(tx) + // create, update or delete token holders + created, updated := 0, 0 + for addr, balance := range holders { + // get the current token holder from the database + currentTokenHolder, err := qtx.GetTokenHolderEvenZero(ctx, queries.GetTokenHolderEvenZeroParams{ + TokenID: token.Address.Bytes(), + ChainID: token.ChainID, + ExternalID: token.ExternalID, + HolderID: addr.Bytes(), + }) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return created, updated, err + } + // if the token holder does not exist, create it + _, err = qtx.CreateTokenHolder(ctx, queries.CreateTokenHolderParams{ + TokenID: token.Address.Bytes(), + ChainID: token.ChainID, + ExternalID: token.ExternalID, + HolderID: addr.Bytes(), + BlockID: lastBlock, + Balance: balance.String(), + }) + if err != nil { + return created, updated, err + } + created++ + continue + } + // parse the current balance of the holder + currentBalance, ok := new(big.Int).SetString(currentTokenHolder.Balance, 10) + if !ok { + return created, updated, fmt.Errorf("error parsing current token holder balance") + } + // if both balances are zero, continue with the next holder to prevent + // UNIQUE constraint errors + if balance.Cmp(big.NewInt(0)) == 0 && currentBalance.Cmp(big.NewInt(0)) == 0 { + continue + } + // calculate the new balance of the holder by adding the current balance + // and the new balance + newBalance := new(big.Int).Add(currentBalance, balance) + // update the token holder in the database with the new balance. + // WARNING: the balance could be negative so you must filter the holders + // with negative balances to get the correct holders from the database.
+ _, err = qtx.UpdateTokenHolderBalance(ctx, queries.UpdateTokenHolderBalanceParams{ + TokenID: token.Address.Bytes(), + ChainID: token.ChainID, + ExternalID: token.ExternalID, + HolderID: addr.Bytes(), + BlockID: currentTokenHolder.BlockID, + NewBlockID: lastBlock, + Balance: newBalance.String(), + }) + if err != nil { + return created, updated, fmt.Errorf("error updating token holder: %w", err) + } + updated++ + } + // get the token info from the database to update it + tokenInfo, err := qtx.GetToken(ctx, + queries.GetTokenParams{ + ID: token.Address.Bytes(), + ChainID: token.ChainID, + ExternalID: token.ExternalID, + }) + if err != nil { + return created, updated, err + } + // update the synced status, last block, the number of analysed transfers + // (for debug) and the total supply in the database + _, err = qtx.UpdateTokenStatus(ctx, queries.UpdateTokenStatusParams{ + ID: token.Address.Bytes(), + ChainID: token.ChainID, + ExternalID: token.ExternalID, + Synced: synced, + LastBlock: int64(lastBlock), + AnalysedTransfers: tokenInfo.AnalysedTransfers + int64(newTransfers), + TotalSupply: annotations.BigInt(totalSupply.String()), + }) + if err != nil { + return created, updated, err + } + // close the database tx and commit it + if err := tx.Commit(); err != nil { + return created, updated, err + } + return created, updated, nil +} diff --git a/scanner/providers/farcaster/provider.go index cbba64ac..0a959347 100644 --- a/scanner/providers/farcaster/provider.go +++ b/scanner/providers/farcaster/provider.go @@ -179,14 +179,20 @@ func (p *FarcasterProvider) SetLastBlockNumber(blockNumber uint64) { // internal database and the current holders from the scanner and calculates the // partial holders. func (p *FarcasterProvider) HoldersBalances(ctx context.Context, _ []byte, fromBlock uint64) ( - map[common.Address]*big.Int, uint64, uint64, bool, *big.Int, error, + map[common.Address]*big.Int, *providers.BlocksDelta, error, ) { // check if both contracts are synced isSynced := globallySynced.Load() // get current holders from internal db appKeys, err := p.db.QueriesRO.ListAppKeys(ctx) if err != nil && !errors.Is(err, sql.ErrNoRows) { - return nil, 0, fromBlock, isSynced, nil, fmt.Errorf("cannot get app keys from farcaster DB %s", err.Error()) + return nil, &providers.BlocksDelta{ + LogsCount: 0, + NewLogsCount: 0, + AlreadyProcessedLogsCount: 0, + Block: fromBlock, + Synced: isSynced, + }, fmt.Errorf("cannot get app keys from farcaster DB %s", err.Error()) } currentHolders := make(map[common.Address]*big.Int) for _, appKey := range appKeys { @@ -201,7 +207,14 @@ func (p *FarcasterProvider) HoldersBalances(ctx context.Context, _ []byte, fromB } p.currentScannerHoldersMtx.Unlock() resultingHolders := providers.CalcPartialHolders(currentScannerHolders, currentHolders) - return resultingHolders, uint64(len(resultingHolders)), p.contracts.lastBlock.Load(), isSynced, totalSupply, nil + return resultingHolders, &providers.BlocksDelta{ + LogsCount: uint64(len(resultingHolders)), + NewLogsCount: uint64(len(resultingHolders)), + AlreadyProcessedLogsCount: uint64(len(resultingHolders)), + Block: p.contracts.lastBlock.Load(), + Synced: isSynced, + TotalSupply: totalSupply, + }, nil } // Close method is not implemented for Farcaster Key Registry.
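The change above illustrates the new HolderProvider contract: the five loose return values of HoldersBalances are folded into a single *providers.BlocksDelta. The following is a minimal, hypothetical sketch of a caller consuming the new three-value signature; the scanOnce helper and its printed fields are illustrative, not part of this PR (some providers also return a partial delta alongside the error, which this sketch ignores for brevity):

package example

import (
	"context"
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/vocdoni/census3/scanner/providers"
)

// scanOnce runs a single HoldersBalances iteration and unpacks the returned
// BlocksDelta, which now carries the counters that used to be separate
// return values.
func scanOnce(ctx context.Context, p providers.HolderProvider, fromBlock uint64) (map[common.Address]*big.Int, error) {
	holders, delta, err := p.HoldersBalances(ctx, nil, fromBlock)
	if err != nil {
		return nil, err
	}
	fmt.Printf("logs=%d new=%d duplicated=%d lastBlock=%d synced=%t supply=%s\n",
		delta.LogsCount, delta.NewLogsCount, delta.AlreadyProcessedLogsCount,
		delta.Block, delta.Synced, delta.TotalSupply)
	return holders, nil
}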
diff --git a/scanner/providers/gitcoin/gitcoin_provider.go b/scanner/providers/gitcoin/gitcoin_provider.go index 8a11f309..cd1136f2 100644 --- a/scanner/providers/gitcoin/gitcoin_provider.go +++ b/scanner/providers/gitcoin/gitcoin_provider.go @@ -141,7 +141,7 @@ func (g *GitcoinPassport) SetLastBalances(_ context.Context, _ []byte, } func (g *GitcoinPassport) HoldersBalances(ctx context.Context, stamp []byte, _ uint64) ( - map[common.Address]*big.Int, uint64, uint64, bool, *big.Int, error, + map[common.Address]*big.Int, *providers.BlocksDelta, error, ) { internalCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -153,13 +153,13 @@ func (g *GitcoinPassport) HoldersBalances(ctx context.Context, stamp []byte, _ u if len(stamp) > 0 { dbStampScores, err := g.db.QueriesRW.GetStampScores(internalCtx, string(stamp)) if err != nil { - return nil, 0, 0, false, big.NewInt(0), fmt.Errorf("error getting stamp scores: %w", err) + return nil, nil, fmt.Errorf("error getting stamp scores: %w", err) } for _, dbStampScore := range dbStampScores { address := common.HexToAddress(string(dbStampScore.Address)) score, ok := new(big.Int).SetString(string(dbStampScore.Score), 10) if !ok { - return nil, 0, 0, false, big.NewInt(0), fmt.Errorf("error parsing score: %w", err) + return nil, nil, fmt.Errorf("error parsing score: %w", err) } currentScores[address] = score totalSupply.Add(totalSupply, score) @@ -167,13 +167,13 @@ func (g *GitcoinPassport) HoldersBalances(ctx context.Context, stamp []byte, _ u } else { dbScores, err := g.db.QueriesRW.GetScores(internalCtx) if err != nil { - return nil, 0, 0, false, big.NewInt(0), fmt.Errorf("error getting scores: %w", err) + return nil, nil, fmt.Errorf("error getting scores: %w", err) } for _, dbScore := range dbScores { address := common.HexToAddress(string(dbScore.Address)) score, ok := new(big.Int).SetString(string(dbScore.Score), 10) if !ok { - return nil, 0, 0, false, big.NewInt(0), fmt.Errorf("error parsing score: %w", err) + return nil, nil, fmt.Errorf("error parsing score: %w", err) } currentScores[address] = score totalSupply.Add(totalSupply, score) @@ -185,7 +185,14 @@ func (g *GitcoinPassport) HoldersBalances(ctx context.Context, stamp []byte, _ u holders := providers.CalcPartialHolders(g.currentBalances, currentScores) // return the balances, 1 new transfer, the current time as lastBlock, true // as a synced and the computed totalSupply - return holders, 1, uint64(time.Now().Unix()), synced, totalSupply, nil + return holders, &providers.BlocksDelta{ + LogsCount: 1, + NewLogsCount: 1, + AlreadyProcessedLogsCount: 0, + Block: uint64(time.Now().Unix()), + Synced: synced, + TotalSupply: totalSupply, + }, nil } // Close cancels the download context. 
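The web3 providers below (ERC20, ERC721 and ERC777) deduplicate logs across rescans with an 8-byte key stored in the token filter. A standalone sketch of that identifier scheme, using hypothetical log coordinates, looks like this:

package main

import (
	"crypto/sha256"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

func main() {
	// derive a transfer identifier from the log coordinates, as the
	// isLogAlreadyProcessed helpers below do:
	// sha256("blockNumber-txHash-logIndex"), truncated to its first 8 bytes
	blockNumber := uint64(19000000) // hypothetical values
	txHash := common.HexToHash("0x01")
	logIndex := uint(7)
	transferID := fmt.Sprintf("%d-%x-%d", blockNumber, txHash, logIndex)
	hash := sha256.Sum256([]byte(transferID))
	fmt.Printf("filter key: %x\n", hash[:8]) // the key checked against the filter
}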
diff --git a/scanner/providers/gitcoin/gitcoin_provider_test.go index cac7457e..10d320df 100644 --- a/scanner/providers/gitcoin/gitcoin_provider_test.go +++ b/scanner/providers/gitcoin/gitcoin_provider_test.go @@ -44,13 +44,13 @@ func TestGitcoinPassport(t *testing.T) { provider := new(GitcoinPassport) c.Assert(provider.Init(ctx, GitcoinPassportConf{endpoints["/original"], time.Second, testDB}), qt.IsNil) // start the first download - emptyBalances, _, _, _, _, err := provider.HoldersBalances(context.TODO(), nil, 0) + emptyBalances, _, err := provider.HoldersBalances(context.TODO(), nil, 0) c.Assert(err, qt.IsNil) c.Assert(len(emptyBalances), qt.Equals, 0) // wait for the download to finish - time.Sleep(2 * time.Second) + time.Sleep(5 * time.Second) // check the balances - holders, _, _, _, _, err := provider.HoldersBalances(context.TODO(), nil, 0) + holders, _, err := provider.HoldersBalances(context.TODO(), nil, 0) c.Assert(err, qt.IsNil) c.Assert(len(holders), qt.Equals, len(expectedOriginalHolders)) for addr, balance := range holders { @@ -61,7 +61,7 @@ } c.Assert(provider.SetLastBalances(context.TODO(), nil, holders, 0), qt.IsNil) // start the second download expecting to use the cached data - sameBalances, _, _, _, _, err := provider.HoldersBalances(context.TODO(), nil, 0) + sameBalances, _, err := provider.HoldersBalances(context.TODO(), nil, 0) c.Assert(err, qt.IsNil) // empty results because the data is the same c.Assert(len(sameBalances), qt.Equals, 0) @@ -73,7 +73,7 @@ // new endpoint with one change time.Sleep(time.Second * 5) c.Assert(newProvider.SetLastBalances(context.TODO(), nil, holders, 0), qt.IsNil) - holders, _, _, _, _, err = newProvider.HoldersBalances(context.TODO(), nil, 1) + holders, _, err = newProvider.HoldersBalances(context.TODO(), nil, 1) c.Assert(err, qt.IsNil) c.Assert(len(holders), qt.Equals, len(expectedUpdatedHolders)) for addr, balance := range holders { diff --git a/scanner/providers/holders_provider.go index a87707d0..c031b62c 100644 --- a/scanner/providers/holders_provider.go +++ b/scanner/providers/holders_provider.go @@ -7,6 +7,30 @@ import ( "github.com/ethereum/go-ethereum/common" ) +// BlocksDelta struct defines the delta of blocks processed by any +// HolderProvider. It includes the total number of logs processed, the new logs +// processed, the logs already processed, the last block processed, and whether +// the provider is synced. It also includes the current total supply of the +// token set in the provider. +type BlocksDelta struct { + LogsCount uint64 + NewLogsCount uint64 + AlreadyProcessedLogsCount uint64 + Block uint64 + Synced bool + TotalSupply *big.Int + NewLogs [][]byte +} + +// Filter interface defines the basic methods to interact with a filter to +// store the identifiers of processed transfers and avoid processing them +// again, for example, if a token is rescanned. It allows implementing +// different filters, such as in-memory, disk, merkle tree, etc. +type Filter interface { + CheckKey(key []byte) (bool, error) + CheckAndAddKey(key []byte) (bool, error) +} + // HolderProvider is the interface that wraps the basic methods to interact with // a holders provider. It is used by the HoldersScanner to get the balances of // the token holders. It allows implementing different providers, such as
It allows to implement different providers, such as @@ -32,7 +56,7 @@ type HolderProvider interface { // HoldersBalances returns the balances of the token holders for the given // id and delta point in time, from the stored last snapshot. It also // returns the total supply of tokens as a *big.Int. - HoldersBalances(ctx context.Context, id []byte, to uint64) (map[common.Address]*big.Int, uint64, uint64, bool, *big.Int, error) + HoldersBalances(ctx context.Context, id []byte, to uint64) (map[common.Address]*big.Int, *BlocksDelta, error) // Close closes the provider and its internal structures. Close() error // IsExternal returns true if the provider is an external API. diff --git a/scanner/providers/poap/poap_provider.go b/scanner/providers/poap/poap_provider.go index d264bdab..19223121 100644 --- a/scanner/providers/poap/poap_provider.go +++ b/scanner/providers/poap/poap_provider.go @@ -128,7 +128,7 @@ func (p *POAPHolderProvider) SetLastBalances(_ context.Context, id []byte, // API parsing every POAP holder for the event ID provided and calculate the // balances of the token holders from the last snapshot. func (p *POAPHolderProvider) HoldersBalances(_ context.Context, id []byte, delta uint64) ( - map[common.Address]*big.Int, uint64, uint64, bool, *big.Int, error, + map[common.Address]*big.Int, *providers.BlocksDelta, error, ) { // parse eventID from id eventID := string(id) @@ -136,7 +136,7 @@ func (p *POAPHolderProvider) HoldersBalances(_ context.Context, id []byte, delta // get last snapshot newSnapshot, err := p.lastHolders(eventID) if err != nil { - return nil, 0, 0, false, big.NewInt(0), err + return nil, nil, err } p.snapshotsMtx.RLock() defer p.snapshotsMtx.RUnlock() @@ -159,7 +159,14 @@ func (p *POAPHolderProvider) HoldersBalances(_ context.Context, id []byte, delta totalSupply.Add(totalSupply, balance) } // return the final snapshot - return finalSnapshot, uint64(len(finalSnapshot)), from, true, totalSupply, nil + return finalSnapshot, &providers.BlocksDelta{ + LogsCount: uint64(len(finalSnapshot)), + NewLogsCount: uint64(len(newSnapshot)), + AlreadyProcessedLogsCount: 0, + Block: from, + Synced: true, + TotalSupply: totalSupply, + }, nil } // Close method is not implemented in the POAP external provider. 
By default it diff --git a/scanner/providers/poap/poap_provider_test.go index adef6074..b50867b8 100644 --- a/scanner/providers/poap/poap_provider_test.go +++ b/scanner/providers/poap/poap_provider_test.go @@ -39,7 +39,7 @@ func TestPOAP(t *testing.T) { provider := new(POAPHolderProvider) c.Assert(provider.Init(ctx, POAPConfig{endpoints["/original"], "no-token"}), qt.IsNil) - holders, _, _, _, _, err := provider.HoldersBalances(context.TODO(), nil, 0) + holders, _, err := provider.HoldersBalances(context.TODO(), nil, 0) c.Assert(err, qt.IsNil) c.Assert(len(holders), qt.Equals, len(expectedOriginalHolders)) for addr, balance := range holders { @@ -47,13 +47,13 @@ c.Assert(exists, qt.Equals, true) c.Assert(balance.String(), qt.Equals, expectedBalance) } - sameBalances, _, _, _, _, err := provider.HoldersBalances(context.TODO(), nil, 0) + sameBalances, _, err := provider.HoldersBalances(context.TODO(), nil, 0) c.Assert(err, qt.IsNil) // empty results because the data is the same c.Assert(len(sameBalances), qt.Equals, 0) provider.apiEndpoint = endpoints["/updated"] - holders, _, _, _, _, err = provider.HoldersBalances(context.TODO(), nil, 0) + holders, _, err = provider.HoldersBalances(context.TODO(), nil, 0) c.Assert(err, qt.IsNil) c.Assert(len(holders), qt.Equals, len(expectedUpdatedHolders)) for addr, balance := range holders { diff --git a/scanner/providers/web3/erc20_provider.go index 16ceabbe..8d9765b0 100644 --- a/scanner/providers/web3/erc20_provider.go +++ b/scanner/providers/web3/erc20_provider.go @@ -1,7 +1,9 @@ package web3 import ( + "bytes" "context" + "crypto/sha256" "errors" "fmt" "math/big" @@ -9,6 +11,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" erc20 "github.com/vocdoni/census3/contracts/erc/erc20" "github.com/vocdoni/census3/helpers/web3" "github.com/vocdoni/census3/scanner/providers" @@ -29,6 +32,7 @@ type ERC20HolderProvider struct { creationBlock uint64 lastNetworkBlock uint64 synced atomic.Bool + filter providers.Filter } func (p *ERC20HolderProvider) Init(_ context.Context, iconf any) error { @@ -65,6 +69,8 @@ func (p *ERC20HolderProvider) SetRef(iref any) error { if err != nil { return fmt.Errorf("error getting web3 client for the given chainID: %w", err) } + // set the filter provided in the reference + p.filter = ref.Filter // set the client, parse the address and initialize the contract p.address = common.HexToAddress(ref.HexAddress) if p.contract, err = erc20.NewERC20Contract(p.address, p.client); err != nil { @@ -114,15 +120,14 @@ func (p *ERC20HolderProvider) SetLastBlockNumber(blockNumber uint64) { // of new transfers, the last block scanned, if the provider is synced and an // error if it exists.
func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fromBlock uint64) ( - map[common.Address]*big.Int, uint64, uint64, bool, *big.Int, error, + map[common.Address]*big.Int, *providers.BlocksDelta, error, ) { // if the last network block is lower than the last scanned block, and the // last scanned block is equal to the creation block, it means that the // last network block is outdated, so it returns that it is not synced and // an error if fromBlock >= p.lastNetworkBlock && fromBlock == p.creationBlock { - return nil, 0, fromBlock, false, big.NewInt(0), - fmt.Errorf("outdated last network block, it will retry in the next iteration") + return nil, nil, fmt.Errorf("outdated last network block, it will retry in the next iteration") } // calculate the range of blocks to scan, by default take the last block // scanned and scan to the latest block, calculate the latest block if the @@ -132,7 +137,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro var err error toBlock, err = p.LatestBlockNumber(ctx, nil) if err != nil { - return nil, 0, fromBlock, false, big.NewInt(0), err + return nil, nil, err } } log.Infow("scan iteration", @@ -145,21 +150,57 @@ startTime := time.Now() logs, lastBlock, synced, err := RangeOfLogs(ctx, p.client, p.address, fromBlock, toBlock, LOG_TOPIC_ERC20_TRANSFER) if err != nil && !errors.Is(err, ErrTooManyRequests) { - return nil, 0, fromBlock, false, big.NewInt(0), err + return nil, nil, err } if errors.Is(err, ErrTooManyRequests) { log.Warnf("too many requests, the provider will continue in the next iteration from block %d", lastBlock) } // encode the number of new transfers - newTransfers := uint64(len(logs)) + newTransfers := uint64(0) + alreadyProcessedLogs := uint64(0) balances := make(map[common.Address]*big.Int) // iterate the logs and update the balances + log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs)) + processedLogs := &PartialProcessedLogs{} for _, currentLog := range logs { + // skip the log if it has been removed + if currentLog.Removed { + continue + } + // parse log data logData, err := p.contract.ERC20ContractFilterer.ParseTransfer(currentLog) if err != nil { - return nil, newTransfers, lastBlock, false, big.NewInt(0), - errors.Join(ErrParsingTokenLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err)) + return nil, &providers.BlocksDelta{ + Block: lastBlock, + LogsCount: uint64(len(logs)), + NewLogsCount: newTransfers, + AlreadyProcessedLogsCount: alreadyProcessedLogs, + Synced: false, + TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, + }, errors.Join(ErrParsingTokenLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err)) + } + // check if the log has already been processed, tracking it in the + // partial list if it is not already included + processed, err := p.isLogAlreadyProcessed(currentLog, processedLogs) + if err != nil { + return nil, &providers.BlocksDelta{ + Block: lastBlock, + LogsCount: uint64(len(logs)), + NewLogsCount: newTransfers, + AlreadyProcessedLogsCount: alreadyProcessedLogs, + Synced: false, + TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, + }, errors.Join(ErrCheckingProcessedLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err)) + } + // if the log has already been processed, count it as a duplicate + // and skip it + if processed { + alreadyProcessedLogs++ + continue } + newTransfers++ // update balances if toBalance, ok := balances[logData.To]; ok {
balances[logData.To] = new(big.Int).Add(toBalance, logData.Value) @@ -172,19 +213,28 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro balances[logData.From] = new(big.Int).Neg(logData.Value) } } - log.Infow("saving blocks", + log.Infow("logs parsed", "count", len(balances), - "logs", len(logs), + "new_logs", newTransfers, + "already_processed_logs", alreadyProcessedLogs, "blocks/s", 1000*float32(lastBlock-fromBlock)/float32(time.Since(startTime).Milliseconds()), "took", time.Since(startTime).Seconds(), "progress", fmt.Sprintf("%d%%", (fromBlock*100)/toBlock)) p.synced.Store(synced) - totalSupply, err := p.TotalSupply(nil) - if err != nil { - log.Warn("error getting total supply, it will retry in the next iteration", "error", err) - return balances, newTransfers, lastBlock, synced, nil, nil + + delta := &providers.BlocksDelta{ + Block: lastBlock, + LogsCount: uint64(len(logs)), + NewLogsCount: newTransfers, + AlreadyProcessedLogsCount: alreadyProcessedLogs, + Synced: synced, + TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, } - return balances, newTransfers, lastBlock, synced, totalSupply, nil + if delta.TotalSupply, err = p.TotalSupply(nil); err != nil { + log.Warnw("error getting total supply, it will retry in the next iteration", "error", err) + } + return balances, delta, nil } // Close method is not implemented for ERC20 tokens. @@ -339,3 +389,43 @@ func (p *ERC20HolderProvider) IconURI(_ []byte) (string, error) { func (p *ERC20HolderProvider) CensusKeys(data map[common.Address]*big.Int) (map[common.Address]*big.Int, error) { return data, nil } + +// isLogAlreadyProcessed returns true if the given log has already been +// processed. It uses a filter to check if the log has been processed. To +// identify the log, it creates a hash from the block number, the transaction +// hash and the log index. It returns true if the log has already been processed +// or false if it has not been processed yet. If some error occurs, it returns +// false and the error.
+func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log, pl *PartialProcessedLogs) (bool, error) { + // if the filter is not defined, return false + if p.filter == nil { + return false, nil + } + // get an identifier of each transfer: + // sha256(blockNumber-txHash-log.Index) + transferID := fmt.Sprintf("%d-%x-%d", l.BlockNumber, l.TxHash, l.Index) + hashFn := sha256.New() + if _, err := hashFn.Write([]byte(transferID)); err != nil { + return false, err + } + // check if the hash is in the filter + hID := hashFn.Sum(nil)[:8] + exists, err := p.filter.CheckKey(hID) + if err != nil { + return false, err + } + if exists { + return true, nil + } + // if the hash is not in the filter, check if it is in the partial filter + logs := *pl + for _, id := range logs { + if bytes.Equal(id, hID) { + return true, nil + } + } + // add the hash to the partial filter if it has not been processed and return + logs = append(logs, hID) + *pl = logs + return false, nil +} diff --git a/scanner/providers/web3/erc721_provider.go index f83112a8..d50cd270 100644 --- a/scanner/providers/web3/erc721_provider.go +++ b/scanner/providers/web3/erc721_provider.go @@ -1,7 +1,9 @@ package web3 import ( + "bytes" "context" + "crypto/sha256" "errors" "fmt" "math/big" @@ -9,6 +11,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" erc721 "github.com/vocdoni/census3/contracts/erc/erc721" "github.com/vocdoni/census3/helpers/web3" "github.com/vocdoni/census3/scanner/providers" @@ -29,6 +32,7 @@ type ERC721HolderProvider struct { creationBlock uint64 lastNetworkBlock uint64 synced atomic.Bool + filter providers.Filter } func (p *ERC721HolderProvider) Init(_ context.Context, iconf any) error { @@ -65,6 +69,8 @@ func (p *ERC721HolderProvider) SetRef(iref any) error { if err != nil { return fmt.Errorf("error getting web3 client for the given chainID: %w", err) } + // set the filter provided in the reference + p.filter = ref.Filter // set the client, parse the address and initialize the contract address := common.HexToAddress(ref.HexAddress) if p.contract, err = erc721.NewERC721Contract(address, p.client); err != nil { @@ -112,15 +118,14 @@ func (p *ERC721HolderProvider) SetLastBlockNumber(blockNumber uint64) { // of new transfers, the last block scanned, if the provider is synced and an // error if it exists.
func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fromBlock uint64) ( - map[common.Address]*big.Int, uint64, uint64, bool, *big.Int, error, + map[common.Address]*big.Int, *providers.BlocksDelta, error, ) { // if the last network block is lower than the last scanned block, and the // last scanned block is equal to the creation block, it means that the // last network block is outdated, so it returns that it is not synced and // an error if fromBlock >= p.lastNetworkBlock && fromBlock == p.creationBlock { - return nil, 0, fromBlock, false, big.NewInt(0), - errors.New("outdated last network block, it will retry in the next iteration") + return nil, nil, fmt.Errorf("outdated last network block, it will retry in the next iteration") } // calculate the range of blocks to scan, by default take the last block // scanned and scan to the latest block, calculate the latest block if the @@ -130,7 +135,7 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr var err error toBlock, err = p.LatestBlockNumber(ctx, nil) if err != nil { - return nil, 0, fromBlock, false, nil, err + return nil, nil, err } } log.Infow("scan iteration", @@ -143,20 +148,57 @@ startTime := time.Now() logs, lastBlock, synced, err := RangeOfLogs(ctx, p.client, p.address, fromBlock, toBlock, LOG_TOPIC_ERC20_TRANSFER) if err != nil && !errors.Is(err, ErrTooManyRequests) { - return nil, 0, fromBlock, false, big.NewInt(0), err + return nil, nil, err } if errors.Is(err, ErrTooManyRequests) { log.Warnf("too many requests, the provider will continue in the next iteration from block %d", lastBlock) } // encode the number of new transfers - newTransfers := uint64(len(logs)) + newTransfers := uint64(0) + alreadyProcessedLogs := uint64(0) balances := make(map[common.Address]*big.Int) // iterate the logs and update the balances + log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs)) + processedLogs := &PartialProcessedLogs{} for _, currentLog := range logs { + // skip the log if it has been removed + if currentLog.Removed { + continue + } + // parse log data logData, err := p.contract.ERC721ContractFilterer.ParseTransfer(currentLog) if err != nil { - return nil, newTransfers, lastBlock, false, nil, fmt.Errorf("[ERC721] %w: %s: %w", ErrParsingTokenLogs, p.address.Hex(), err) + return nil, &providers.BlocksDelta{ + Block: lastBlock, + LogsCount: uint64(len(logs)), + NewLogsCount: newTransfers, + AlreadyProcessedLogsCount: alreadyProcessedLogs, + Synced: false, + TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, + }, errors.Join(ErrParsingTokenLogs, fmt.Errorf("[ERC721] %s: %w", p.address, err)) + } + // check if the log has already been processed, tracking it in the + // partial list if it is not already included + processed, err := p.isLogAlreadyProcessed(currentLog, processedLogs) + if err != nil { + return nil, &providers.BlocksDelta{ + Block: lastBlock, + LogsCount: uint64(len(logs)), + NewLogsCount: newTransfers, + AlreadyProcessedLogsCount: alreadyProcessedLogs, + Synced: false, + TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, + }, errors.Join(ErrCheckingProcessedLogs, fmt.Errorf("[ERC721] %s: %w", p.address, err)) + } + // if the log has already been processed, count it as a duplicate + // and skip it + if processed { + alreadyProcessedLogs++ + continue } + newTransfers++ // update balances if toBalance, ok := balances[logData.To]; ok {
balances[logData.To] = new(big.Int).Add(toBalance, big.NewInt(1)) @@ -169,14 +211,28 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr balances[logData.From] = big.NewInt(-1) } } - log.Infow("saving blocks", + log.Infow("logs parsed", "count", len(balances), - "logs", len(logs), + "new_logs", newTransfers, + "already_processed_logs", alreadyProcessedLogs, "blocks/s", 1000*float32(lastBlock-fromBlock)/float32(time.Since(startTime).Milliseconds()), "took", time.Since(startTime).Seconds(), "progress", fmt.Sprintf("%d%%", (fromBlock*100)/toBlock)) p.synced.Store(synced) - return balances, newTransfers, lastBlock, synced, nil, nil + + delta := &providers.BlocksDelta{ + Block: lastBlock, + LogsCount: uint64(len(logs)), + NewLogsCount: newTransfers, + AlreadyProcessedLogsCount: alreadyProcessedLogs, + Synced: synced, + TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, + } + if delta.TotalSupply, err = p.TotalSupply(nil); err != nil { + log.Warnw("error getting total supply, it will retry in the next iteration", "error", err) + } + return balances, delta, nil } // Close method is not implemented for ERC721 tokens. @@ -329,3 +385,43 @@ func (p *ERC721HolderProvider) IconURI(_ []byte) (string, error) { func (p *ERC721HolderProvider) CensusKeys(data map[common.Address]*big.Int) (map[common.Address]*big.Int, error) { return data, nil } + +// isLogAlreadyProcessed returns true if the given log has already been +// processed. It uses a filter to check if the log has been processed. To +// identify the log, it creates a hash from the block number, the transaction +// hash and the log index. It returns true if the log has already been processed +// or false if it has not been processed yet. If some error occurs, it returns +// false and the error.
+func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log, pl *PartialProcessedLogs) (bool, error) { + // if the filter is not defined, return false + if p.filter == nil { + return false, nil + } + // get an identifier of each transfer: + // sha256(blockNumber-txHash-log.Index) + transferID := fmt.Sprintf("%d-%x-%d", l.BlockNumber, l.TxHash, l.Index) + hashFn := sha256.New() + if _, err := hashFn.Write([]byte(transferID)); err != nil { + return false, err + } + // check if the hash is in the filter + hID := hashFn.Sum(nil)[:8] + exists, err := p.filter.CheckKey(hID) + if err != nil { + return false, err + } + if exists { + return true, nil + } + // if the hash is not in the filter, check if it is in the partial filter + logs := *pl + for _, id := range logs { + if bytes.Equal(id, hID) { + return true, nil + } + } + // add the hash to the partial filter if it has not been processed and return + logs = append(logs, hID) + *pl = logs + return false, nil +} diff --git a/scanner/providers/web3/erc777_provider.go index b5b3fd6f..a9d790a6 100644 --- a/scanner/providers/web3/erc777_provider.go +++ b/scanner/providers/web3/erc777_provider.go @@ -1,7 +1,9 @@ package web3 import ( + "bytes" "context" + "crypto/sha256" "errors" "fmt" "math/big" @@ -9,6 +11,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" erc777 "github.com/vocdoni/census3/contracts/erc/erc777" "github.com/vocdoni/census3/helpers/web3" "github.com/vocdoni/census3/scanner/providers" @@ -29,6 +32,7 @@ type ERC777HolderProvider struct { creationBlock uint64 lastNetworkBlock uint64 synced atomic.Bool + filter providers.Filter } func (p *ERC777HolderProvider) Init(_ context.Context, iconf any) error { @@ -65,6 +69,8 @@ func (p *ERC777HolderProvider) SetRef(iref any) error { if err != nil { return fmt.Errorf("error getting web3 client for the given chainID: %w", err) } + // set the filter provided in the reference + p.filter = ref.Filter // set the client, parse the address and initialize the contract address := common.HexToAddress(ref.HexAddress) if p.contract, err = erc777.NewERC777Contract(address, p.client); err != nil { @@ -112,15 +118,14 @@ func (p *ERC777HolderProvider) SetLastBlockNumber(blockNumber uint64) { // of new transfers, the last block scanned, if the provider is synced and an // error if it exists.
func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fromBlock uint64) ( - map[common.Address]*big.Int, uint64, uint64, bool, *big.Int, error, + map[common.Address]*big.Int, *providers.BlocksDelta, error, ) { // if the last network block is lower than the last scanned block, and the // last scanned block is equal to the creation block, it means that the // last network block is outdated, so it returns that it is not synced and // an error if fromBlock >= p.lastNetworkBlock && fromBlock == p.creationBlock { - return nil, 0, fromBlock, false, big.NewInt(0), - errors.New("outdated last network block, it will retry in the next iteration") + return nil, nil, fmt.Errorf("outdated last network block, it will retry in the next iteration") } // calculate the range of blocks to scan, by default take the last block // scanned and scan to the latest block, calculate the latest block if the @@ -130,7 +135,7 @@ func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr var err error toBlock, err = p.LatestBlockNumber(ctx, nil) if err != nil { - return nil, 0, fromBlock, false, nil, err + return nil, nil, err } } log.Infow("scan iteration", @@ -143,21 +148,57 @@ startTime := time.Now() logs, lastBlock, synced, err := RangeOfLogs(ctx, p.client, p.address, fromBlock, toBlock, LOG_TOPIC_ERC20_TRANSFER) if err != nil && !errors.Is(err, ErrTooManyRequests) { - return nil, 0, fromBlock, false, big.NewInt(0), err + return nil, nil, err } if errors.Is(err, ErrTooManyRequests) { log.Warnf("too many requests, the provider will continue in the next iteration from block %d", lastBlock) } // encode the number of new transfers - newTransfers := uint64(len(logs)) + newTransfers := uint64(0) + alreadyProcessedLogs := uint64(0) balances := make(map[common.Address]*big.Int) // iterate the logs and update the balances + log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs)) + processedLogs := &PartialProcessedLogs{} for _, currentLog := range logs { + // skip the log if it has been removed + if currentLog.Removed { + continue + } + // parse log data logData, err := p.contract.ERC777ContractFilterer.ParseTransfer(currentLog) if err != nil { - return nil, newTransfers, lastBlock, false, nil, - errors.Join(ErrParsingTokenLogs, fmt.Errorf("[ERC777] %s: %w", p.address, err)) + return nil, &providers.BlocksDelta{ + Block: lastBlock, + LogsCount: uint64(len(logs)), + NewLogsCount: newTransfers, + AlreadyProcessedLogsCount: alreadyProcessedLogs, + Synced: false, + TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, + }, errors.Join(ErrParsingTokenLogs, fmt.Errorf("[ERC777] %s: %w", p.address, err)) + } + // check if the log has already been processed, tracking it in the + // partial list if it is not already included + processed, err := p.isLogAlreadyProcessed(currentLog, processedLogs) + if err != nil { + return nil, &providers.BlocksDelta{ + Block: lastBlock, + LogsCount: uint64(len(logs)), + NewLogsCount: newTransfers, + AlreadyProcessedLogsCount: alreadyProcessedLogs, + Synced: false, + TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, + }, errors.Join(ErrCheckingProcessedLogs, fmt.Errorf("[ERC777] %s: %w", p.address, err)) + } + // if the log has already been processed, count it as a duplicate + // and skip it + if processed { + alreadyProcessedLogs++ + continue } + newTransfers++ // update balances if toBalance, ok := balances[logData.To]; ok {
balances[logData.To] = new(big.Int).Add(toBalance, big.NewInt(1)) @@ -170,13 +211,28 @@ func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr balances[logData.From] = big.NewInt(-1) } } - log.Infow("saving blocks", + log.Infow("logs parsed", "count", len(balances), - "logs", len(logs), + "new_logs", newTransfers, + "already_processed_logs", alreadyProcessedLogs, "blocks/s", 1000*float32(lastBlock-fromBlock)/float32(time.Since(startTime).Milliseconds()), "took", time.Since(startTime).Seconds(), "progress", fmt.Sprintf("%d%%", (fromBlock*100)/toBlock)) - return balances, newTransfers, lastBlock, synced, nil, nil + p.synced.Store(synced) + + delta := &providers.BlocksDelta{ + Block: lastBlock, + LogsCount: uint64(len(logs)), + NewLogsCount: newTransfers, + AlreadyProcessedLogsCount: alreadyProcessedLogs, + Synced: synced, + TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, + } + if delta.TotalSupply, err = p.TotalSupply(nil); err != nil { + log.Warnw("error getting total supply, it will retry in the next iteration", "error", err) + } + return balances, delta, nil } // Close method is not implemented for ERC777 tokens. @@ -329,3 +385,43 @@ func (p *ERC777HolderProvider) IconURI(_ []byte) (string, error) { func (p *ERC777HolderProvider) CensusKeys(data map[common.Address]*big.Int) (map[common.Address]*big.Int, error) { return data, nil } + +// isLogAlreadyProcessed returns true if the given log has already been +// processed. It uses a filter to check if the log has been processed. To +// identify the log, it creates a hash from the block number, the transaction +// hash and the log index. It returns true if the log has already been processed +// or false if it has not been processed yet. If some error occurs, it returns +// false and the error.
+func (p *ERC777HolderProvider) isLogAlreadyProcessed(l types.Log, pl *PartialProcessedLogs) (bool, error) {
+	// if the filter is not defined, return false
+	if p.filter == nil {
+		return false, nil
+	}
+	// get an identifier for each transfer:
+	// sha256(blockNumber-txHash-log.Index)
+	transferID := fmt.Sprintf("%d-%x-%d", l.BlockNumber, l.TxHash, l.Index)
+	hashFn := sha256.New()
+	if _, err := hashFn.Write([]byte(transferID)); err != nil {
+		return false, err
+	}
+	// check if the hash is in the filter
+	hID := hashFn.Sum(nil)[:8]
+	exists, err := p.filter.CheckKey(hID)
+	if err != nil {
+		return false, err
+	}
+	if exists {
+		return true, nil
+	}
+	// if the hash is not in the filter, check if it is in the partial filter
+	logs := *pl
+	for _, id := range logs {
+		if bytes.Equal(id, hID) {
+			return true, nil
+		}
+	}
+	// add the hash to the partial filter if it has not been processed and return
+	logs = append(logs, hID)
+	*pl = logs
+	return false, nil
+}
diff --git a/scanner/providers/web3/errors.go b/scanner/providers/web3/errors.go
index aa64f591..e51d92d8 100644
--- a/scanner/providers/web3/errors.go
+++ b/scanner/providers/web3/errors.go
@@ -9,5 +9,7 @@ var (
 	ErrScanningTokenLogs = fmt.Errorf("error scanning token logs")
 	ErrTooManyRequests = fmt.Errorf("web3 endpoint returns too many requests")
 	ErrParsingTokenLogs = fmt.Errorf("error parsing token logs")
+	ErrCheckingProcessedLogs = fmt.Errorf("error checking processed logs")
 	ErrGettingTotalSupply = fmt.Errorf("error getting total supply")
+	ErrAddingProcessedLogs = fmt.Errorf("error adding processed logs to the filter")
 )
diff --git a/scanner/providers/web3/web3_provider.go b/scanner/providers/web3/web3_provider.go
index 9a4e5bca..4c338e66 100644
--- a/scanner/providers/web3/web3_provider.go
+++ b/scanner/providers/web3/web3_provider.go
@@ -21,6 +21,7 @@ type Web3ProviderRef struct {
 	HexAddress string
 	ChainID uint64
 	CreationBlock uint64
+	Filter providers.Filter
 }
 
 type Web3ProviderConfig struct {
@@ -29,6 +30,12 @@ type Web3ProviderConfig struct {
 	DB *db.Database
 }
 
+// PartialProcessedLogs struct is used to store the logs that are partially
+// processed by the provider. It is used to avoid processing the same logs
+// multiple times if the provider is rescanned, and to add the newly processed
+// logs to the token filter in a single call.
+type PartialProcessedLogs [][]byte
+
 // creationBlock function returns the block number of the creation of a contract
 // address. It uses the `eth_getCode` method to get the contract code at the
 // block number provided. If the method is not supported, it returns 0 and nil.
diff --git a/scanner/scanner.go b/scanner/scanner.go
index 415c5215..0b23d3fb 100644
--- a/scanner/scanner.go
+++ b/scanner/scanner.go
@@ -4,17 +4,14 @@ import (
 	"context"
 	"database/sql"
 	"errors"
-	"fmt"
 	"math/big"
 	"sort"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/vocdoni/census3/db"
-	"github.com/vocdoni/census3/db/annotations"
 	queries "github.com/vocdoni/census3/db/sqlc"
 	"github.com/vocdoni/census3/helpers/web3"
 	"github.com/vocdoni/census3/scanner/providers/manager"
@@ -44,6 +41,7 @@ type Scanner struct {
 	ctx context.Context
 	cancel context.CancelFunc
 	db *db.DB
+	updater *Updater
 	networks *web3.Web3Pool
 	providerManager *manager.ProviderManager
 	coolDown time.Duration
@@ -57,9 +55,12 @@ type Scanner struct {
 
 // NewScanner returns a new scanner instance with the required parameters
 // initialized.
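+// The updater is now a required dependency: the scanner only prepares tokens
+// and enqueues update requests, while the updater performs the actual scan.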
-func NewScanner(db *db.DB, networks *web3.Web3Pool, pm *manager.ProviderManager, coolDown time.Duration) *Scanner {
+func NewScanner(db *db.DB, updater *Updater, networks *web3.Web3Pool,
+	pm *manager.ProviderManager, coolDown time.Duration,
+) *Scanner {
 	return &Scanner{
 		db: db,
+		updater: updater,
 		networks: networks,
 		providerManager: pm,
 		coolDown: coolDown,
@@ -74,10 +75,7 @@ func NewScanner(db *db.DB, networks *web3.Web3Pool, pm *manager.ProviderManager,
 // Start starts the scanner. It starts a loop that scans the tokens in the
 // database and saves the holders in the database. It stops when the context is
 // cancelled.
-func (s *Scanner) Start(ctx context.Context, concurrentTokens int) {
-	if concurrentTokens < 1 {
-		concurrentTokens = 1
-	}
+func (s *Scanner) Start(ctx context.Context) {
 	s.ctx, s.cancel = context.WithCancel(ctx)
 	itCounter := 0
 	// keep the latest block numbers updated
@@ -101,56 +99,58 @@ func (s *Scanner) Start(ctx context.Context, concurrentTokens int) {
 			log.Error(err)
 			continue
 		}
-		// calculate number of batches
-		sem := make(chan struct{}, concurrentTokens)
-		defer close(sem)
 		// iterate over the tokens to scan
 		var atSyncGlobal atomic.Bool
 		atSyncGlobal.Store(true)
 		for _, token := range tokens {
-			// get the semaphore
-			sem <- struct{}{}
-			go func(token ScannerToken) {
-				// release the semaphore when the goroutine finishes
-				defer func() {
-					<-sem
-				}()
-				log.Infow("scanning token",
-					"address", token.Address.Hex(),
-					"chainID", token.ChainID,
-					"externalID", token.ExternalID,
-					"lastBlock", token.LastBlock,
-					"ready", token.Ready)
-				// scan the token
-				holders, newTransfers, lastBlock, synced, totalSupply, err := s.ScanHolders(ctx, token)
-				if err != nil {
-					atSyncGlobal.Store(false)
-					if errors.Is(err, context.Canceled) {
-						log.Info("scanner context cancelled, shutting down")
-						return
-					}
-					log.Error(err)
-					return
+			if !token.Ready {
+				if err := s.prepareToken(token); err != nil {
+					log.Warnw("error preparing token", "error", err)
+					continue
 				}
-				if !synced {
-					atSyncGlobal.Store(false)
+			}
+			log.Infow("checking token in the updater queue",
+				"address", token.Address.Hex(),
+				"chainID", token.ChainID,
+				"externalID", token.ExternalID)
+			// get the request ID of the token in the updater queue
+			reqID, err := RequestID(token.Address, token.ChainID, token.ExternalID)
+			if err != nil {
+				log.Error(err)
+				continue
+			}
+			// get the status of the token in the updater queue
+			status := s.updater.RequestStatus(reqID, true)
+			if status != nil {
+				// if the token is in the updater queue, update the internal
+				// token status and, if the request is done, continue to the
+				// next token
+				s.updateInternalTokenStatus(*token, status.LastBlock, status.Done, status.LastTotalSupply)
+				if status.Done {
+					continue
 				}
-				// save the new token holders
-				if err = s.SaveHolders(ctx, token, holders, newTransfers, lastBlock, synced, totalSupply); err != nil {
-					if strings.Contains(err.Error(), "database is closed") {
-						return
+				atSyncGlobal.Store(false)
+			}
+			// if the request has been processed or the token is not in the
+			// queue, load the last available block number of the network and
+			// enqueue the token to the updater queue from the last scanned
+			// block
+			if iLastNetworkBlock, ok := s.latestBlockNumbers.Load(token.ChainID); ok {
+				if lastNetworkBlock, ok := iLastNetworkBlock.(uint64); ok {
+					if _, err := s.updater.AddRequest(&UpdateRequest{
+						Address: token.Address,
+						ChainID: token.ChainID,
+						Type: token.Type,
+						ExternalID: token.ExternalID,
+						CreationBlock: token.CreationBlock,
+						EndBlock:
lastNetworkBlock, + LastBlock: token.LastBlock, + }); err != nil { + log.Warnw("error enqueuing token", "error", err) + continue } - log.Warnw("error saving tokenholders", - "address", token.Address.Hex(), - "chainID", token.ChainID, - "externalID", token.ExternalID, - "error", err) } - }(*token) - } - // wait for all the tokens to be scanned - for i := 0; i < concurrentTokens; i++ { - sem <- struct{}{} + } } log.Infow("scan iteration finished", "iteration", itCounter, @@ -288,124 +288,11 @@ func (s *Scanner) TokensToScan(ctx context.Context) ([]*ScannerToken, error) { return tokens, nil } -// ScanHolders scans the holders of the given token. It get the current holders -// from the database, set them into the provider and get the new ones. It -// returns the new holders, the last block scanned and if the token is synced -// after the scan. -func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) ( - map[common.Address]*big.Int, uint64, uint64, bool, *big.Int, error, -) { - internalCtx, cancel := context.WithTimeout(ctx, SCAN_TIMEOUT) - defer cancel() - // get the correct token holder provider for the current token - provider, err := s.providerManager.GetProvider(s.ctx, token.Type) - if err != nil { - return nil, 0, token.LastBlock, token.Synced, nil, - fmt.Errorf("token type %d not supported: %w", token.Type, err) - } - // create a tx to use it in the following queries - tx, err := s.db.RW.BeginTx(internalCtx, nil) - if err != nil { - return nil, 0, token.LastBlock, token.Synced, nil, err - } - defer func() { - if err := tx.Rollback(); err != nil && !errors.Is(sql.ErrTxDone, err) { - log.Error(err) - } - }() - qtx := s.db.QueriesRW.WithTx(tx) - // if the provider is not an external one, instance the current token - if !provider.IsExternal() { - if err := provider.SetRef(web3provider.Web3ProviderRef{ - HexAddress: token.Address.Hex(), - ChainID: token.ChainID, - CreationBlock: token.CreationBlock, - }); err != nil { - return nil, 0, token.LastBlock, token.Synced, nil, err - } - // set the last block number of the network in the provider getting it - // from the latest block numbers cache - if iLastNetworkBlock, ok := s.latestBlockNumbers.Load(token.ChainID); ok { - if lastNetworkBlock, ok := iLastNetworkBlock.(uint64); ok { - provider.SetLastBlockNumber(lastNetworkBlock) - } - } - // if the token is not ready yet (its creation block has not been - // calculated yet), calculate it, update the token information and - // return - if !token.Ready { - log.Debugw("token not ready yet, calculating creation block and continue", - "address", token.Address.Hex(), - "chainID", token.ChainID, - "externalID", token.ExternalID) - creationBlock, err := provider.CreationBlock(internalCtx, []byte(token.ExternalID)) - if err != nil { - return nil, 0, token.LastBlock, token.Synced, nil, err - } - _, err = qtx.UpdateTokenBlocks(internalCtx, queries.UpdateTokenBlocksParams{ - ID: token.Address.Bytes(), - ChainID: token.ChainID, - ExternalID: token.ExternalID, - CreationBlock: int64(creationBlock), - LastBlock: int64(creationBlock), - }) - if err != nil { - return nil, 0, token.LastBlock, token.Synced, nil, err - } - token.LastBlock = creationBlock - } - } - log.Infow("scanning holders", - "address", token.Address.Hex(), - "chainID", token.ChainID, - "externalID", token.ExternalID, - "lastBlock", token.LastBlock) - // get the current token holders from the database - results, err := qtx.ListTokenHolders(internalCtx, - queries.ListTokenHoldersParams{ - TokenID: token.Address.Bytes(), - ChainID: 
token.ChainID,
-			ExternalID: token.ExternalID,
-		})
-	if err != nil {
-		return nil, 0, token.LastBlock, token.Synced, nil, err
-	}
-	// set the current holders into the provider and get the new ones
-	currentHolders := map[common.Address]*big.Int{}
-	for _, result := range results {
-		bBalance, ok := new(big.Int).SetString(result.Balance, 10)
-		if !ok {
-			return nil, 0, token.LastBlock, token.Synced, nil, fmt.Errorf("error parsing token holder balance")
-		}
-		currentHolders[common.BytesToAddress(result.HolderID)] = bBalance
-	}
-	// close the database tx and commit it
-	if err := tx.Commit(); err != nil {
-		return nil, 0, token.LastBlock, token.Synced, nil, err
-	}
-	// set the current holders into the provider and get the new ones
-	if err := provider.SetLastBalances(ctx, []byte(token.ExternalID),
-		currentHolders, token.LastBlock,
-	); err != nil {
-		return nil, 0, token.LastBlock, token.Synced, nil, err
-	}
-	// get the new holders from the provider
-	return provider.HoldersBalances(ctx, []byte(token.ExternalID), token.LastBlock)
-}
-
-// SaveHolders saves the given holders in the database. It updates the token
-// synced status if it is different from the received one. Then, it creates,
-// updates or deletes the token holders in the database depending on the
-// calculated balance.
-// WARNING: the following code could produce holders with negative balances
-// in the database. This is because the scanner does not know if the token
-// holder is a contract or not, so it does not know if the balance is
-// correct or not. The scanner assumes that the balance is correct and
-// updates it in the database:
-// 1. To get the correct holders from the database you must filter the
-// holders with negative balances.
-// 2. To get the correct balances you must use the contract methods to get
-// the balances of the holders.
+// SaveHolders saves the given holders in the database. It delegates the work
+// to the shared SaveHolders helper, which persists both the holders and the
+// token status. It logs the number of created and updated token holders if
+// there are any; otherwise it logs that there are no holders to save. If some
+// error occurs, it returns the error.
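+// The same helper is shared with the updater, so the scanner and the rescan
+// path persist holders and token status through a single code path.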
func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken, holders map[common.Address]*big.Int, newTransfers, lastBlock uint64, synced bool, totalSupply *big.Int, @@ -416,92 +303,8 @@ func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken, "externalID", token.ExternalID, "block", lastBlock, "holders", len(holders)) - s.tokensMtx.Lock() - for i, t := range s.tokens { - if t.Address == token.Address && t.ChainID == token.ChainID && t.ExternalID == token.ExternalID { - s.tokens[i].LastBlock = lastBlock - s.tokens[i].Synced = synced - if totalSupply != nil && totalSupply.Cmp(big.NewInt(0)) > 0 { - s.tokens[i].totalSupply = totalSupply - token.totalSupply = totalSupply - } - break - } - } - s.tokensMtx.Unlock() internalCtx, cancel := context.WithTimeout(ctx, SAVE_TIMEOUT) defer cancel() - // create a tx to use it in the following queries - tx, err := s.db.RW.BeginTx(internalCtx, nil) - if err != nil { - return err - } - defer func() { - if err := tx.Rollback(); err != nil && !errors.Is(sql.ErrTxDone, err) { - log.Errorf("error rolling back tx: %v, token=%s chainID=%d externalID=%s", - err, token.Address.Hex(), token.ChainID, token.ExternalID) - } - }() - qtx := s.db.QueriesRW.WithTx(tx) - // create, update or delete token holders - created, updated := 0, 0 - for addr, balance := range holders { - // get the current token holder from the database - currentTokenHolder, err := qtx.GetTokenHolderEvenZero(ctx, queries.GetTokenHolderEvenZeroParams{ - TokenID: token.Address.Bytes(), - ChainID: token.ChainID, - ExternalID: token.ExternalID, - HolderID: addr.Bytes(), - }) - if err != nil { - if !errors.Is(sql.ErrNoRows, err) { - return err - } - // if the token holder not exists, create it - _, err = qtx.CreateTokenHolder(ctx, queries.CreateTokenHolderParams{ - TokenID: token.Address.Bytes(), - ChainID: token.ChainID, - ExternalID: token.ExternalID, - HolderID: addr.Bytes(), - BlockID: lastBlock, - Balance: balance.String(), - }) - if err != nil { - return err - } - created++ - continue - } - // parse the current balance of the holder - currentBalance, ok := new(big.Int).SetString(currentTokenHolder.Balance, 10) - if !ok { - return fmt.Errorf("error parsing current token holder balance") - } - // if both balances are zero, continue with the next holder to prevent - // UNIQUES constraint errors - if balance.Cmp(big.NewInt(0)) == 0 && currentBalance.Cmp(big.NewInt(0)) == 0 { - continue - } - // calculate the new balance of the holder by adding the current balance - // and the new balance - newBalance := new(big.Int).Add(currentBalance, balance) - // update the token holder in the database with the new balance. - // WANING: the balance could be negative so you must filter the holders - // with negative balances to get the correct holders from the database. 
-		_, err = qtx.UpdateTokenHolderBalance(ctx, queries.UpdateTokenHolderBalanceParams{
-			TokenID: token.Address.Bytes(),
-			ChainID: token.ChainID,
-			ExternalID: token.ExternalID,
-			HolderID: addr.Bytes(),
-			BlockID: currentTokenHolder.BlockID,
-			NewBlockID: lastBlock,
-			Balance: newBalance.String(),
-		})
-		if err != nil {
-			return fmt.Errorf("error updating token holder: %w", err)
-		}
-		updated++
-	}
 	// print the number of created and updated token holders if there are any,
 	// else, print that there are no holders to save
 	if len(holders) == 0 {
@@ -510,6 +313,10 @@ func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken,
 			"chainID", token.ChainID,
 			"externalID", token.ExternalID)
 	} else {
+		created, updated, err := SaveHolders(s.db, internalCtx, token, holders, newTransfers, lastBlock, synced, totalSupply)
+		if err != nil {
+			return err
+		}
 		log.Debugw("committing token holders",
 			"token", token.Address.Hex(),
 			"chainID", token.ChainID,
@@ -519,30 +326,6 @@ func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken,
 			"created", created,
 			"updated", updated)
 	}
-	// get the token info from the database to update ir
-	tokenInfo, err := qtx.GetToken(internalCtx,
-		queries.GetTokenParams{
-			ID: token.Address.Bytes(),
-			ChainID: token.ChainID,
-			ExternalID: token.ExternalID,
-		})
-	if err != nil {
-		return err
-	}
-	// update the synced status, last block, the number of analysed transfers
-	// (for debug) and the total supply in the database
-	_, err = qtx.UpdateTokenStatus(internalCtx, queries.UpdateTokenStatusParams{
-		ID: token.Address.Bytes(),
-		ChainID: token.ChainID,
-		ExternalID: token.ExternalID,
-		Synced: synced,
-		LastBlock: int64(lastBlock),
-		AnalysedTransfers: tokenInfo.AnalysedTransfers + int64(newTransfers),
-		TotalSupply: annotations.BigInt(token.totalSupply.String()),
-	})
-	if err != nil {
-		return err
-	}
 	log.Debugw("token status saved",
 		"synced", synced,
 		"token", token.Address.Hex(),
@@ -550,10 +333,6 @@ func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken,
 		"externalID", token.ExternalID,
 		"totalSupply", token.totalSupply.String(),
 		"block", lastBlock)
-	// close the database tx and commit it
-	if err := tx.Commit(); err != nil {
-		return err
-	}
 	return nil
 }
 
@@ -582,3 +361,76 @@ func (s *Scanner) getLatestBlockNumbersUpdates() {
 		}
 	}
 }
+
+// updateInternalTokenStatus updates the in-memory status of the given token in
+// the scanner: its last block, synced flag and, when available, its total
+// supply. Keeping this state in memory avoids overloading the database with
+// queries for token information on every iteration of the scanner; it is
+// called from the main loop whenever the updater reports progress for a token.
+func (s *Scanner) updateInternalTokenStatus(token ScannerToken, lastBlock uint64,
+	synced bool, totalSupply *big.Int,
+) {
+	s.tokensMtx.Lock()
+	for i, t := range s.tokens {
+		if t.Address == token.Address && t.ChainID == token.ChainID && t.ExternalID == token.ExternalID {
+			s.tokens[i].LastBlock = lastBlock
+			s.tokens[i].Synced = synced
+			if totalSupply != nil && totalSupply.Cmp(big.NewInt(0)) > 0 {
+				s.tokens[i].totalSupply = totalSupply
+				token.totalSupply = totalSupply
+			}
+			break
+		}
+	}
+	s.tokensMtx.Unlock()
+}
+
+// prepareToken prepares the token to be scanned. If the token is not ready
+// yet, it calculates its creation block and updates both the scanner's
+// internal token and the token information in the database, setting the
+// token's last block to the creation block so scanning starts from there. It
+// returns an error if something fails in the process.
+func (s *Scanner) prepareToken(token *ScannerToken) error {
+	ctx, cancel := context.WithTimeout(s.ctx, PREPARE_TIMEOUT)
+	defer cancel()
+	// get the provider by token type
+	provider, err := s.providerManager.GetProvider(ctx, token.Type)
+	if err != nil {
+		return err
+	}
+	// if the token is not ready yet (its creation block has not been
+	// calculated yet), calculate it and update the token information
+	if !provider.IsExternal() && !token.Ready {
+		if err := provider.SetRef(web3provider.Web3ProviderRef{
+			HexAddress: token.Address.Hex(),
+			ChainID: token.ChainID,
+			CreationBlock: token.CreationBlock,
+		}); err != nil {
+			return err
+		}
+		log.Debugw("token not ready yet, calculating creation block and continue",
+			"address", token.Address.Hex(),
+			"chainID", token.ChainID,
+			"externalID", token.ExternalID)
+		creationBlock, err := provider.CreationBlock(ctx, []byte(token.ExternalID))
+		if err != nil {
+			return err
+		}
+		_, err = s.db.QueriesRW.UpdateTokenBlocks(ctx, queries.UpdateTokenBlocksParams{
+			ID: token.Address.Bytes(),
+			ChainID: token.ChainID,
+			ExternalID: token.ExternalID,
+			CreationBlock: int64(creationBlock),
+			LastBlock: int64(creationBlock),
+		})
+		if err != nil {
+			return err
+		}
+		token.CreationBlock = creationBlock
+		token.LastBlock = creationBlock
+		token.Ready = true
+	}
+	return nil
+}
diff --git a/scanner/updater.go b/scanner/updater.go
new file mode 100644
index 00000000..70d64f1f
--- /dev/null
+++ b/scanner/updater.go
@@ -0,0 +1,378 @@
+package scanner
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"fmt"
+	"math/big"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/vocdoni/census3/db"
+	queries "github.com/vocdoni/census3/db/sqlc"
+	"github.com/vocdoni/census3/db/treedb"
+	"github.com/vocdoni/census3/helpers/web3"
+	"github.com/vocdoni/census3/scanner/providers/manager"
+	web3provider "github.com/vocdoni/census3/scanner/providers/web3"
+	dvotedb "go.vocdoni.io/dvote/db"
+	"go.vocdoni.io/dvote/log"
+)
+
+// UpdateRequest is a struct to request a token update but also to query about
+// the status of a request that is being processed.
+type UpdateRequest struct {
+	Address common.Address
+	ChainID uint64
+	ExternalID string
+	ChainAddress string
+	Type uint64
+	CreationBlock uint64
+	EndBlock uint64
+	LastBlock uint64
+	Done bool
+
+	TotalLogs uint64
+	TotalNewLogs uint64
+	TotalAlreadyProcessedLogs uint64
+	LastTotalSupply *big.Int
+}
+
+// Updater is a struct to manage the update requests of the tokens. It will
+// iterate over the requests, repeating the process of getting the token holders
+// balances and saving them in the database until the last block is greater or
+// equal to the end block. The end block is the block number where the token
+// holders balances are up to date.
+type Updater struct {
+	ctx context.Context
+	cancel context.CancelFunc
+
+	db *db.DB
+	networks *web3.Web3Pool
+	providers *manager.ProviderManager
+	sortedQueue []string
+	queue map[string]*UpdateRequest
+	queueMtx sync.Mutex
+	processing sync.Map
+	nextReq atomic.Uint64
+	waiter sync.WaitGroup
+	kvdb dvotedb.Database
+	coolDown time.Duration
+}
+
+// NewUpdater creates a new instance of Updater.
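+// The returned updater does not process anything until Start is called;
+// requests can be enqueued beforehand with SetRequest or AddRequest.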
+func NewUpdater(db *db.DB, networks *web3.Web3Pool, pm *manager.ProviderManager,
+	kvdb dvotedb.Database, coolDown time.Duration,
+) *Updater {
+	return &Updater{
+		db: db,
+		networks: networks,
+		providers: pm,
+		sortedQueue: []string{},
+		queue: make(map[string]*UpdateRequest),
+		kvdb: kvdb,
+		coolDown: coolDown,
+	}
+}
+
+// Start runs the updater loop; it blocks until the context is cancelled, so
+// callers are expected to run it in a goroutine.
+func (u *Updater) Start(ctx context.Context, concurrentTokens int) {
+	u.ctx, u.cancel = context.WithCancel(ctx)
+	sem := make(chan struct{}, concurrentTokens)
+	defer close(sem)
+	for {
+		select {
+		case <-u.ctx.Done():
+			return
+		default:
+			req, id := u.next()
+			if req == nil {
+				time.Sleep(u.coolDown)
+				continue
+			}
+			sem <- struct{}{}
+			u.waiter.Add(1)
+			go func(id string, req UpdateRequest) {
+				defer func() {
+					u.waiter.Done()
+					<-sem
+				}()
+				log.Infow("processing token",
+					"address", req.Address.Hex(),
+					"from", req.CreationBlock,
+					"to", req.EndBlock,
+					"current", req.LastBlock)
+				res, err := u.process(id, req)
+				if err != nil {
+					log.Errorf("error processing update request: %v", err)
+					return
+				}
+				// update the request in the queue
+				if err := u.SetRequest(id, &res); err != nil {
+					log.Errorf("error updating request in the queue: %v", err)
+				}
+				log.Infow("token processed",
+					"address", res.Address.Hex(),
+					"lastBlock", res.LastBlock,
+					"done", res.Done)
+			}(id, *req)
+		}
+	}
+}
+
+// Stop stops the updater process.
+func (u *Updater) Stop() {
+	u.cancel()
+	u.waiter.Wait()
+}
+
+// RequestStatus returns the status of a request by its ID. If deleteOnDone is
+// true and the request is done, it is removed from the queue. If the request
+// is not found, it returns nil.
+func (u *Updater) RequestStatus(id string, deleteOnDone bool) *UpdateRequest {
+	u.queueMtx.Lock()
+	defer u.queueMtx.Unlock()
+	req, ok := u.queue[id]
+	if !ok {
+		return nil
+	}
+	if deleteOnDone && req.Done {
+		// remove the request from the processing map
+		u.processing.Delete(id)
+		// remove the request from the queue
+		delete(u.queue, id)
+		// remove the request from the sorted queue
+		for i, v := range u.sortedQueue {
+			if v == id {
+				u.sortedQueue = append(u.sortedQueue[:i], u.sortedQueue[i+1:]...)
+				break
+			}
+		}
+	}
+	return req
+}
+
+// SetRequest adds a new request to the queue. It will return an error if the
+// request is missing required fields or the block range is invalid. The request
+// will be added to the queue with a given ID.
+func (u *Updater) SetRequest(id string, req *UpdateRequest) error {
+	// check required fields
+	if id == "" {
+		return fmt.Errorf("missing request ID")
+	}
+	if req.ChainID == 0 || req.Type == 0 || req.CreationBlock == 0 || req.EndBlock == 0 {
+		return fmt.Errorf("missing required fields")
+	}
+	// ensure the block range is valid
+	if req.CreationBlock >= req.EndBlock {
+		return fmt.Errorf("invalid block range")
+	}
+	// set the last block to the creation block to start the process from there
+	// if it is not set by the client
+	if req.LastBlock == 0 {
+		req.LastBlock = req.CreationBlock
+	}
+	u.queueMtx.Lock()
+	defer u.queueMtx.Unlock()
+	if _, exists := u.queue[id]; !exists {
+		u.sortedQueue = append(u.sortedQueue, id)
+	}
+	u.queue[id] = req
+	return nil
+}
+
+// AddRequest adds a new request to the queue. It will return an error if the
+// request is missing required fields or the block range is invalid. The request
+// will be added to the queue with an ID generated from the address, chainID and
+// externalID, which is returned so the client can query the status of the
+// request.
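+// If a request with the same ID is already being processed, AddRequest returns
+// an empty ID and a nil error, so callers should treat an empty ID as "already
+// in progress".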
+func (u *Updater) AddRequest(req *UpdateRequest) (string, error) {
+	id, err := RequestID(req.Address, req.ChainID, req.ExternalID)
+	if err != nil {
+		return "", err
+	}
+	if _, ok := u.processing.Load(id); ok {
+		return "", nil
+	}
+	if err := u.SetRequest(id, req); err != nil {
+		return "", err
+	}
+	return id, nil
+}
+
+// RequestID returns the ID of a request given the address, chainID and external
+// ID. The raw ID is a string with the format "chainID:address:externalID". The
+// resulting ID is the first 4 bytes of the hash of the raw ID using the sha256
+// algorithm, encoded in hexadecimal.
+func RequestID(address common.Address, chainID uint64, externalID string) (string, error) {
+	rawID := fmt.Sprintf("%d:%s:%s", chainID, address.Hex(), externalID)
+	hashFn := sha256.New()
+	if _, err := hashFn.Write([]byte(rawID)); err != nil {
+		return "", err
+	}
+	bHash := hashFn.Sum(nil)
+	return hex.EncodeToString(bHash[:4]), nil
+}
+
+// next returns the next request in the queue that is not being processed or
+// already done, along with its ID. If the queue is empty or the next request
+// is out of the range of the sorted queue, it returns nil and an empty string.
+// If the next request is found, it advances the next request index to the
+// following request in the sorted queue.
+func (u *Updater) next() (*UpdateRequest, string) {
+	u.queueMtx.Lock()
+	defer u.queueMtx.Unlock()
+	// if the queue is empty return nil
+	if len(u.sortedQueue) == 0 {
+		return nil, ""
+	}
+	// get the next request in the queue, if the next request is out of the
+	// range of the sorted queue, return nil and set the next request index to 0
+	i := u.nextReq.Load()
+	max := uint64(len(u.sortedQueue))
+	if i >= max {
+		u.nextReq.Store(0)
+		return nil, ""
+	}
+	// iterate over the sorted queue to find the next request that is not being
+	// processed or already done
+	for ; i < max; i++ {
+		id := u.sortedQueue[i]
+		req, exists := u.queue[id]
+		if !exists {
+			// if the request is not found, remove the ID from the sorted queue and
+			// return nil setting the next request index to 0
+			u.sortedQueue = append(u.sortedQueue[:i], u.sortedQueue[i+1:]...)
+			u.nextReq.Store(0)
+			return nil, ""
+		}
+		// if request is not done and not being processed, return it
+		if isProcessing, ok := u.processing.Load(id); !req.Done && (!ok || !isProcessing.(bool)) {
+			u.nextReq.Store(i + 1)
+			return req, id
+		}
+	}
+	// if the next request is not found, set the next request index to 0
+	u.nextReq.Store(0)
+	return nil, ""
+}
+
+// process runs one update iteration for the given request: it gets the token
+// holders balances and saves them in the database, advancing the request until
+// its last block is greater than or equal to the end block. It updates the
+// status of the request in the queue. It returns an error if the provider is
+// not found or if getting or saving the token holders balances fails.
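+// A request that is not finished in a single call stays in the queue with its
+// updated LastBlock, so it is resumed on a later iteration of the updater loop.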
+func (u *Updater) process(id string, req UpdateRequest) (UpdateRequest, error) {
+	// set the request as processing and defer to set it as not processing
+	u.processing.Store(id, true)
+	defer u.processing.Store(id, false)
+	// create a context with a timeout to avoid blocking the process
+	ctx, cancel := context.WithTimeout(u.ctx, UPDATE_TIMEOUT)
+	defer cancel()
+	// get the provider by token type
+	provider, err := u.providers.GetProvider(ctx, req.Type)
+	if err != nil {
+		return req, fmt.Errorf("error getting provider for token: %v", err)
+	}
+	// if the token is not an external one, load its processed-logs filter and
+	// set the token reference in the provider
+	var filter *treedb.TreeDB
+	if !provider.IsExternal() {
+		chainAddress, ok := u.networks.ChainAddress(req.ChainID, req.Address.Hex())
+		if !ok {
+			return req, fmt.Errorf("error getting chain address for token %s on chain %d", req.Address.Hex(), req.ChainID)
+		}
+		// load filter of the token from the database
+		filter, err = treedb.LoadTree(u.kvdb, chainAddress)
+		if err != nil {
+			return req, err
+		}
+		// set the reference of the token to update in the provider
+		if err := provider.SetRef(web3provider.Web3ProviderRef{
+			HexAddress: req.Address.Hex(),
+			ChainID: req.ChainID,
+			CreationBlock: req.CreationBlock,
+			Filter: filter,
+		}); err != nil {
+			return req, fmt.Errorf("error setting provider reference: %v", err)
+		}
+	}
+	// update the last block number of the provider to the last block of
+	// the request
+	provider.SetLastBlockNumber(req.EndBlock)
+	// get current token holders from database
+	results, err := u.db.QueriesRO.ListTokenHolders(ctx, queries.ListTokenHoldersParams{
+		TokenID: req.Address.Bytes(),
+		ChainID: req.ChainID,
+	})
+	if err != nil {
+		return req, fmt.Errorf("error getting token holders from database: %v", err)
+	}
+	currentHolders := map[common.Address]*big.Int{}
+	for _, holder := range results {
+		bBalance, ok := new(big.Int).SetString(holder.Balance, 10)
+		if !ok {
+			log.Warnw("error parsing balance from database",
+				"balance", holder.Balance,
+				"holder", holder.HolderID,
+				"address", req.Address.Hex(),
+				"chainID", req.ChainID,
+				"externalID", req.ExternalID)
+			continue
+		}
+		currentHolders[common.Address(holder.HolderID)] = bBalance
+	}
+	// set the current holders in the provider
+	if err := provider.SetLastBalances(ctx, nil, currentHolders, req.LastBlock); err != nil {
+		return req, fmt.Errorf("error setting last balances in provider: %v", err)
+	}
+	// get range balances from the provider; it will iterate again over the
+	// transfer logs, checking if there are new transfers using the bloom
+	// filter associated with the token
+	balances, delta, err := provider.HoldersBalances(ctx, nil, req.LastBlock)
+	// update the token last block in the request before checking the error
+	if delta != nil {
+		req.TotalLogs += delta.LogsCount
+		req.TotalNewLogs += delta.NewLogsCount
+		req.TotalAlreadyProcessedLogs += delta.AlreadyProcessedLogsCount
+		req.LastTotalSupply = delta.TotalSupply
+
+		req.Done = delta.Synced
+		if delta.Synced {
+			req.LastBlock = req.EndBlock
+		} else if delta.Block >= req.LastBlock {
+			req.LastBlock = delta.Block
+		}
+	}
+	if err != nil {
+		return req, fmt.Errorf("error getting token holders balances: %v", err)
+	}
+	log.Debugw("new logs received",
+		"address", req.Address.Hex(),
+		"from", req.LastBlock,
+		"lastBlock", delta.Block,
+		"newLogs", delta.NewLogsCount,
+		"alreadyProcessedLogs", delta.AlreadyProcessedLogsCount,
+		"totalLogs", delta.LogsCount)
+	// save the new balances in the database
+	created, updated, err := SaveHolders(u.db, ctx, ScannerToken{
+		Address:
req.Address, + ChainID: req.ChainID, + }, balances, delta.NewLogsCount, delta.Block, delta.Synced, delta.TotalSupply) + if err != nil { + return req, fmt.Errorf("error saving token holders balances: %v", err) + } + // add the new keys to the filter if it is defined (not external token) + if filter != nil && delta.NewLogs != nil { + if err := filter.AddKey(delta.NewLogs...); err != nil { + return req, fmt.Errorf("error adding keys to filter: %v", err) + } + } + log.Debugw("token holders balances updated", + "token", req.Address.Hex(), + "chainID", req.ChainID, + "created", created, + "updated", updated) + return req, nil +} diff --git a/scripts/block_logs/main.go b/scripts/block_logs/main.go new file mode 100644 index 00000000..7f155eaf --- /dev/null +++ b/scripts/block_logs/main.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "math/big" + "strings" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" +) + +var ( + rpcURL string + contractAddress string + blockNumber int64 +) + +func init() { + flag.StringVar(&rpcURL, "rpc", "https://mainnet.infura.io/v3/YOUR_INFURA_PROJECT_ID", "Ethereum RPC URL") + flag.StringVar(&contractAddress, "contract", "", "ERC20 contract address") + flag.Int64Var(&blockNumber, "block", 0, "Block number") +} + +var transferEventSignature = []byte("Transfer(address,address,uint256)") +var transferEventSigHash = common.BytesToHash(crypto.Keccak256(transferEventSignature)) + +func main() { + flag.Parse() + + if rpcURL == "" || contractAddress == "" || blockNumber == 0 { + log.Fatalf("All flags (rpc, contract, block) are required") + } + fmt.Println("event signature hash:", transferEventSigHash.Hex()) + + client, err := ethclient.Dial(rpcURL) + if err != nil { + log.Fatalf("Failed to connect to the Ethereum client: %v", err) + } + + blockNum := big.NewInt(blockNumber) + contractAddr := common.HexToAddress(contractAddress) + + query := ethereum.FilterQuery{ + FromBlock: blockNum, + ToBlock: blockNum, + Addresses: []common.Address{contractAddr}, + Topics: [][]common.Hash{{transferEventSigHash}}, + } + + logs, err := client.FilterLogs(context.Background(), query) + if err != nil { + log.Fatalf("Failed to retrieve logs: %v", err) + } + + transferEventABI := `[{"anonymous":false,"inputs":[{"indexed":true,"name":"from","type":"address"},{"indexed":true,"name":"to","type":"address"},{"indexed":false,"name":"value","type":"uint256"}],"name":"Transfer","type":"event"}]` + + contractABI, err := abi.JSON(strings.NewReader(transferEventABI)) + if err != nil { + log.Fatalf("Failed to parse contract ABI: %v", err) + } + + for _, vLog := range logs { + event := struct { + From common.Address + To common.Address + Value *big.Int + }{} + + err := contractABI.UnpackIntoInterface(&event, "Transfer", vLog.Data) + if err != nil { + log.Fatalf("Failed to unpack log data: %v", err) + } + + event.From = common.HexToAddress(vLog.Topics[1].Hex()) + event.To = common.HexToAddress(vLog.Topics[2].Hex()) + + fmt.Printf("Log Block Number: %d\n", vLog.BlockNumber) + fmt.Printf("Log Index: %d\n", vLog.Index) + fmt.Printf("From: %s\n", event.From.Hex()) + fmt.Printf("To: %s\n", event.To.Hex()) + fmt.Printf("Value: %s\n", event.Value.String()) + } +}
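For reference, here is roughly how the pieces introduced in this change fit together at service startup. This is a minimal sketch, not part of the patch: the function name wireScanner, the cool-down values and the concurrency level are illustrative, and the database, w3p, pm and kvdb arguments are assumed to be initialized the same way the existing service does it.

package main

import (
	"context"
	"time"

	"github.com/vocdoni/census3/db"
	"github.com/vocdoni/census3/helpers/web3"
	"github.com/vocdoni/census3/scanner"
	"github.com/vocdoni/census3/scanner/providers/manager"
	dvotedb "go.vocdoni.io/dvote/db"
)

// wireScanner shows how the scanner delegates the actual scanning work to the
// updater queue: the updater owns the worker pool, while the scanner prepares
// tokens and enqueues update requests for them.
func wireScanner(ctx context.Context, database *db.DB, w3p *web3.Web3Pool,
	pm *manager.ProviderManager, kvdb dvotedb.Database,
) {
	// illustrative cool-down of 10s between checks of an empty queue
	updater := scanner.NewUpdater(database, w3p, pm, kvdb, 10*time.Second)
	go updater.Start(ctx, 5) // e.g. up to 5 tokens processed concurrently
	defer updater.Stop()
	// the scanner loops over the stored tokens and feeds the updater
	sc := scanner.NewScanner(database, updater, w3p, pm, time.Minute)
	sc.Start(ctx) // blocks until ctx is cancelled
}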