This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Merge pull request #53 from itzmeanjan/develop
Improve missing block finder & retry queue manager
itzmeanjan authored Jan 31, 2021
2 parents f06e0fc + da0c3df commit 8946574
Showing 11 changed files with 223 additions and 51 deletions.
7 changes: 4 additions & 3 deletions app/app.go
@@ -73,9 +73,10 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne
func Run(configFile, subscriptionPlansFile string) {
_connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile)
_redisInfo := d.RedisInfo{
- Client: _redisClient,
- BlockRetryQueueName: "blocks_in_retry_queue",
- UnfinalizedBlocksQueueName: "unfinalized_blocks",
+ Client: _redisClient,
+ BlockRetryQueue: "blocks_in_retry_queue",
+ BlockRetryCountTable: "attempt_count_tracker_table",
+ UnfinalizedBlocksQueue: "unfinalized_blocks",
}

// Attempting to listen to Ctrl+C signal
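The renamed queue/table fields belong to the RedisInfo struct in the d ( app/data ) package, whose definition is not part of this diff. A sketch of what it presumably looks like after this commit; the go-redis client type and import path are assumptions:

```go
package data

import "github.com/go-redis/redis/v8" // client library assumed

// RedisInfo - Sketch only; field names are taken from the diff above,
// everything else is an assumption
type RedisInfo struct {
	Client                 *redis.Client // underlying Redis connection
	BlockRetryQueue        string        // list : block numbers waiting to be retried
	BlockRetryCountTable   string        // hash : block number -> failed attempt count
	UnfinalizedBlocksQueue string        // list : blocks waiting to attain finality
}
```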
20 changes: 11 additions & 9 deletions app/block/block.go
@@ -25,7 +25,7 @@ func HasBlockFinalized(status *d.StatusHolder, number uint64) bool {
}

// ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data
- func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder, startingAt time.Time) {
+ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder, startingAt time.Time) bool {

// Closure managing publishing whole block data i.e. block header, txn(s), event logs
// on redis pubsub channel
@@ -60,7 +60,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
status.IncrementBlocksProcessed()

- return
+ return true

}

@@ -71,7 +71,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
- return
+ return true

}

@@ -82,15 +82,15 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm

// If failed to persist, we'll put it in retry queue
PushBlockIntoRetryQueue(redis, block.Number().String())
- return
+ return false

}

// Successfully processed block
log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
status.IncrementBlocksProcessed()

- return
+ return true

}

@@ -165,7 +165,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
if !(result.Failure == 0) {

PushBlockIntoRetryQueue(redis, block.Number().String())
- return
+ return false

}

@@ -183,7 +183,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))
status.IncrementBlocksProcessed()

- return
+ return true

}

@@ -194,7 +194,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
- return
+ return true

}

@@ -205,12 +205,14 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm

// If failed to persist, we'll put it in retry queue
PushBlockIntoRetryQueue(redis, block.Number().String())
- return
+ return false

}

// Successfully processed block
log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))

status.IncrementBlocksProcessed()
return true

}
9 changes: 8 additions & 1 deletion app/block/fetch.go
@@ -53,7 +53,14 @@ func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, r
return
}

- ProcessBlockContent(client, block, _db, redis, publishable, _status, startingAt)
+ // If the attempt to process this block succeeded, we can
+ // remove this block number's entry from the
+ // attempt count tracker table
+ if ProcessBlockContent(client, block, _db, redis, publishable, _status, startingAt) {
+
+ RemoveBlockFromAttemptCountTrackerTable(redis, fmt.Sprintf("%d", number))
+
+ }

}

19 changes: 13 additions & 6 deletions app/block/listener.go
@@ -2,6 +2,7 @@ package block

import (
"context"
"fmt"
"log"
"runtime"

@@ -87,7 +88,7 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
//
// Though it'll be picked up sometime in the future ( by the missing block finder ), it can be safely handled now
// so that it gets processed immediately
- func(blockHash common.Hash, blockNumber string) {
+ func(blockHash common.Hash, blockNumber uint64) {

// When only processing blocks in real-time mode
// no need to check what's present in unfinalized block number queue
@@ -132,9 +133,15 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
}(oldest)

} else {

- // If the oldest block is not finalized, there's no point
- // staying here; we'll revisit it some time in future
- break
+ // If the left-most block is not yet finalized, attempt to
+ // reorganize the queue so that other blocks waiting to be processed
+ // can get that opportunity
+ //
+ // This situation generally occurs due to the concurrent pattern
+ // implemented in the block processor
+ MoveUnfinalizedOldestBlockToEnd(redis)

}

}
@@ -145,14 +152,14 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,

FetchBlockByHash(connection.RPC,
blockHash,
- blockNumber,
+ fmt.Sprintf("%d", blockNumber),
_db,
redis,
status)

})

- }(header.Hash(), header.Number.String())
+ }(header.Hash(), header.Number.Uint64())

}
}
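MoveUnfinalizedOldestBlockToEnd is invoked above but defined in another of the 11 changed files, so its body is not visible in this excerpt. A plausible sketch, assuming it simply rotates the left-most entry of the unfinalized blocks queue to the tail ( import paths and error handling are assumptions ):

```go
package block

import (
	"context"
	"log"

	"github.com/gookit/color"
	d "github.com/itzmeanjan/ette/app/data" // import path assumed
)

// MoveUnfinalizedOldestBlockToEnd - Hypothetical sketch: pop the oldest
// ( left-most ) block number & append it at the right end, so that
// blocks behind it get a chance to be checked for finality
func MoveUnfinalizedOldestBlockToEnd(redis *d.RedisInfo) {

	block, err := redis.Client.LPop(context.Background(), redis.UnfinalizedBlocksQueue).Result()
	if err != nil {
		return
	}

	if _, err := redis.Client.RPush(context.Background(), redis.UnfinalizedBlocksQueue, block).Result(); err != nil {
		log.Print(color.Red.Sprintf("[!] Failed to move unfinalized block %s to queue end : %s", block, err.Error()))
	}

}
```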
82 changes: 78 additions & 4 deletions app/block/retry.go
@@ -37,11 +37,19 @@ func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.Redis
sleep()

// Popping oldest element from Redis queue
- blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueueName).Result()
+ blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueue).Result()
if err != nil {
continue
}

+ attemptCount := GetAttemptCountFromTable(redis, blockNumber)
+ if attemptCount != 0 && attemptCount%3 != 0 {
+
+ PushBlockIntoRetryQueue(redis, blockNumber)
+ continue
+
+ }

// Parsing string blockNumber to uint64
parsedBlockNumber, err := strconv.ParseUint(blockNumber, 10, 64)
if err != nil {
@@ -82,11 +90,75 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) {
// Checking presence first & then deciding whether to add it or not
if !CheckBlockInRetryQueue(redis, blockNumber) {

- if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueueName, blockNumber).Result(); err != nil {
+ if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueue, blockNumber).Result(); err != nil {
log.Print(color.Red.Sprintf("[!] Failed to push block %s into retry queue : %s", blockNumber, err.Error()))
}

+ IncrementAttemptCountOfBlockNumber(redis, blockNumber)

}
}

// IncrementAttemptCountOfBlockNumber - Given block number, increments failed attempt count
// of processing this block
//
// If the block doesn't yet exist in the tracker table, it'll be inserted for the first time with its counter set to 1
//
// The counter wraps back to 0 as soon as it reaches 101
func IncrementAttemptCountOfBlockNumber(redis *data.RedisInfo, blockNumber string) {

wrappedAttemptCount := (GetAttemptCountFromTable(redis, blockNumber) + 1) % 101

if _, err := redis.Client.HSet(context.Background(), redis.BlockRetryCountTable, blockNumber, wrappedAttemptCount).Result(); err != nil {
log.Print(color.Red.Sprintf("[!] Failed to increment attempt count of block %s : %s", blockNumber, err.Error()))
}

}

// CheckBlockInAttemptCounterTable - Checks whether given block number already exists in
// attempt count tracker table
func CheckBlockInAttemptCounterTable(redis *data.RedisInfo, blockNumber string) bool {

if _, err := redis.Client.HGet(context.Background(), redis.BlockRetryCountTable, blockNumber).Result(); err != nil {
return false
}

return true

}

// GetAttemptCountFromTable - Returns current attempt counter from table
// for given block number
func GetAttemptCountFromTable(redis *data.RedisInfo, blockNumber string) uint64 {

count, err := redis.Client.HGet(context.Background(), redis.BlockRetryCountTable, blockNumber).Result()
if err != nil {
return 0
}

parsedCount, err := strconv.ParseUint(count, 10, 64)
if err != nil {
return 0
}

return parsedCount

}

// RemoveBlockFromAttemptCountTrackerTable - Attempts to delete a block number's
// associated attempt count, given it already exists in the table
//
// This is supposed to be invoked when a block is considered to be successfully processed
func RemoveBlockFromAttemptCountTrackerTable(redis *data.RedisInfo, blockNumber string) {

if CheckBlockInAttemptCounterTable(redis, blockNumber) {

if _, err := redis.Client.HDel(context.Background(), redis.BlockRetryCountTable, blockNumber).Result(); err != nil {
log.Print(color.Red.Sprintf("[!] Failed to delete attempt count of successful block %s : %s", blockNumber, err.Error()))
}

}

}

// CheckBlockInRetryQueue - Checks whether block number is already added in
Expand All @@ -97,17 +169,19 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) {
// Note: this feature of checking the index of a value in a Redis list
// was added in Redis v6.0.6 : https://redis.io/commands/lpos
func CheckBlockInRetryQueue(redis *data.RedisInfo, blockNumber string) bool {
- if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
+ if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueue, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
return false
}

return true

}

// GetRetryQueueLength - Returns redis backed retry queue length
func GetRetryQueueLength(redis *data.RedisInfo) int64 {

- blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueueName).Result()
+ blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueue).Result()
if err != nil {
log.Printf(color.Red.Sprintf("[!] Failed to determine retry queue length : %s", err.Error()))
}
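One consequence of the new counter worth spelling out: the skip path in RetryQueueManager re-enqueues via PushBlockIntoRetryQueue, which itself bumps the counter, so a block that keeps failing is actually re-fetched only on every third pass through the queue. A small self-contained illustration of the gate ( counts are illustrative ):

```go
package main

import "fmt"

// Standalone illustration of the retry gate in RetryQueueManager above:
// a popped block is re-fetched only when its failed attempt count is 0
// or a multiple of 3; any other count sends it straight back to the
// queue tail
func main() {
	for attemptCount := uint64(0); attemptCount <= 7; attemptCount++ {
		if attemptCount != 0 && attemptCount%3 != 0 {
			fmt.Printf("count %d -> requeued without re-fetching\n", attemptCount)
			continue
		}
		fmt.Printf("count %d -> block re-fetched\n", attemptCount)
	}
}
```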
74 changes: 62 additions & 12 deletions app/block/syncer.go
@@ -4,6 +4,7 @@ import (
"fmt"
"log"
"runtime"
"sort"
"time"

"github.com/ethereum/go-ethereum/ethclient"
@@ -16,8 +17,32 @@ import (
"gorm.io/gorm"
)

// FindMissingBlocksInRange - Given ascending-ordered block numbers read from the DB,
// attempts to find which numbers are missing in the [from, to] range,
// where both ends are inclusive
func FindMissingBlocksInRange(found []uint64, from uint64, to uint64) []uint64 {

// creating a slice with a backing array sized for the whole range,
// to avoid repeated memory allocations inside the loop below
absent := make([]uint64, 0, to-from+1)

for b := from; b <= to; b++ {

idx := sort.Search(len(found), func(j int) bool { return found[j] >= b })

if !(idx < len(found) && found[idx] == b) {
absent = append(absent, b)
}

}

return absent

}

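A quick usage sketch of the function above, with illustrative values; the input slice must hold the ascending block numbers found in the DB, and both range ends are inclusive:

```go
// Assuming FindMissingBlocksInRange from above is in scope:
found := []uint64{100, 101, 104, 107} // sorted block numbers read from DB

// Prints [102 103 105 106 108], i.e. the gaps in the inclusive range [100, 108]
fmt.Println(FindMissingBlocksInRange(found, 100, 108))
```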
// Syncer - Given ascending block number range i.e. fromBlock <= toBlock
- // fetches blocks in order {fromBlock, toBlock, fromBlock + 1, toBlock - 1, fromBlock + 2, toBlock - 2 ...}
+ // attempts to fetch missing blocks in that range
// while running n workers concurrently, where n = number of cores this machine has
//
// Waits for all of them to complete
@@ -28,8 +53,6 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB
}

wp := workerpool.New(runtime.NumCPU() * int(cfg.GetConcurrencyFactor()))
- i := fromBlock
- j := toBlock

// Jobs need to be submitted using this interface, while
// just mentioning which block needs to be fetched
@@ -43,17 +66,44 @@
})
}

- for i <= j {
- // This condition to be arrived at when range has odd number of elements
- if i == j {
- job(i)
- } else {
- job(i)
- job(j)
- }
+ // attempting to fetch X blocks ( max ) at a time, by range
+ //
+ // @note This can be improved
+ var step uint64 = 10000
+
+ for i := fromBlock; i <= toBlock; i += step {
+
+ toShouldbe := i + step - 1
+ if toShouldbe > toBlock {
+ toShouldbe = toBlock
+ }
+
+ blocks := db.GetAllBlockNumbersInRange(_db, i, toShouldbe)
+
+ // No blocks present in DB, in queried range
+ if blocks == nil || len(blocks) == 0 {
+
+ // So submitting all of them to job processor queue
+ for j := i; j <= toShouldbe; j++ {
+
+ job(j)
+
+ }
+ continue
+
+ }
+
+ // All blocks in range present in DB ✅
+ if toShouldbe-i+1 == uint64(len(blocks)) {
+ continue
+ }
+
+ // Some blocks are missing in range, attempting to find them
+ // and pushing their processing request to job queue
+ for _, v := range FindMissingBlocksInRange(blocks, i, toShouldbe) {
+ job(v)
+ }
+
- i++
- j--
}

wp.StopWait()
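The reworked Syncer no longer walks the range one block from each end; it scans in chunks, asks the DB which block numbers exist in each chunk, and only enqueues the gaps. A small runnable sketch of just the chunking arithmetic ( step value taken from the diff, range values illustrative ):

```go
package main

import "fmt"

func main() {
	// Same chunking as in Syncer: [fromBlock, toBlock] is covered in
	// windows of `step` blocks, with the last window clamped to toBlock
	var fromBlock, toBlock, step uint64 = 0, 25000, 10000

	for i := fromBlock; i <= toBlock; i += step {
		toShouldbe := i + step - 1
		if toShouldbe > toBlock {
			toShouldbe = toBlock
		}
		fmt.Printf("query DB for existing blocks in [%d, %d]\n", i, toShouldbe)
	}

	// Output:
	// query DB for existing blocks in [0, 9999]
	// query DB for existing blocks in [10000, 19999]
	// query DB for existing blocks in [20000, 25000]
}
```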