From f648b36d8044c881815e40e64bcbd9c1340ec72d Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 11:57:58 +0530 Subject: [PATCH 01/20] changed field name of struct --- app/app.go | 6 +++--- app/block/retry.go | 8 ++++---- app/block/unfinalized_blocks.go | 10 +++++----- app/data/data.go | 6 +++--- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/app/app.go b/app/app.go index e9615656..221de1cd 100644 --- a/app/app.go +++ b/app/app.go @@ -73,9 +73,9 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne func Run(configFile, subscriptionPlansFile string) { _connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile) _redisInfo := d.RedisInfo{ - Client: _redisClient, - BlockRetryQueueName: "blocks_in_retry_queue", - UnfinalizedBlocksQueueName: "unfinalized_blocks", + Client: _redisClient, + BlockRetryQueue: "blocks_in_retry_queue", + UnfinalizedBlocksQueue: "unfinalized_blocks", } // Attempting to listen to Ctrl+C signal diff --git a/app/block/retry.go b/app/block/retry.go index 41c82dae..8fae43f3 100644 --- a/app/block/retry.go +++ b/app/block/retry.go @@ -37,7 +37,7 @@ func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.Redis sleep() // Popping oldest element from Redis queue - blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueueName).Result() + blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueue).Result() if err != nil { continue } @@ -82,7 +82,7 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) { // Checking presence first & then deciding whether to add it or not if !CheckBlockInRetryQueue(redis, blockNumber) { - if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueueName, blockNumber).Result(); err != nil { + if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueue, blockNumber).Result(); err != nil { log.Print(color.Red.Sprintf("[!] Failed to push block %s into retry queue : %s", blockNumber, err.Error())) } @@ -97,7 +97,7 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) { // Note: this feature of checking index of value in redis queue, // was added in Redis v6.0.6 : https://redis.io/commands/lpos func CheckBlockInRetryQueue(redis *data.RedisInfo, blockNumber string) bool { - if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil { + if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueue, blockNumber, _redis.LPosArgs{}).Result(); err != nil { return false } @@ -107,7 +107,7 @@ func CheckBlockInRetryQueue(redis *data.RedisInfo, blockNumber string) bool { // GetRetryQueueLength - Returns redis backed retry queue length func GetRetryQueueLength(redis *data.RedisInfo) int64 { - blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueueName).Result() + blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueue).Result() if err != nil { log.Printf(color.Red.Sprintf("[!] Failed to determine retry queue length : %s", err.Error())) } diff --git a/app/block/unfinalized_blocks.go b/app/block/unfinalized_blocks.go index 39994c0f..ac5b08b3 100644 --- a/app/block/unfinalized_blocks.go +++ b/app/block/unfinalized_blocks.go @@ -14,7 +14,7 @@ import ( // i.e. element at 0th index func GetOldestBlockFromUnfinalizedQueue(redis *data.RedisInfo) string { - blockNumber, err := redis.Client.LIndex(context.Background(), redis.UnfinalizedBlocksQueueName, 0).Result() + blockNumber, err := redis.Client.LIndex(context.Background(), redis.UnfinalizedBlocksQueue, 0).Result() if err != nil { return "" } @@ -45,7 +45,7 @@ func CheckIfOldestBlockIsConfirmed(redis *data.RedisInfo, status *data.StatusHol // queue, which can be processed now func PopOldestBlockFromUnfinalizedQueue(redis *data.RedisInfo) uint64 { - blockNumber, err := redis.Client.LPop(context.Background(), redis.UnfinalizedBlocksQueueName).Result() + blockNumber, err := redis.Client.LPop(context.Background(), redis.UnfinalizedBlocksQueue).Result() if err != nil { return 0 } @@ -66,7 +66,7 @@ func PushBlockIntoUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) { // Checking presence first & then deciding whether to add it or not if !CheckBlockInUnfinalizedQueue(redis, blockNumber) { - if _, err := redis.Client.RPush(context.Background(), redis.UnfinalizedBlocksQueueName, blockNumber).Result(); err != nil { + if _, err := redis.Client.RPush(context.Background(), redis.UnfinalizedBlocksQueue, blockNumber).Result(); err != nil { log.Print(color.Red.Sprintf("[!] Failed to push block %s into non-final block queue : %s", blockNumber, err.Error())) } @@ -81,7 +81,7 @@ func PushBlockIntoUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) { // Note: this feature of checking index of value in redis queue, // was added in Redis v6.0.6 : https://redis.io/commands/lpos func CheckBlockInUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) bool { - if _, err := redis.Client.LPos(context.Background(), redis.UnfinalizedBlocksQueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil { + if _, err := redis.Client.LPos(context.Background(), redis.UnfinalizedBlocksQueue, blockNumber, _redis.LPosArgs{}).Result(); err != nil { return false } @@ -91,7 +91,7 @@ func CheckBlockInUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) boo // GetUnfinalizedQueueLength - Returns redis backed unfinalized block number queue length func GetUnfinalizedQueueLength(redis *data.RedisInfo) int64 { - blockCount, err := redis.Client.LLen(context.Background(), redis.UnfinalizedBlocksQueueName).Result() + blockCount, err := redis.Client.LLen(context.Background(), redis.UnfinalizedBlocksQueue).Result() if err != nil { log.Printf(color.Red.Sprintf("[!] Failed to determine non-final block queue length : %s", err.Error())) } diff --git a/app/data/data.go b/app/data/data.go index d8e3de5f..702157c5 100644 --- a/app/data/data.go +++ b/app/data/data.go @@ -135,9 +135,9 @@ func (s *StatusHolder) SetLatestBlockNumber(num uint64) { // RedisInfo - Holds redis related information in this struct, to be used // when passing to functions as argument type RedisInfo struct { - Client *redis.Client // using this object `ette` will talk to Redis - BlockRetryQueueName string // retry queue name, for storing block numbers - UnfinalizedBlocksQueueName string // stores unfinalized block numbers, processes + Client *redis.Client // using this object `ette` will talk to Redis + BlockRetryQueue string // retry queue name, for storing block numbers + UnfinalizedBlocksQueue string // stores unfinalized block numbers, processes // them later after reaching finality ( as set by deployer of `ette` ) } From b493a4749af7e43912a0ed5944edfa031d6f1ddf Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 12:16:00 +0530 Subject: [PATCH 02/20] explicitly mention to sort block numbers ascendically --- app/db/query.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/db/query.go b/app/db/query.go index 076e439e..4de51f7b 100644 --- a/app/db/query.go +++ b/app/db/query.go @@ -14,14 +14,14 @@ func GetAllBlockNumbersInRange(db *gorm.DB, from uint64, to uint64) []uint64 { var blocks []uint64 if from < to { - if err := db.Model(&Blocks{}).Where("number >= ? and number <= ?", from, to).Select("number").Find(&blocks).Error; err != nil { + if err := db.Model(&Blocks{}).Where("number >= ? and number <= ?", from, to).Order("number asc").Select("number").Find(&blocks).Error; err != nil { log.Printf("[!] Failed to fetch block numbers by range : %s\n", err.Error()) return nil } } else { - if err := db.Model(&Blocks{}).Where("number >= ? and number <= ?", to, from).Select("number").Find(&blocks).Error; err != nil { + if err := db.Model(&Blocks{}).Where("number >= ? and number <= ?", to, from).Order("number asc").Select("number").Find(&blocks).Error; err != nil { log.Printf("[!] Failed to fetch block numbers by range : %s\n", err.Error()) return nil From 844b99a7a67f1b7d752fff197c5811244a8a1e2b Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 14:16:59 +0530 Subject: [PATCH 03/20] finds missing blocks from number range --- app/block/syncer.go | 61 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/app/block/syncer.go b/app/block/syncer.go index 9b3c1691..e433c65c 100644 --- a/app/block/syncer.go +++ b/app/block/syncer.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "runtime" + "sort" "time" "github.com/ethereum/go-ethereum/ethclient" @@ -16,6 +17,27 @@ import ( "gorm.io/gorm" ) +// FindMissingBlocksInRange - Given ascending ordered block numbers read from DB +// attempts to find out which numbers are missing in [from, to] range +// where both ends are inclusive +func FindMissingBlocksInRange(found []uint64, shouldBeFrom uint64, shouldBeTo uint64) []uint64 { + + absent := make([]uint64, 0) + + for b := shouldBeFrom; b <= shouldBeTo; b++ { + + _i := sort.Search(len(found), func(j int) bool { return found[j] >= b }) + + if !(_i < len(found) && found[_i] == b) { + absent = append(absent, b) + } + + } + + return absent + +} + // Syncer - Given ascending block number range i.e. fromBlock <= toBlock // fetches blocks in order {fromBlock, toBlock, fromBlock + 1, toBlock - 1, fromBlock + 2, toBlock - 2 ...} // while running n workers concurrently, where n = number of cores this machine has @@ -43,6 +65,45 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB }) } + // attempting to fetch X blocks ( max ) at a time, by range + // + // @note This can be improved + var step uint64 = 10000 + + for i := fromBlock; i <= toBlock; i += step { + + blocks := db.GetAllBlockNumbersInRange(_db, i, i+step-1) + // No blocks present in DB, in queried range + if blocks == nil { + + // So submitting all of them to job processor queue + for j := i; j <= i+step-1; j++ { + + job(j) + + } + continue + + } + + // All blocks in range present in DB ✅ + if step == uint64(len(blocks)) { + continue + } + + // Some blocks are missing in range, attempting to find them + for index, value := range blocks { + + if value == fromBlock+uint64(index) { + continue + } + + job(value) + + } + + } + for i <= j { // This condition to be arrived at when range has odd number of elements if i == j { From 604ca07c8efd574977dd9207997cca49d11a3a76 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 14:28:32 +0530 Subject: [PATCH 04/20] improved way of finding missing blocks in range, in terms of memory allocation --- app/block/syncer.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/app/block/syncer.go b/app/block/syncer.go index e433c65c..27dc5d6b 100644 --- a/app/block/syncer.go +++ b/app/block/syncer.go @@ -20,15 +20,18 @@ import ( // FindMissingBlocksInRange - Given ascending ordered block numbers read from DB // attempts to find out which numbers are missing in [from, to] range // where both ends are inclusive -func FindMissingBlocksInRange(found []uint64, shouldBeFrom uint64, shouldBeTo uint64) []uint64 { +func FindMissingBlocksInRange(found []uint64, from uint64, to uint64) []uint64 { - absent := make([]uint64, 0) + // creating slice with backing array of larger size + // to avoid potential memory allocation during iteration + // over loop + absent := make([]uint64, 0, to-from+1) - for b := shouldBeFrom; b <= shouldBeTo; b++ { + for b := from; b <= to; b++ { - _i := sort.Search(len(found), func(j int) bool { return found[j] >= b }) + idx := sort.Search(len(found), func(j int) bool { return found[j] >= b }) - if !(_i < len(found) && found[_i] == b) { + if !(idx < len(found) && found[idx] == b) { absent = append(absent, b) } From cef65bd75de7a6236c4a9e5cc86586672a5b66de Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 15:14:37 +0530 Subject: [PATCH 05/20] improved way of syncing missing blocks of range --- app/block/syncer.go | 28 ++++------------------------ 1 file changed, 4 insertions(+), 24 deletions(-) diff --git a/app/block/syncer.go b/app/block/syncer.go index 27dc5d6b..5ca0193f 100644 --- a/app/block/syncer.go +++ b/app/block/syncer.go @@ -42,7 +42,7 @@ func FindMissingBlocksInRange(found []uint64, from uint64, to uint64) []uint64 { } // Syncer - Given ascending block number range i.e. fromBlock <= toBlock -// fetches blocks in order {fromBlock, toBlock, fromBlock + 1, toBlock - 1, fromBlock + 2, toBlock - 2 ...} +// attempts to fetch missing blocks in that range // while running n workers concurrently, where n = number of cores this machine has // // Waits for all of them to complete @@ -53,8 +53,6 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB } wp := workerpool.New(runtime.NumCPU() * int(cfg.GetConcurrencyFactor())) - i := fromBlock - j := toBlock // Jobs need to be submitted using this interface, while // just mentioning which block needs to be fetched @@ -95,29 +93,11 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB } // Some blocks are missing in range, attempting to find them - for index, value := range blocks { - - if value == fromBlock+uint64(index) { - continue - } - - job(value) - - } - - } - - for i <= j { - // This condition to be arrived at when range has odd number of elements - if i == j { - job(i) - } else { - job(i) - job(j) + // and pushing their processing request to job queue + for _, v := range FindMissingBlocksInRange(blocks, i, i+step-1) { + job(v) } - i++ - j-- } wp.StopWait() From 3a39270c8683744b70d6da053843301dc07fcf36 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 18:01:03 +0530 Subject: [PATCH 06/20] count of blocks to be received from DB, handling it correctly --- app/block/syncer.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/app/block/syncer.go b/app/block/syncer.go index 5ca0193f..47ce6157 100644 --- a/app/block/syncer.go +++ b/app/block/syncer.go @@ -75,7 +75,7 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB blocks := db.GetAllBlockNumbersInRange(_db, i, i+step-1) // No blocks present in DB, in queried range - if blocks == nil { + if blocks == nil || len(blocks) == 0 { // So submitting all of them to job processor queue for j := i; j <= i+step-1; j++ { @@ -87,8 +87,21 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB } + countShouldBe := step + + // Attempting to fix how many block numbers we should be ideally + // receiving back in response, sent to DB + // + // This will only be executed when asked for 10 block numbers ( = `step` ) + // starting at 1 ( = `fromBlock` ), but actually `toBlock` is set to 5 + // + // In that case, count of blocks received from DB can't be more than 5 at any cost. + if i+step-1 > toBlock { + countShouldBe = toBlock - i + 1 + } + // All blocks in range present in DB ✅ - if step == uint64(len(blocks)) { + if countShouldBe == uint64(len(blocks)) { continue } From 692b870527828a9949ed7849327e5996d915233a Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 18:01:56 +0530 Subject: [PATCH 07/20] just a note which can be taken up in future phase of development --- app/snapshot/write.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/app/snapshot/write.go b/app/snapshot/write.go index d77b4011..81bc029b 100644 --- a/app/snapshot/write.go +++ b/app/snapshot/write.go @@ -58,6 +58,8 @@ func TakeSnapshot(db *gorm.DB, file string, start uint64, end uint64, count uint go PutIntoSink(fd, count, data, done) // attempting to fetch X blocks ( max ) at a time, by range + // + // @note This can be improved var step uint64 = 10000 // stepping through blocks present in DB From 9dab9d00e05854f510e71900c1f8f5c423a26193 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 19:38:02 +0530 Subject: [PATCH 08/20] manage block retry attempt count table --- app/block/retry.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/app/block/retry.go b/app/block/retry.go index 8fae43f3..cdaec8c3 100644 --- a/app/block/retry.go +++ b/app/block/retry.go @@ -86,7 +86,49 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) { log.Print(color.Red.Sprintf("[!] Failed to push block %s into retry queue : %s", blockNumber, err.Error())) } + IncrementAttemptCountOfBlockNumber(redis, blockNumber) + + } +} + +// IncrementAttemptCountOfBlockNumber - Given block number, increments failed attempt count +// of processing this block +// +// If block doesn't yet exist in tracker table, it'll be inserted first time & counter to be set to 1 +func IncrementAttemptCountOfBlockNumber(redis *data.RedisInfo, blockNumber string) { + + if _, err := redis.Client.HIncrBy(context.Background(), redis.BlockRetryCountTable, blockNumber, 1).Result(); err != nil { + log.Print(color.Red.Sprintf("[!] Failed to increment attempt count of block %s : %s", blockNumber, err.Error())) + } + +} + +// CheckBlockInAttemptCounterTable - Checks whether given block number already exist in +// attempt count tracker table +func CheckBlockInAttemptCounterTable(redis *data.RedisInfo, blockNumber string) bool { + + if _, err := redis.Client.HGet(context.Background(), redis.BlockRetryCountTable, blockNumber).Result(); err != nil { + return false + } + + return true + +} + +// RemoveBlockFromAttemptCountTrackerTable - Attempt to delete block number's +// associated attempt count, given it already exists in table +// +// This is supposed to be invoked when a block is considered to be successfully processed +func RemoveBlockFromAttemptCountTrackerTable(redis *data.RedisInfo, blockNumber string) { + + if CheckBlockInAttemptCounterTable(redis, blockNumber) { + + if _, err := redis.Client.HDel(context.Background(), redis.BlockRetryCountTable, blockNumber).Result(); err != nil { + log.Print(color.Red.Sprintf("[!] Failed to delete attempt count of successful block %s : %s", blockNumber, err.Error())) + } + } + } // CheckBlockInRetryQueue - Checks whether block number is already added in @@ -97,11 +139,13 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) { // Note: this feature of checking index of value in redis queue, // was added in Redis v6.0.6 : https://redis.io/commands/lpos func CheckBlockInRetryQueue(redis *data.RedisInfo, blockNumber string) bool { + if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueue, blockNumber, _redis.LPosArgs{}).Result(); err != nil { return false } return true + } // GetRetryQueueLength - Returns redis backed retry queue length From a650a0220a515aba0579df5bc84c8c0ac618cdba Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 19:38:41 +0530 Subject: [PATCH 09/20] updated struct for holding redis related info --- app/data/data.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/app/data/data.go b/app/data/data.go index 702157c5..3a6ccc20 100644 --- a/app/data/data.go +++ b/app/data/data.go @@ -135,9 +135,11 @@ func (s *StatusHolder) SetLatestBlockNumber(num uint64) { // RedisInfo - Holds redis related information in this struct, to be used // when passing to functions as argument type RedisInfo struct { - Client *redis.Client // using this object `ette` will talk to Redis - BlockRetryQueue string // retry queue name, for storing block numbers - UnfinalizedBlocksQueue string // stores unfinalized block numbers, processes + Client *redis.Client // using this object `ette` will talk to Redis + BlockRetryQueue string // retry queue name, for storing block numbers + BlockRetryCountTable string // keeping track of how many times this block was attempted + // to be processed in past, but went unsuccessful + UnfinalizedBlocksQueue string // stores unfinalized block numbers, processes // them later after reaching finality ( as set by deployer of `ette` ) } From de04d92a605567e8092f40078bbd2333b5c39f67 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 19:48:07 +0530 Subject: [PATCH 10/20] return parsed current attempt count for block number --- app/block/retry.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/app/block/retry.go b/app/block/retry.go index cdaec8c3..0ad833e9 100644 --- a/app/block/retry.go +++ b/app/block/retry.go @@ -115,6 +115,24 @@ func CheckBlockInAttemptCounterTable(redis *data.RedisInfo, blockNumber string) } +// GetAttemptCountFromTable - Returns current attempt counter from table +// for given block number +func GetAttemptCountFromTable(redis *data.RedisInfo, blockNumber string) uint64 { + + count, err := redis.Client.HGet(context.Background(), redis.BlockRetryCountTable, blockNumber).Result() + if err != nil { + return 0 + } + + parsedCount, err := strconv.ParseUint(count, 10, 64) + if err != nil { + return 0 + } + + return parsedCount + +} + // RemoveBlockFromAttemptCountTrackerTable - Attempt to delete block number's // associated attempt count, given it already exists in table // From 94302d035fab3846b37d8d93d809766c21421e04 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 20:30:25 +0530 Subject: [PATCH 11/20] incrementing attempt count of blocks in retry queue while wrapping back to 0 as soon as it reaches 101 --- app/block/retry.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/app/block/retry.go b/app/block/retry.go index 0ad833e9..c6f534c8 100644 --- a/app/block/retry.go +++ b/app/block/retry.go @@ -42,6 +42,14 @@ func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.Redis continue } + attemptCount := GetAttemptCountFromTable(redis, blockNumber) + if attemptCount != 0 && attemptCount%3 != 0 { + + PushBlockIntoRetryQueue(redis, blockNumber) + continue + + } + // Parsing string blockNumber to uint64 parsedBlockNumber, err := strconv.ParseUint(blockNumber, 10, 64) if err != nil { @@ -95,9 +103,13 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) { // of processing this block // // If block doesn't yet exist in tracker table, it'll be inserted first time & counter to be set to 1 +// +// It'll be wrapped back to 0 as soon as it reaches 101 func IncrementAttemptCountOfBlockNumber(redis *data.RedisInfo, blockNumber string) { - if _, err := redis.Client.HIncrBy(context.Background(), redis.BlockRetryCountTable, blockNumber, 1).Result(); err != nil { + wrappedAttemptCount := (GetAttemptCountFromTable(redis, blockNumber) + 1) % 101 + + if _, err := redis.Client.HSet(context.Background(), redis.BlockRetryCountTable, blockNumber, wrappedAttemptCount).Result(); err != nil { log.Print(color.Red.Sprintf("[!] Failed to increment attempt count of block %s : %s", blockNumber, err.Error())) } From 9764c3da0d41c88c3ab842dd8cdc7fb15cc4ad89 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 20:33:48 +0530 Subject: [PATCH 12/20] returning status of block body processing from processor function --- app/block/block.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/app/block/block.go b/app/block/block.go index 51782c7e..38a2bb18 100644 --- a/app/block/block.go +++ b/app/block/block.go @@ -25,7 +25,7 @@ func HasBlockFinalized(status *d.StatusHolder, number uint64) bool { } // ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data -func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder, startingAt time.Time) { +func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder, startingAt time.Time) bool { // Closure managing publishing whole block data i.e. block header, txn(s), event logs // on redis pubsub channel @@ -60,7 +60,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt))) status.IncrementBlocksProcessed() - return + return true } @@ -71,7 +71,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm // Pushing into unfinalized block queue, to be picked up only when // finality for this block has been achieved PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number)) - return + return true } @@ -82,7 +82,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm // If failed to persist, we'll put it in retry queue PushBlockIntoRetryQueue(redis, block.Number().String()) - return + return false } @@ -90,7 +90,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt))) status.IncrementBlocksProcessed() - return + return true } @@ -165,7 +165,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm if !(result.Failure == 0) { PushBlockIntoRetryQueue(redis, block.Number().String()) - return + return false } @@ -183,7 +183,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt))) status.IncrementBlocksProcessed() - return + return true } @@ -194,7 +194,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm // Pushing into unfinalized block queue, to be picked up only when // finality for this block has been achieved PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number)) - return + return true } @@ -205,12 +205,14 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm // If failed to persist, we'll put it in retry queue PushBlockIntoRetryQueue(redis, block.Number().String()) - return + return false } // Successfully processed block log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt))) + status.IncrementBlocksProcessed() + return true } From 4a98bd018d6f4e7b2a2716537d3158c5691e087c Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 20:41:48 +0530 Subject: [PATCH 13/20] removing entry of block number from attempt count table when successfully processed --- app/block/fetch.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/app/block/fetch.go b/app/block/fetch.go index 1d27ffbd..3e28c4ba 100644 --- a/app/block/fetch.go +++ b/app/block/fetch.go @@ -53,7 +53,14 @@ func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, r return } - ProcessBlockContent(client, block, _db, redis, publishable, _status, startingAt) + // If attempt to process block by number went successful + // we can consider removing this block number's entry from + // attempt count tracker table + if ProcessBlockContent(client, block, _db, redis, publishable, _status, startingAt) { + + RemoveBlockFromAttemptCountTrackerTable(redis, fmt.Sprintf("%d", number)) + + } } From 7c5ca4c1a94b82d9c5389efd8fd07a1ae00ca5ab Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 21:22:58 +0530 Subject: [PATCH 14/20] set attempt count tracker table name --- app/app.go | 1 + 1 file changed, 1 insertion(+) diff --git a/app/app.go b/app/app.go index 221de1cd..2ef767c8 100644 --- a/app/app.go +++ b/app/app.go @@ -75,6 +75,7 @@ func Run(configFile, subscriptionPlansFile string) { _redisInfo := d.RedisInfo{ Client: _redisClient, BlockRetryQueue: "blocks_in_retry_queue", + BlockRetryCountTable: "attempt_count_tracker_table", UnfinalizedBlocksQueue: "unfinalized_blocks", } From a993221013a5b389d5e963bcbec0283e9d6d86d3 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 21:58:15 +0530 Subject: [PATCH 15/20] considering what's upper limit of block number range when finding missing blocks from found block numbers in DB --- app/block/syncer.go | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/app/block/syncer.go b/app/block/syncer.go index 47ce6157..5e4f89e5 100644 --- a/app/block/syncer.go +++ b/app/block/syncer.go @@ -73,7 +73,18 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB for i := fromBlock; i <= toBlock; i += step { - blocks := db.GetAllBlockNumbersInRange(_db, i, i+step-1) + var blocks []uint64 + + if i+step-1 >= toBlock { + + blocks = db.GetAllBlockNumbersInRange(_db, i, toBlock) + + } else { + + blocks = db.GetAllBlockNumbersInRange(_db, i, i+step-1) + + } + // No blocks present in DB, in queried range if blocks == nil || len(blocks) == 0 { @@ -105,10 +116,22 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB continue } - // Some blocks are missing in range, attempting to find them - // and pushing their processing request to job queue - for _, v := range FindMissingBlocksInRange(blocks, i, i+step-1) { - job(v) + if i+step-1 >= toBlock { + + // Some blocks are missing in range, attempting to find them + // and pushing their processing request to job queue + for _, v := range FindMissingBlocksInRange(blocks, i, toBlock) { + job(v) + } + + } else { + + // Some blocks are missing in range, attempting to find them + // and pushing their processing request to job queue + for _, v := range FindMissingBlocksInRange(blocks, i, i+step-1) { + job(v) + } + } } From eba1b9b0083f8654cf86cad7072759f53264266b Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 29 Jan 2021 22:28:28 +0530 Subject: [PATCH 16/20] simpler implementation while finding missing blocks --- app/block/syncer.go | 52 ++++++++++----------------------------------- 1 file changed, 11 insertions(+), 41 deletions(-) diff --git a/app/block/syncer.go b/app/block/syncer.go index 5e4f89e5..fddebfd5 100644 --- a/app/block/syncer.go +++ b/app/block/syncer.go @@ -73,23 +73,18 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB for i := fromBlock; i <= toBlock; i += step { - var blocks []uint64 - - if i+step-1 >= toBlock { - - blocks = db.GetAllBlockNumbersInRange(_db, i, toBlock) - - } else { - - blocks = db.GetAllBlockNumbersInRange(_db, i, i+step-1) - + toShouldbe := i + step - 1 + if toShouldbe > toBlock { + toShouldbe = toBlock } + blocks := db.GetAllBlockNumbersInRange(_db, i, toShouldbe) + // No blocks present in DB, in queried range if blocks == nil || len(blocks) == 0 { // So submitting all of them to job processor queue - for j := i; j <= i+step-1; j++ { + for j := i; j <= toShouldbe; j++ { job(j) @@ -98,40 +93,15 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB } - countShouldBe := step - - // Attempting to fix how many block numbers we should be ideally - // receiving back in response, sent to DB - // - // This will only be executed when asked for 10 block numbers ( = `step` ) - // starting at 1 ( = `fromBlock` ), but actually `toBlock` is set to 5 - // - // In that case, count of blocks received from DB can't be more than 5 at any cost. - if i+step-1 > toBlock { - countShouldBe = toBlock - i + 1 - } - // All blocks in range present in DB ✅ - if countShouldBe == uint64(len(blocks)) { + if toShouldbe-i+1 == uint64(len(blocks)) { continue } - if i+step-1 >= toBlock { - - // Some blocks are missing in range, attempting to find them - // and pushing their processing request to job queue - for _, v := range FindMissingBlocksInRange(blocks, i, toBlock) { - job(v) - } - - } else { - - // Some blocks are missing in range, attempting to find them - // and pushing their processing request to job queue - for _, v := range FindMissingBlocksInRange(blocks, i, i+step-1) { - job(v) - } - + // Some blocks are missing in range, attempting to find them + // and pushing their processing request to job queue + for _, v := range FindMissingBlocksInRange(blocks, i, toShouldbe) { + job(v) } } From 9819ed228ea68eaac775e348c8daf5ada076e90e Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 30 Jan 2021 10:45:56 +0530 Subject: [PATCH 17/20] move block from start of queue to end --- app/block/unfinalized_blocks.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/app/block/unfinalized_blocks.go b/app/block/unfinalized_blocks.go index ac5b08b3..6dfd207e 100644 --- a/app/block/unfinalized_blocks.go +++ b/app/block/unfinalized_blocks.go @@ -2,6 +2,7 @@ package block import ( "context" + "fmt" "log" "strconv" @@ -73,6 +74,15 @@ func PushBlockIntoUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) { } } +// MoveUnfinalizedOldestBlockToEnd - Attempts to pop oldest block ( i.e. left most block ) +// from unfinalized queue & pushes it back to end of queue, so that other blocks waiting after +// this one can get be attempted to be processed by workers +func MoveUnfinalizedOldestBlockToEnd(redis *data.RedisInfo) { + + PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", PopOldestBlockFromUnfinalizedQueue(redis))) + +} + // CheckBlockInUnfinalizedQueue - Checks whether block number is already added in // Redis backed unfinalized queue or not // From 74669bc6d426670809c503d0e6f2f3eed48bbe8d Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 30 Jan 2021 10:47:00 +0530 Subject: [PATCH 18/20] attempt to reorganize unfinalized block queue --- app/block/listener.go | 19 +++++++++++++------ app/block/unfinalized_blocks.go | 2 ++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/app/block/listener.go b/app/block/listener.go index e287227e..97e865cc 100644 --- a/app/block/listener.go +++ b/app/block/listener.go @@ -2,6 +2,7 @@ package block import ( "context" + "fmt" "log" "runtime" @@ -87,7 +88,7 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, // // Though it'll be picked up sometime in future ( by missing block finder ), but it can be safely handled now // so that it gets processed immediately - func(blockHash common.Hash, blockNumber string) { + func(blockHash common.Hash, blockNumber uint64) { // When only processing blocks in real-time mode // no need to check what's present in unfinalized block number queue @@ -132,9 +133,15 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, }(oldest) } else { - // If oldest block is not finalized, no meaning - // staying here, we'll revisit it some time in future - break + + // If left most block is not yet finalized, it'll attempt to + // reorganize that queue so that other blocks waiting to be processed + // can get that opportunity + // + // This situation generally occurs due to concurrent pattern implemented + // in block processor + MoveUnfinalizedOldestBlockToEnd(redis) + } } @@ -145,14 +152,14 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, FetchBlockByHash(connection.RPC, blockHash, - blockNumber, + fmt.Sprintf("%d", blockNumber), _db, redis, status) }) - }(header.Hash(), header.Number.String()) + }(header.Hash(), header.Number.Uint64()) } } diff --git a/app/block/unfinalized_blocks.go b/app/block/unfinalized_blocks.go index 6dfd207e..4fdfd5c0 100644 --- a/app/block/unfinalized_blocks.go +++ b/app/block/unfinalized_blocks.go @@ -77,6 +77,8 @@ func PushBlockIntoUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) { // MoveUnfinalizedOldestBlockToEnd - Attempts to pop oldest block ( i.e. left most block ) // from unfinalized queue & pushes it back to end of queue, so that other blocks waiting after // this one can get be attempted to be processed by workers +// +// @note This can be improved using `LMOVE` command of Redis ( >= 6.2.0 ) func MoveUnfinalizedOldestBlockToEnd(redis *data.RedisInfo) { PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", PopOldestBlockFromUnfinalizedQueue(redis))) From b2193c363181c682a63ff917ea97181f1c8e8359 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sun, 31 Jan 2021 20:19:33 +0530 Subject: [PATCH 19/20] given two strings attempts to perform case insensitive matching between them --- app/pubsub/subscription.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/app/pubsub/subscription.go b/app/pubsub/subscription.go index cfa8d567..e4e8273b 100644 --- a/app/pubsub/subscription.go +++ b/app/pubsub/subscription.go @@ -1,6 +1,7 @@ package pubsub import ( + "fmt" "log" "regexp" "strings" @@ -156,6 +157,20 @@ func (s *SubscriptionRequest) GetTransactionFilters() []string { return []string{matches[4], matches[6]} } +// CheckSimilarity - Performing case insensitive matching between two +// strings +func CheckSimilarity(first string, second string) bool { + + reg, err := regexp.Compile(fmt.Sprintf("(?i)^(%s)$", first)) + if err != nil { + log.Printf("[!] Failed to parse regex pattern : %s\n", err.Error()) + return false + } + + return reg.MatchString(second) + +} + // DoesMatchWithPublishedTransactionData - All `transaction` topic listeners i.e. subscribers are // going to get notified when new transaction detected, but they will only send those data to client application // ( connected over websocket ), to which client has subscribed to From da0c3df44e8b3aa3e98f59f80bce626fdfc2d67d Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sun, 31 Jan 2021 20:24:54 +0530 Subject: [PATCH 20/20] using case insensitive matching for checking whether pubsub client has subscribed to a certain topic of interest or not --- app/pubsub/subscription.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/app/pubsub/subscription.go b/app/pubsub/subscription.go index e4e8273b..5ecf951b 100644 --- a/app/pubsub/subscription.go +++ b/app/pubsub/subscription.go @@ -97,12 +97,12 @@ func (s *SubscriptionRequest) DoesMatchWithPublishedEventData(event *data.Event) // --- Matching specific topic signature provided by client // application with received event data, published by // redis pub-sub - matchTopicXInEvent := func(topic string, x int) bool { + matchTopicXInEvent := func(topic string, x uint8) bool { // Not all topics will have 4 elements in topics array // // For those cases, if topic signature for that index is {"", "*"} // provided by consumer, then we're safely going to say it's a match - if !(x < len(event.Topics)) { + if !(int(x) < len(event.Topics)) { return topic == "" || topic == "*" } @@ -114,7 +114,7 @@ func (s *SubscriptionRequest) DoesMatchWithPublishedEventData(event *data.Event) status = true // match with specific `topic` signature default: - status = topic == event.Topics[x] + status = CheckSimilarity(topic, event.Topics[x]) } return status @@ -135,7 +135,7 @@ func (s *SubscriptionRequest) DoesMatchWithPublishedEventData(event *data.Event) status = matchTopicXInEvent(filters[1], 0) && matchTopicXInEvent(filters[2], 1) && matchTopicXInEvent(filters[3], 2) && matchTopicXInEvent(filters[4], 3) // match with provided `contract` address default: - if filters[0] == event.Origin { + if CheckSimilarity(filters[0], event.Origin) { status = matchTopicXInEvent(filters[1], 0) && matchTopicXInEvent(filters[2], 1) && matchTopicXInEvent(filters[3], 2) && matchTopicXInEvent(filters[4], 3) } } @@ -191,7 +191,7 @@ func (s *SubscriptionRequest) DoesMatchWithPublishedTransactionData(tx *data.Tra status = true // match with specific `to` address default: - status = to == tx.To + status = CheckSimilarity(to, tx.To) } return status @@ -211,7 +211,7 @@ func (s *SubscriptionRequest) DoesMatchWithPublishedTransactionData(tx *data.Tra status = matchToFieldInTx(filters[1]) // match with provided `from` address default: - if filters[0] == tx.From { + if CheckSimilarity(filters[0], tx.From) { status = matchToFieldInTx(filters[1]) } }