Skip to content

Commit

Permalink
fix error handling on scanner and wrap some errors
Browse files Browse the repository at this point in the history
Signed-off-by: p4u <pau@dabax.net>
  • Loading branch information
p4u committed Aug 22, 2024
1 parent 3e2e7c5 commit b0c5d42
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 27 deletions.
2 changes: 1 addition & 1 deletion scanner/providers/farcaster/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (p *FarcasterProvider) updateFarcasterDB(ctx context.Context, usersData []F
return fmt.Errorf("cannot update farcaster db: %w", err)
}
defer func() {
if err := tx.Rollback(); err != nil && !errors.Is(sql.ErrTxDone, err) {
if err := tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) {
log.Errorw(err, "farcaster transaction rollback failed")
}
}()
Expand Down
8 changes: 3 additions & 5 deletions scanner/providers/farcaster/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (p *FarcasterProvider) scanIteration(ctx context.Context) (uint64, bool, bo

// save new users registered on the database
// from the logs of the IDRegistry we can obtain the user FID and the custody and recovery addresses
if err := p.storeNewRegisteredUsers(ctx, newRegisters, fromBlock); err != nil {
if err := p.storeNewRegisteredUsers(ctx, newRegisters); err != nil {
return fromBlock, isIDRegistrySynced, isKeyRegistrySynced,
fmt.Errorf("cannot store new registered users into farcaster DB %s", err.Error())
}
Expand Down Expand Up @@ -592,16 +592,14 @@ func (p *FarcasterProvider) scanLogsKeyRegistry(ctx context.Context, fromBlock,
return addedKeys, removedKeys, lastBlock, synced, nil
}

func (p *FarcasterProvider) storeNewRegisteredUsers(
ctx context.Context, newRegisters map[uint64]common.Address, fromBlock uint64,
) error {
func (p *FarcasterProvider) storeNewRegisteredUsers(ctx context.Context, newRegisters map[uint64]common.Address) error {
usersDBData := make([]FarcasterUserData, 0)
for fid := range newRegisters {
_, err := p.db.QueriesRO.GetUserByFID(ctx, fid)
if err == nil { // if the user already exists in the database skip it
continue
}
if err != nil && !errors.Is(err, sql.ErrNoRows) {
if !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("cannot get user by fid %w", err)
}
userData := FarcasterUserData{
Expand Down
42 changes: 21 additions & 21 deletions scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s *Scanner) Start(ctx context.Context, concurrentTokens int) {
// get the tokens to scan
tokens, err := s.TokensToScan(ctx)
if err != nil {
log.Error(err)
log.Errorw(err, "error getting tokens to scan")
continue
}
// calculate number of batches
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *Scanner) Start(ctx context.Context, concurrentTokens int) {
log.Info("scanner context cancelled, shutting down")
return
}
log.Error(err)
log.Errorw(err, "error scanning token")
return
}
if !synced {
Expand Down Expand Up @@ -186,7 +186,7 @@ func (s *Scanner) TokensToScan(ctx context.Context) ([]*ScannerToken, error) {
tokens := []*ScannerToken{}
// get last created tokens from the database to scan them first (1)
lastNotSyncedTokens, err := s.db.QueriesRO.ListLastNoSyncedTokens(internalCtx)
if err != nil && !errors.Is(sql.ErrNoRows, err) {
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
// parse last not synced token addresses
Expand All @@ -209,7 +209,7 @@ func (s *Scanner) TokensToScan(ctx context.Context) ([]*ScannerToken, error) {
}
// get old not synced tokens from the database (2)
oldNotSyncedTokens, err := s.db.QueriesRO.ListOldNoSyncedTokens(internalCtx)
if err != nil && !errors.Is(sql.ErrNoRows, err) {
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
// if there are old not synced tokens, sort them by nearest to be synced
Expand Down Expand Up @@ -261,7 +261,7 @@ func (s *Scanner) TokensToScan(ctx context.Context) ([]*ScannerToken, error) {
}
// get synced tokens from the database to scan them last (3)
syncedTokens, err := s.db.QueriesRO.ListSyncedTokens(internalCtx)
if err != nil && !errors.Is(sql.ErrNoRows, err) {
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
for _, token := range syncedTokens {
Expand Down Expand Up @@ -306,10 +306,10 @@ func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) (
// create a tx to use it in the following queries
tx, err := s.db.RW.BeginTx(internalCtx, nil)
if err != nil {
return nil, 0, token.LastBlock, token.Synced, nil, err
return nil, 0, token.LastBlock, token.Synced, nil, fmt.Errorf("error starting tx: %w", err)
}
defer func() {
if err := tx.Rollback(); err != nil && !errors.Is(sql.ErrTxDone, err) {
if err := tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) {
log.Error(err)
}
}()
Expand All @@ -321,7 +321,7 @@ func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) (
ChainID: token.ChainID,
CreationBlock: token.CreationBlock,
}); err != nil {
return nil, 0, token.LastBlock, token.Synced, nil, err
return nil, 0, token.LastBlock, token.Synced, nil, fmt.Errorf("error setting provider ref: %w", err)
}
// set the last block number of the network in the provider getting it
// from the latest block numbers cache
Expand All @@ -340,7 +340,7 @@ func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) (
"externalID", token.ExternalID)
creationBlock, err := provider.CreationBlock(internalCtx, []byte(token.ExternalID))
if err != nil {
return nil, 0, token.LastBlock, token.Synced, nil, err
return nil, 0, token.LastBlock, token.Synced, nil, fmt.Errorf("error getting token creation block: %w", err)
}
_, err = qtx.UpdateTokenBlocks(internalCtx, queries.UpdateTokenBlocksParams{
ID: token.Address.Bytes(),
Expand All @@ -350,7 +350,7 @@ func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) (
LastBlock: int64(creationBlock),
})
if err != nil {
return nil, 0, token.LastBlock, token.Synced, nil, err
return nil, 0, token.LastBlock, token.Synced, nil, fmt.Errorf("error updating token blocks: %w", err)
}
token.LastBlock = creationBlock
}
Expand All @@ -368,7 +368,7 @@ func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) (
ExternalID: token.ExternalID,
})
if err != nil {
return nil, 0, token.LastBlock, token.Synced, nil, err
return nil, 0, token.LastBlock, token.Synced, nil, fmt.Errorf("error getting token holders: %w", err)
}
// set the current holders into the provider and get the new ones
currentHolders := map[common.Address]*big.Int{}
Expand All @@ -381,13 +381,13 @@ func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) (
}
// close the database tx and commit it
if err := tx.Commit(); err != nil {
return nil, 0, token.LastBlock, token.Synced, nil, err
return nil, 0, token.LastBlock, token.Synced, nil, fmt.Errorf("error committing tx: %w", err)
}
// set the current holders into the provider and get the new ones
if err := provider.SetLastBalances(ctx, []byte(token.ExternalID),
currentHolders, token.LastBlock,
); err != nil {
return nil, 0, token.LastBlock, token.Synced, nil, err
return nil, 0, token.LastBlock, token.Synced, nil, fmt.Errorf("error setting last balances: %w", err)
}
// get the new holders from the provider
return provider.HoldersBalances(ctx, []byte(token.ExternalID), token.LastBlock)
Expand Down Expand Up @@ -434,10 +434,10 @@ func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken,
// create a tx to use it in the following queries
tx, err := s.db.RW.BeginTx(internalCtx, nil)
if err != nil {
return err
return fmt.Errorf("error starting tx: %w", err)
}
defer func() {
if err := tx.Rollback(); err != nil && !errors.Is(sql.ErrTxDone, err) {
if err := tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) {
log.Errorf("error rolling back tx: %v, token=%s chainID=%d externalID=%s",
err, token.Address.Hex(), token.ChainID, token.ExternalID)
}
Expand All @@ -454,8 +454,8 @@ func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken,
HolderID: addr.Bytes(),
})
if err != nil {
if !errors.Is(sql.ErrNoRows, err) {
return err
if !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("error getting token holder: %w", err)
}
// if the token holder not exists, create it
_, err = qtx.CreateTokenHolder(ctx, queries.CreateTokenHolderParams{
Expand All @@ -467,7 +467,7 @@ func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken,
Balance: balance.String(),
})
if err != nil {
return err
return fmt.Errorf("error creating token holder: %w", err)
}
created++
continue
Expand Down Expand Up @@ -527,7 +527,7 @@ func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken,
ExternalID: token.ExternalID,
})
if err != nil {
return err
return fmt.Errorf("error getting token: %w", err)
}
// update the synced status, last block, the number of analysed transfers
// (for debug) and the total supply in the database
Expand All @@ -541,7 +541,7 @@ func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken,
TotalSupply: annotations.BigInt(token.totalSupply.String()),
})
if err != nil {
return err
return fmt.Errorf("error updating token status: %w", err)
}
log.Debugw("token status saved",
"synced", synced,
Expand All @@ -552,7 +552,7 @@ func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken,
"block", lastBlock)
// close the database tx and commit it
if err := tx.Commit(); err != nil {
return err
return fmt.Errorf("error committing tx: %w", err)
}
return nil
}
Expand Down

0 comments on commit b0c5d42

Please sign in to comment.