
Merge pull request #47 from itzmeanjan/develop
Delayed block processing queue [ addressing Chain Reorganisation issue ]
itzmeanjan authored Jan 17, 2021
2 parents 8da500e + dc4834f commit 769db23
Showing 21 changed files with 462 additions and 229 deletions.
11 changes: 11 additions & 0 deletions README.md
@@ -61,6 +61,8 @@ And that's `ette`

> Note : Redis **v6.0.6** is required
> Note : Setting a password on the Redis instance is now optional, though it's recommended.
- Both **HTTP & Websocket** connection URLs of the blockchain node are required, because block, transaction & event log data is queried over the HTTP interface, while newly mined block events are listened for in real time over Websocket.
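As a rough sketch ( not `ette`'s actual bootstrap code — the URLs are placeholders ), this is how the two endpoints typically split responsibilities with go-ethereum's `ethclient` package:

```go
// Illustrative only — not ette's actual connection handling.
package main

import (
	"context"
	"log"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	// HTTP client : used for fetching blocks, transactions & event logs
	httpClient, err := ethclient.Dial("https://<domain-name>")
	if err != nil {
		log.Fatalf("failed to connect over HTTP : %s", err)
	}

	// Websocket client : used for subscribing to newly mined block headers
	wsClient, err := ethclient.Dial("wss://<domain-name>")
	if err != nil {
		log.Fatalf("failed to connect over Websocket : %s", err)
	}

	headerChan := make(chan *types.Header)
	if _, err := wsClient.SubscribeNewHead(context.Background(), headerChan); err != nil {
		log.Fatalf("failed to subscribe to new block headers : %s", err)
	}

	// Each new header received over Websocket triggers data fetching over HTTP
	for header := range headerChan {
		block, err := httpClient.BlockByNumber(context.Background(), header.Number)
		if err != nil {
			continue
		}
		log.Printf("block %d with %d tx(s)", block.NumberU64(), block.Transactions().Len())
	}
}
```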

## Installation 🛠
@@ -98,6 +100,12 @@ cd ette
- For processing blocks/ transactions concurrently, it'll create `ConcurrencyFactor * #-of CPUs on machine` workers, which will pick up jobs submitted to them.
- If nothing is specified, it defaults to 1; assuming you're running `ette` on a machine with 4 CPUs, it'll spawn a worker pool of size 4. More jobs can be submitted, but at most 4 will be running at a time.
- 👆 is done to control the concurrency level, putting more control in the user's hands. ( A sketch below the sample config illustrates this knob together with `BlockConfirmations`. )
- If you want to persist blocks in a delayed fashion, consider setting `BlockConfirmations` to some _number > 0_.
- With `BlockConfirmations` set to 20 & the latest mined block number at 100, `ette` will treat 80 as the latest block that can be persisted in the final data store.
- Using this option is **recommended**, at least in production.
- Skipping `RedisPassword` is absolutely fine if you don't want to use a password for the Redis instance. [ **Not recommended** ]
- For range based queries, `BlockRange` can be set to limit how many blocks a client can query in a single go. Default value : 100.
- For time span based queries, `TimeRange` can be set to limit the max time span _( in seconds )_ clients can use. Default value : 3600 i.e. 1 hour.

```
RPCUrl=https://<domain-name>
@@ -117,6 +125,9 @@ Production=yes
EtteMode=3
EtteGraphQLPlayGround=yes
ConcurrencyFactor=2
BlockConfirmations=20
BlockRange=1000
TimeRange=21600
```
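As an illustration of how `ConcurrencyFactor` & `BlockConfirmations` interact, here's a minimal sketch — the constants, helper names & the `gammazero/workerpool` usage below are simplified stand-ins, not `ette`'s exact internals:

```go
// Illustrative sketch only — names & values are hypothetical.
package main

import (
	"fmt"
	"runtime"

	"github.com/gammazero/workerpool"
)

const (
	concurrencyFactor  = 2  // ConcurrencyFactor
	blockConfirmations = 20 // BlockConfirmations
)

// hasBlockFinalized mirrors the finality rule : a block is considered
// final once `blockConfirmations` blocks have been mined on top of it
func hasBlockFinalized(latest, number uint64) bool {
	return latest-blockConfirmations >= number
}

func main() {
	// Worker pool sized as ConcurrencyFactor * #-of CPUs on machine
	wp := workerpool.New(concurrencyFactor * runtime.NumCPU())
	defer wp.StopWait()

	latest := uint64(100)
	for number := uint64(75); number <= 85; number++ {
		number := number // capture loop variable for the closure
		wp.Submit(func() {
			if hasBlockFinalized(latest, number) {
				fmt.Printf("block %d : final, safe to persist\n", number)
				return
			}
			fmt.Printf("block %d : waiting for more confirmations\n", number)
		})
	}
}
```

With a latest block of 100 & 20 confirmations, blocks up to 80 are reported as final while newer ones keep waiting — the same rule the `HasBlockFinalized` check in this commit applies before persisting.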
- Create another file in the same directory, named `.plans.json`, whose content will look like 👇.
11 changes: 8 additions & 3 deletions app/app.go
@@ -35,7 +35,11 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne
Websocket: getClient(false),
}

_redisClient := getPubSubClient()
_redisClient := getRedisClient()

if _redisClient == nil {
log.Fatalf("[!] Failed to connect to Redis Server\n")
}

if err := _redisClient.FlushAll(context.Background()).Err(); err != nil {
log.Printf("[!] Failed to flush all keys from redis : %s\n", err.Error())
@@ -63,8 +67,9 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne
func Run(configFile, subscriptionPlansFile string) {
_connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile)
_redisInfo := d.RedisInfo{
Client: _redisClient,
QueueName: "blocks",
Client: _redisClient,
BlockRetryQueueName: "blocks_in_retry_queue",
UnfinalizedBlocksQueueName: "unfinalized_blocks",
}

// Attempting to listen to Ctrl+C signal
42 changes: 38 additions & 4 deletions app/block/block.go
@@ -1,6 +1,7 @@
package block

import (
"fmt"
"log"
"runtime"

@@ -14,6 +15,14 @@ import (
"gorm.io/gorm"
)

// HasBlockFinalized - Checking whether block under processing i.e. `number`
// has `N` confirmations on top of it or not
func HasBlockFinalized(status *d.StatusHolder, number uint64) bool {

return status.GetLatestBlockNumber()-cfg.GetBlockConfirmations() >= number

}

// ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data
func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder) {

@@ -40,11 +49,24 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
// This is what we just published on pubsub channel
packedBlock := pubsubWorker(nil)

if !HasBlockFinalized(status, packedBlock.Block.Number) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d with 0 tx(s) [ Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
return

}

// If block doesn't contain any tx, we'll attempt to persist only block
if err := db.StoreBlock(_db, packedBlock, status); err != nil {

log.Print(color.Red.Sprintf("[+] Failed to process block %d with 0 tx(s) : %s", block.NumberU64(), err.Error()))

// If failed to persist, we'll put it in retry queue
pushBlockHashIntoRedisQueue(redis, block.Number().String())
PushBlockIntoRetryQueue(redis, block.Number().String())
return

}
@@ -127,8 +149,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
// we're exiting from this context, while putting this block number in retry queue
if !(result.Failure == 0) {

// If failed to persist, we'll put it in retry queue
pushBlockHashIntoRedisQueue(redis, block.Number().String())
PushBlockIntoRetryQueue(redis, block.Number().String())
return

}
@@ -138,11 +159,24 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
// This is what we just published on pubsub channel
packedBlock := pubsubWorker(packedTxs)

if !HasBlockFinalized(status, packedBlock.Block.Number) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d with %d tx(s) [ Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, block.Transactions().Len(), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
return

}

// Attempting to persist block along with all of its tx(s)
if err := db.StoreBlock(_db, packedBlock, status); err != nil {

log.Print(color.Red.Sprintf("[+] Failed to process block %d with %d tx(s) : %s", block.NumberU64(), block.Transactions().Len(), err.Error()))

// If failed to persist, we'll put it in retry queue
pushBlockHashIntoRedisQueue(redis, block.Number().String())
PushBlockIntoRetryQueue(redis, block.Number().String())
return

}
4 changes: 2 additions & 2 deletions app/block/fetch.go
@@ -20,7 +20,7 @@ func FetchBlockByHash(client *ethclient.Client, hash common.Hash, number string,
block, err := client.BlockByHash(context.Background(), hash)
if err != nil {
// Pushing block number into Redis queue for retrying later
pushBlockHashIntoRedisQueue(redis, number)
PushBlockIntoRetryQueue(redis, number)

log.Print(color.Red.Sprintf("[!] Failed to fetch block by hash [ block : %s] : %s", number, err.Error()))
return
@@ -37,7 +37,7 @@ func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, r
block, err := client.BlockByNumber(context.Background(), _num)
if err != nil {
// Pushing block number into Redis queue for retrying later
pushBlockHashIntoRedisQueue(redis, fmt.Sprintf("%d", number))
PushBlockIntoRetryQueue(redis, fmt.Sprintf("%d", number))

log.Print(color.Red.Sprintf("[!] Failed to fetch block by number [ block : %d ] : %s", number, err))
return
50 changes: 42 additions & 8 deletions app/block/listener.go
@@ -48,6 +48,11 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
log.Fatal(color.Red.Sprintf("[!] Listener stopped : %s", err.Error()))
break
case header := <-headerChan:

// Latest block number seen is safely updated as soon as
// newly mined block data gets propagated to the network
status.SetLatestBlockNumber(header.Number.Uint64())

if first {

// Starting now, to be used for calculating system performance, uptime etc.
@@ -60,19 +65,13 @@
// blocks from highest block number it fetched last time to current network block number
// i.e. trying to fill up gap, which was caused when `ette` was offline
//
// But in reverse direction i.e. from 100 to 50, where `ette` fetched upto 50 last time & 100
// is latest block, got mined in network
//
// Yes it's going refetch 50, due to the fact, some portions of 50 might be missed in last try
// So, it'll check & decide whether persisting again is required or not
//
// This backward traversal mechanism gives us more recent blockchain happenings to cover
// Backward traversal mechanism gives us more recent blockchain happenings to cover
go SyncBlocksByRange(connection.RPC, _db, redis, header.Number.Uint64()-1, currentHighestBlockNumber, status)

// Starting go routine for fetching blocks `ette` failed to process in previous attempt
//
// Uses Redis backed queue for fetching pending block hash & retries
go retryBlockFetching(connection.RPC, _db, redis, status)
go RetryQueueManager(connection.RPC, _db, redis, status)

// Making sure on when next latest block header is received, it'll not
// start another syncer
@@ -93,6 +92,41 @@
// so that it gets processed immediately
func(blockHash common.Hash, blockNumber string) {

// Attempting to submit blocks from the unfinalized queue to the job
// processor, if the queue currently holds more blocks than the
// configured number of confirmations
for GetUnfinalizedQueueLength(redis) > int64(cfg.GetBlockConfirmations()) {

// Before submitting new block processing job
// checking whether there exists any block in unfinalized
// block queue or not
//
// If yes, we're attempting to process it, because it has now
// achieved enough confirmations
if CheckIfOldestBlockIsConfirmed(redis, status) {

oldest := PopOldestBlockFromUnfinalizedQueue(redis)

log.Print(color.Yellow.Sprintf("[*] Attempting to process finalised block %d [ Latest Block : %d | In Queue : %d ]", oldest, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

wp.Submit(func() {

FetchBlockByNumber(connection.RPC,
oldest,
_db,
redis,
status)

})

} else {
// If the oldest block is not yet finalized, there's no point in
// staying here; we'll revisit it some time in the future
break
}

}

wp.Submit(func() {

FetchBlockByHash(connection.RPC,
4 changes: 4 additions & 0 deletions app/block/publish.go
@@ -12,6 +12,10 @@ import (
// PublishBlock - Attempts to publish block data to Redis pubsub channel
func PublishBlock(block *db.PackedBlock, redis *d.RedisInfo) {

if block == nil {
return
}

if err := redis.Client.Publish(context.Background(), "block", &d.Block{
Hash: block.Block.Hash,
Number: block.Block.Number,
36 changes: 19 additions & 17 deletions app/block/retry.go
@@ -17,14 +17,15 @@ import (
"gorm.io/gorm"
)

// Pop oldest block number from Redis queue & try to fetch it in different go routine
// RetryQueueManager - Pop oldest block number from Redis backed retry
// queue & try to fetch it in different go routine
//
// Sleeps for 1000 milliseconds
//
// Keeps repeating
func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, status *d.StatusHolder) {
func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, status *d.StatusHolder) {
sleep := func() {
time.Sleep(time.Duration(500) * time.Millisecond)
time.Sleep(time.Duration(1000) * time.Millisecond)
}

// Creating worker pool and submitting jobs as soon as it's determined
@@ -36,7 +37,7 @@ func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.Redi
sleep()

// Popping oldest element from Redis queue
blockNumber, err := redis.Client.LPop(context.Background(), redis.QueueName).Result()
blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueueName).Result()
if err != nil {
continue
}
@@ -47,7 +48,7 @@ func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.Redi
continue
}

log.Print(color.Cyan.Sprintf("[~] Retrying block : %d [ In Queue : %d ]", parsedBlockNumber, getRetryQueueLength(redis)))
log.Print(color.Cyan.Sprintf("[~] Retrying block : %d [ In Queue : %d ]", parsedBlockNumber, GetRetryQueueLength(redis)))

// Submitting block processor job into pool
// which will be picked up & processed
@@ -67,39 +68,40 @@ func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.Redi
}
}

// Pushes failed to fetch block number at end of Redis queue
// PushBlockIntoRetryQueue - Pushes failed to fetch block number at end of Redis queue
// given it has not already been added
func pushBlockHashIntoRedisQueue(redis *data.RedisInfo, blockNumber string) {
func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) {
// Checking presence first & then deciding whether to add it or not
if !checkExistenceOfBlockNumberInRedisQueue(redis, blockNumber) {
if !CheckBlockInRetryQueue(redis, blockNumber) {

if err := redis.Client.RPush(context.Background(), redis.QueueName, blockNumber).Err(); err != nil {
log.Print(color.Red.Sprintf("[!] Failed to push block %s : %s", blockNumber, err.Error()))
if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueueName, blockNumber).Result(); err != nil {
log.Print(color.Red.Sprintf("[!] Failed to push block %s into retry queue : %s", blockNumber, err.Error()))
}

}
}

// Checks whether block number is already added in Redis backed retry queue or not
// CheckBlockInRetryQueue - Checks whether block number is already added in
// Redis backed retry queue or not
//
// If yes, it'll not be added again
//
// Note: this feature of checking index of value in redis queue,
// was added in Redis v6.0.6 : https://redis.io/commands/lpos
func checkExistenceOfBlockNumberInRedisQueue(redis *data.RedisInfo, blockNumber string) bool {
if _, err := redis.Client.LPos(context.Background(), redis.QueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
func CheckBlockInRetryQueue(redis *data.RedisInfo, blockNumber string) bool {
if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
return false
}

return true
}

// Returns redis backed retry queue length
func getRetryQueueLength(redis *data.RedisInfo) int64 {
// GetRetryQueueLength - Returns redis backed retry queue length
func GetRetryQueueLength(redis *data.RedisInfo) int64 {

blockCount, err := redis.Client.LLen(context.Background(), redis.QueueName).Result()
blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueueName).Result()
if err != nil {
log.Printf(color.Red.Sprintf("[!] Failed to determine Redis queue length : %s", err.Error()))
log.Printf(color.Red.Sprintf("[!] Failed to determine retry queue length : %s", err.Error()))
}

return blockCount
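Both the retry & unfinalized queues above are plain Redis lists. A simplified sketch of the underlying operations ( assuming the go-redis v8 client & an illustrative key name, not `ette`'s exact code ) looks like this — `LPOS`, used for the duplicate check, is why Redis v6.0.6+ is required:

```go
// Illustrative only — key name & flow are simplified.
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})

	queue := "blocks_in_retry_queue"

	// Enqueue a failed block number, but only if it's not already queued.
	// LPOS ( Redis >= 6.0.6 ) returns an error when the element is absent.
	if _, err := client.LPos(ctx, queue, "123456", redis.LPosArgs{}).Result(); err != nil {
		if err := client.RPush(ctx, queue, "123456").Err(); err != nil {
			fmt.Println("failed to push block into retry queue :", err)
		}
	}

	// Queue length, as reported in the log lines above
	length, _ := client.LLen(ctx, queue).Result()
	fmt.Println("blocks waiting for retry :", length)

	// Dequeue the oldest block number for another fetch attempt
	if blockNumber, err := client.LPop(ctx, queue).Result(); err == nil {
		fmt.Println("retrying block :", blockNumber)
	}
}
```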
26 changes: 25 additions & 1 deletion app/block/syncer.go
@@ -69,8 +69,20 @@ func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redis *data.Redis
//
// Job specification is provided in `Job` struct
job := func(wp *workerpool.WorkerPool, j *d.Job) {

wp.Submit(func() {

if !HasBlockFinalized(status, j.Block) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d [ Latest Block : %d | In Queue : %d ]", j.Block, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", j.Block))
return

}

FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status)

})
@@ -124,11 +136,23 @@ func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redis *data.R
//
// Job specification is provided in `Job` struct
job := func(wp *workerpool.WorkerPool, j *d.Job) {

wp.Submit(func() {

if !HasBlockFinalized(status, j.Block) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d [ Latest Block : %d | In Queue : %d ]", j.Block, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", j.Block))
return

}

// Worker fetches block by number from local storage
block := db.GetBlock(j.DB, j.Block)
if block == nil && !checkExistenceOfBlockNumberInRedisQueue(redis, fmt.Sprintf("%d", j.Block)) {
if block == nil && !CheckBlockInRetryQueue(redis, fmt.Sprintf("%d", j.Block)) {
// If not found, block fetching cycle is run, for this block
FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status)
}