Skip to content

Commit

Permalink
fix: re-enqueue failed block after average block time (#73)
Browse files Browse the repository at this point in the history
## Description

request from Ravi: https://forbole.atlassian.net/browse/BDU-490

## Checklist
- [x] Targeted PR against correct branch.
- [ ] Linked to Github issue with discussion and accepted design OR link to spec that describes this work.
- [ ] Wrote unit tests.  
- [x] Re-reviewed `Files changed` in the Github PR explorer.
  • Loading branch information
huichiaotsou authored and RiccardoM committed Sep 2, 2022
1 parent 3605a07 commit b75639c
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
### Changes
- ([\#71](https://github.com/forbole/juno/pull/71)) Retry RPC client connection upon failure instead of panic
- ([\#72](https://github.com/forbole/juno/pull/72)) Updated missing blocks parsing
- ([\#73](https://github.com/forbole/juno/pull/73)) Re-enqueue failed block after average block time

## v3.3.0
### Changes
Expand Down
2 changes: 1 addition & 1 deletion cmd/migrate/v3/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func migrateConfig() (Config, error) {
ParseGenesis: cfg.Parser.ParseGenesis,
StartHeight: cfg.Parser.StartHeight,
FastSync: cfg.Parser.FastSync,
AvgBlockTime: averageBlockTime,
AvgBlockTime: &averageBlockTime,
},
Logging: cfg.Logging,
Telemetry: cfg.Telemetry,
Expand Down
9 changes: 4 additions & 5 deletions cmd/start/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,13 @@ func enqueueNewBlocks(exportQueue types.HeightQueue, ctx *parser.Context) {
ctx.Logger.Debug("enqueueing new block", "height", currHeight)
exportQueue <- currHeight
}
time.Sleep(config.Cfg.Parser.AvgBlockTime)
time.Sleep(config.GetAvgBlockTime())
}
}

// mustGetLatestHeight tries getting the latest height from the RPC client.
// If after 50 tries no latest height can be found, it returns 0.
func mustGetLatestHeight(ctx *parser.Context) int64 {
avgBlockTime := config.Cfg.Parser.AvgBlockTime

for retryCount := 0; retryCount < 50; retryCount++ {
latestBlockHeight, err := ctx.Node.LatestHeight()
if err == nil {
Expand All @@ -194,9 +192,10 @@ func mustGetLatestHeight(ctx *parser.Context) int64 {

ctx.Logger.Error("failed to get last block from RPCConfig client",
"err", err,
"retry interval", avgBlockTime,
"retry interval", config.GetAvgBlockTime(),
"retry count", retryCount)
time.Sleep(avgBlockTime)

time.Sleep(config.GetAvgBlockTime())
}

return 0
Expand Down
21 changes: 11 additions & 10 deletions parser/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package config
import "time"

type Config struct {
Workers int64 `yaml:"workers"`
ParseNewBlocks bool `yaml:"listen_new_blocks"`
ParseOldBlocks bool `yaml:"parse_old_blocks"`
GenesisFilePath string `yaml:"genesis_file_path,omitempty"`
ParseGenesis bool `yaml:"parse_genesis"`
StartHeight int64 `yaml:"start_height"`
FastSync bool `yaml:"fast_sync,omitempty"`
AvgBlockTime time.Duration `yaml:"average_block_time"`
Workers int64 `yaml:"workers"`
ParseNewBlocks bool `yaml:"listen_new_blocks"`
ParseOldBlocks bool `yaml:"parse_old_blocks"`
GenesisFilePath string `yaml:"genesis_file_path,omitempty"`
ParseGenesis bool `yaml:"parse_genesis"`
StartHeight int64 `yaml:"start_height"`
FastSync bool `yaml:"fast_sync,omitempty"`
AvgBlockTime *time.Duration `yaml:"average_block_time"`
}

// NewParsingConfig allows to build a new Config instance
Expand All @@ -19,7 +19,7 @@ func NewParsingConfig(
parseNewBlocks, parseOldBlocks bool,
parseGenesis bool, genesisFilePath string,
startHeight int64, fastSync bool,
avgBlockTime time.Duration,
avgBlockTime *time.Duration,
) Config {
return Config{
Workers: workers,
Expand All @@ -35,6 +35,7 @@ func NewParsingConfig(

// DefaultParsingConfig returns the default instance of Config
func DefaultParsingConfig() Config {
avgBlockTime := 5 * time.Second
return NewParsingConfig(
1,
true,
Expand All @@ -43,6 +44,6 @@ func DefaultParsingConfig() Config {
"",
1,
false,
5*time.Second,
&avgBlockTime,
)
}
5 changes: 4 additions & 1 deletion parser/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package parser
import (
"encoding/json"
"fmt"
"time"

"github.com/forbole/juno/v3/logging"

Expand Down Expand Up @@ -60,7 +61,9 @@ func (w Worker) Start() {

for i := range w.queue {
if err := w.ProcessIfNotExists(i); err != nil {
// re-enqueue any failed job
// re-enqueue any failed job after average block time
time.Sleep(config.GetAvgBlockTime())

// TODO: Implement exponential backoff or max retries for a block height.
go func() {
w.logger.Error("re-enqueueing failed block", "height", i, "err", err)
Expand Down
10 changes: 10 additions & 0 deletions types/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"path"
"time"
)

var (
Expand All @@ -12,3 +13,12 @@ var (
func GetConfigFilePath() string {
return path.Join(HomePath, "config.yaml")
}

// GetAvgBlockTime returns the average_block_time in the configuration file or
// returns 3 seconds if it is not configured
func GetAvgBlockTime() time.Duration {
if Cfg.Parser.AvgBlockTime == nil {
return 3 * time.Second
}
return *Cfg.Parser.AvgBlockTime
}

0 comments on commit b75639c

Please sign in to comment.