Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: re-enqueue failed block after average block time #73

Merged
merged 9 commits into from
Sep 2, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## Unreleased
### Changes
- ([\#73](https://github.com/forbole/juno/pull/73)) re-enqueue failed block after average block time

## v3.3.0
### Changes
- ([\#67](https://github.com/forbole/juno/pull/67)) Added support for concurrent transaction handling
Expand Down
2 changes: 1 addition & 1 deletion cmd/migrate/v3/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func migrateConfig() (Config, error) {
ParseGenesis: cfg.Parser.ParseGenesis,
StartHeight: cfg.Parser.StartHeight,
FastSync: cfg.Parser.FastSync,
AvgBlockTime: averageBlockTime,
AvgBlockTime: &averageBlockTime,
},
Logging: cfg.Logging,
Telemetry: cfg.Telemetry,
Expand Down
2 changes: 1 addition & 1 deletion cmd/start/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func enqueueNewBlocks(exportQueue types.HeightQueue, ctx *parser.Context) {
ctx.Logger.Debug("enqueueing new block", "height", currHeight)
exportQueue <- currHeight
}
time.Sleep(config.Cfg.Parser.AvgBlockTime)
time.Sleep(config.GetAvgBlockTime())
}
}

Expand Down
21 changes: 11 additions & 10 deletions parser/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package config
import "time"

type Config struct {
Workers int64 `yaml:"workers"`
ParseNewBlocks bool `yaml:"listen_new_blocks"`
ParseOldBlocks bool `yaml:"parse_old_blocks"`
GenesisFilePath string `yaml:"genesis_file_path,omitempty"`
ParseGenesis bool `yaml:"parse_genesis"`
StartHeight int64 `yaml:"start_height"`
FastSync bool `yaml:"fast_sync,omitempty"`
AvgBlockTime time.Duration `yaml:"average_block_time"`
Workers int64 `yaml:"workers"`
ParseNewBlocks bool `yaml:"listen_new_blocks"`
ParseOldBlocks bool `yaml:"parse_old_blocks"`
GenesisFilePath string `yaml:"genesis_file_path,omitempty"`
ParseGenesis bool `yaml:"parse_genesis"`
StartHeight int64 `yaml:"start_height"`
FastSync bool `yaml:"fast_sync,omitempty"`
AvgBlockTime *time.Duration `yaml:"average_block_time"`
}

// NewParsingConfig allows to build a new Config instance
Expand All @@ -19,7 +19,7 @@ func NewParsingConfig(
parseNewBlocks, parseOldBlocks bool,
parseGenesis bool, genesisFilePath string,
startHeight int64, fastSync bool,
avgBlockTime time.Duration,
avgBlockTime *time.Duration,
) Config {
return Config{
Workers: workers,
Expand All @@ -35,6 +35,7 @@ func NewParsingConfig(

// DefaultParsingConfig returns the default instance of Config
func DefaultParsingConfig() Config {
avgBlockTime := 5 * time.Second
return NewParsingConfig(
1,
true,
Expand All @@ -43,6 +44,6 @@ func DefaultParsingConfig() Config {
"",
1,
false,
5*time.Second,
&avgBlockTime,
)
}
5 changes: 4 additions & 1 deletion parser/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package parser
import (
"encoding/json"
"fmt"
"time"

"github.com/forbole/juno/v3/logging"

Expand Down Expand Up @@ -60,7 +61,9 @@ func (w Worker) Start() {

for i := range w.queue {
if err := w.ProcessIfNotExists(i); err != nil {
// re-enqueue any failed job
// re-enqueue any failed job after average block time
time.Sleep(config.GetAvgBlockTime())

// TODO: Implement exponential backoff or max retries for a block height.
go func() {
w.logger.Error("re-enqueueing failed block", "height", i, "err", err)
Expand Down
10 changes: 10 additions & 0 deletions types/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"path"
"time"
)

var (
Expand All @@ -12,3 +13,12 @@ var (
func GetConfigFilePath() string {
return path.Join(HomePath, "config.yaml")
}

// GetAvgBlockTime returns the average_block_time in the configuration file or
// returns 3 seconds if it is not configured
func GetAvgBlockTime() time.Duration {
if Cfg.Parser.AvgBlockTime == nil {
return 3 * time.Second
}
return *Cfg.Parser.AvgBlockTime
}