diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 1941038b06..3993a07e7c 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -2,6 +2,7 @@ package polybft import ( + "context" "encoding/json" "fmt" "path/filepath" @@ -353,18 +354,21 @@ func (p *Polybft) Start() error { return fmt.Errorf("failed to start syncer. Error: %w", err) } - // start syncing - go func() { + // sync concurrently, retrying indefinitely + go common.RetryForever(context.Background(), time.Second, func(context.Context) error { blockHandler := func(b *types.FullBlock) bool { p.runtime.OnBlockInserted(b) return false } - if err := p.syncer.Sync(blockHandler); err != nil { p.logger.Error("blocks synchronization failed", "error", err) + + return err } - }() + + return nil + }) // start consensus runtime if err := p.startRuntime(); err != nil { diff --git a/go.mod b/go.mod index 4cb639fd50..62c463531d 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,7 @@ require ( github.com/dave/jennifer v1.6.1 github.com/quasilyte/go-ruleguard v0.3.19 github.com/quasilyte/go-ruleguard/dsl v0.3.22 + github.com/sethvargo/go-retry v0.2.4 golang.org/x/sync v0.2.0 google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 gopkg.in/DataDog/dd-trace-go.v1 v1.50.1 diff --git a/go.sum b/go.sum index 490445c85d..7e33310893 100644 --- a/go.sum +++ b/go.sum @@ -619,6 +619,8 @@ github.com/secure-systems-lab/go-securesystemslib v0.3.1/go.mod h1:o8hhjkbNl2gOa github.com/secure-systems-lab/go-securesystemslib v0.5.0 h1:oTiNu0QnulMQgN/hLK124wJD/r2f9ZhIUuKIeBsCBT8= github.com/secure-systems-lab/go-securesystemslib v0.5.0/go.mod h1:uoCqUC0Ap7jrBSEanxT+SdACYJTVplRXWLkGMuDjXqk= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec= +github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw= github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY= github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM= github.com/shurcooL/github_flavored_markdown v0.0.0-20181002035957-2122de532470/go.mod h1:2dOwnU2uBioM+SGy2aZoq1f/Sd1l9OkAeAUvjSyvgU0= diff --git a/helper/common/common.go b/helper/common/common.go index 954d6af4ae..11139435d3 100644 --- a/helper/common/common.go +++ b/helper/common/common.go @@ -1,6 +1,7 @@ package common import ( + "context" "encoding/binary" "encoding/json" "errors" @@ -18,6 +19,7 @@ import ( "time" "github.com/0xPolygon/polygon-edge/helper/hex" + "github.com/sethvargo/go-retry" ) var ( @@ -30,6 +32,17 @@ var ( errInvalidDuration = errors.New("invalid duration") ) +// RetryForever will execute a function until it completes without error +func RetryForever(ctx context.Context, interval time.Duration, fn func(context.Context) error) { + _ = retry.Do(ctx, retry.NewConstant(interval), func(context.Context) error { + if err := fn(ctx); err != nil { + return retry.RetryableError(err) + } + + return nil + }) +} + // Min returns the strictly lower number func Min(a, b uint64) uint64 { if a < b { diff --git a/helper/common/common_test.go b/helper/common/common_test.go index 938ec30632..6a7af9e4e4 100644 --- a/helper/common/common_test.go +++ b/helper/common/common_test.go @@ -1,7 +1,9 @@ package common import ( + "context" "encoding/json" + "errors" "math/big" "testing" "time" @@ -98,3 +100,35 @@ func Test_Duration_Marshal_UnmarshalJSON(t *testing.T) { require.Equal(t, origTimer, otherTimer) }) } + +func TestRetryForever_AlwaysReturnError_ShouldNeverEnd(t *testing.T) { + interval := time.Millisecond * 10 + ended := false + + go func() { + RetryForever(context.Background(), interval, func(ctx context.Context) error { + return errors.New("") + }) + + ended = true + }() + time.Sleep(interval * 10) + require.False(t, ended) +} + +func TestRetryForever_ReturnNilAfterFirstRun_ShouldEnd(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + RetryForever(ctx, time.Millisecond*100, func(ctx context.Context) error { + select { + case <-ctx.Done(): + + return nil + default: + cancel() + + return errors.New("") + } + }) + <-ctx.Done() + require.True(t, errors.Is(ctx.Err(), context.Canceled)) +} diff --git a/server/server.go b/server/server.go index 960f8952ee..3855e8b147 100644 --- a/server/server.go +++ b/server/server.go @@ -917,6 +917,7 @@ func (s *Server) setupGRPC() error { return err } + // Start server with infinite retries go func() { if err := s.grpcServer.Serve(lis); err != nil { s.logger.Error(err.Error()) @@ -996,11 +997,13 @@ func (s *Server) startPrometheusServer(listenAddr *net.TCPAddr) *http.Server { ReadHeaderTimeout: 60 * time.Second, } - go func() { - s.logger.Info("Prometheus server started", "addr=", listenAddr.String()) + s.logger.Info("Prometheus server started", "addr=", listenAddr.String()) - if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { - s.logger.Error("Prometheus HTTP server ListenAndServe", "err", err) + go func() { + if err := srv.ListenAndServe(); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + s.logger.Error("Prometheus HTTP server ListenAndServe", "err", err) + } } }() diff --git a/tracker/event_tracker.go b/tracker/event_tracker.go index 882e7bab6e..2a2895c02e 100644 --- a/tracker/event_tracker.go +++ b/tracker/event_tracker.go @@ -2,7 +2,9 @@ package tracker import ( "context" + "time" + "github.com/0xPolygon/polygon-edge/helper/common" hcf "github.com/hashicorp/go-hclog" "github.com/umbracle/ethgo" "github.com/umbracle/ethgo/blocktracker" @@ -64,6 +66,29 @@ func (e *EventTracker) Start(ctx context.Context) error { blockMaxBacklog := e.numBlockConfirmations*2 + 1 blockTracker := blocktracker.NewBlockTracker(provider.Eth(), blocktracker.WithBlockMaxBacklog(blockMaxBacklog)) + go func() { + <-ctx.Done() + blockTracker.Close() + store.Close() + }() + + // Init and start block tracker concurrently, retrying indefinitely + go common.RetryForever(ctx, time.Second, func(context.Context) error { + if err := blockTracker.Init(); err != nil { + e.logger.Error("failed to init blocktracker", "error", err) + + return err + } + + if err := blockTracker.Start(); err != nil { + e.logger.Error("failed to start blocktracker", "error", err) + + return err + } + + return nil + }) + tt, err := tracker.NewTracker(provider.Eth(), tracker.WithBatchSize(10), tracker.WithBlockTracker(blockTracker), @@ -79,30 +104,16 @@ func (e *EventTracker) Start(ctx context.Context) error { if err != nil { return err } - - go func() { - if err := blockTracker.Init(); err != nil { - e.logger.Error("failed to init blocktracker", "error", err) - - return - } - - if err := blockTracker.Start(); err != nil { - e.logger.Error("failed to start blocktracker", "error", err) - } - }() - - go func() { - <-ctx.Done() - blockTracker.Close() - store.Close() - }() - - go func() { + // Sync concurrently, retrying indefinitely + go common.RetryForever(ctx, time.Second, func(context.Context) error { if err := tt.Sync(ctx); err != nil { e.logger.Error("failed to sync", "error", err) + + return err } - }() + + return nil + }) return nil }