Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: re-enqueue failed block after average block time #73

Merged
merged 9 commits into from
Sep 2, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
### Changes
- ([\#71](https://github.com/forbole/juno/pull/71)) Retry RPC client connection upon failure instead of panic
- ([\#72](https://github.com/forbole/juno/pull/72)) Updated missing blocks parsing
- ([\#73](https://github.com/forbole/juno/pull/73)) Re-enqueue failed block after average block time

## v3.3.0
### Changes
Expand Down
2 changes: 1 addition & 1 deletion cmd/migrate/v3/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func migrateConfig() (Config, error) {
ParseGenesis: cfg.Parser.ParseGenesis,
StartHeight: cfg.Parser.StartHeight,
FastSync: cfg.Parser.FastSync,
AvgBlockTime: averageBlockTime,
AvgBlockTime: &averageBlockTime,
},
Logging: cfg.Logging,
Telemetry: cfg.Telemetry,
Expand Down
9 changes: 4 additions & 5 deletions cmd/start/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,13 @@ func enqueueNewBlocks(exportQueue types.HeightQueue, ctx *parser.Context) {
ctx.Logger.Debug("enqueueing new block", "height", currHeight)
exportQueue <- currHeight
}
time.Sleep(config.Cfg.Parser.AvgBlockTime)
time.Sleep(config.GetAvgBlockTime())
}
}

// mustGetLatestHeight tries getting the latest height from the RPC client.
// If after 50 tries no latest height can be found, it returns 0.
func mustGetLatestHeight(ctx *parser.Context) int64 {
avgBlockTime := config.Cfg.Parser.AvgBlockTime

for retryCount := 0; retryCount < 50; retryCount++ {
latestBlockHeight, err := ctx.Node.LatestHeight()
if err == nil {
Expand All @@ -194,9 +192,10 @@ func mustGetLatestHeight(ctx *parser.Context) int64 {

ctx.Logger.Error("failed to get last block from RPCConfig client",
"err", err,
"retry interval", avgBlockTime,
"retry interval", config.GetAvgBlockTime(),
"retry count", retryCount)
time.Sleep(avgBlockTime)

time.Sleep(config.GetAvgBlockTime())
}

return 0
Expand Down
21 changes: 11 additions & 10 deletions parser/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package config
import "time"

type Config struct {
Workers int64 `yaml:"workers"`
ParseNewBlocks bool `yaml:"listen_new_blocks"`
ParseOldBlocks bool `yaml:"parse_old_blocks"`
GenesisFilePath string `yaml:"genesis_file_path,omitempty"`
ParseGenesis bool `yaml:"parse_genesis"`
StartHeight int64 `yaml:"start_height"`
FastSync bool `yaml:"fast_sync,omitempty"`
AvgBlockTime time.Duration `yaml:"average_block_time"`
Workers int64 `yaml:"workers"`
ParseNewBlocks bool `yaml:"listen_new_blocks"`
ParseOldBlocks bool `yaml:"parse_old_blocks"`
GenesisFilePath string `yaml:"genesis_file_path,omitempty"`
ParseGenesis bool `yaml:"parse_genesis"`
StartHeight int64 `yaml:"start_height"`
FastSync bool `yaml:"fast_sync,omitempty"`
AvgBlockTime *time.Duration `yaml:"average_block_time"`
}

// NewParsingConfig allows to build a new Config instance
Expand All @@ -19,7 +19,7 @@ func NewParsingConfig(
parseNewBlocks, parseOldBlocks bool,
parseGenesis bool, genesisFilePath string,
startHeight int64, fastSync bool,
avgBlockTime time.Duration,
avgBlockTime *time.Duration,
) Config {
return Config{
Workers: workers,
Expand All @@ -35,6 +35,7 @@ func NewParsingConfig(

// DefaultParsingConfig returns the default instance of Config
func DefaultParsingConfig() Config {
avgBlockTime := 5 * time.Second
return NewParsingConfig(
1,
true,
Expand All @@ -43,6 +44,6 @@ func DefaultParsingConfig() Config {
"",
1,
false,
5*time.Second,
&avgBlockTime,
)
}
5 changes: 4 additions & 1 deletion parser/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package parser
import (
"encoding/json"
"fmt"
"time"

"github.com/forbole/juno/v3/logging"

Expand Down Expand Up @@ -60,7 +61,9 @@ func (w Worker) Start() {

for i := range w.queue {
if err := w.ProcessIfNotExists(i); err != nil {
// re-enqueue any failed job
// re-enqueue any failed job after average block time
time.Sleep(config.GetAvgBlockTime())

// TODO: Implement exponential backoff or max retries for a block height.
go func() {
w.logger.Error("re-enqueueing failed block", "height", i, "err", err)
Expand Down
10 changes: 10 additions & 0 deletions types/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"path"
"time"
)

var (
Expand All @@ -12,3 +13,12 @@ var (
func GetConfigFilePath() string {
return path.Join(HomePath, "config.yaml")
}

// GetAvgBlockTime returns the average_block_time in the configuration file or
// returns 3 seconds if it is not configured
func GetAvgBlockTime() time.Duration {
if Cfg.Parser.AvgBlockTime == nil {
return 3 * time.Second
}
return *Cfg.Parser.AvgBlockTime
}