From f251237ef45786b3f422b52e2d8e2d5bbf98a024 Mon Sep 17 00:00:00 2001 From: Morgan Bazalgette Date: Tue, 17 Sep 2024 01:44:21 +0200 Subject: [PATCH 1/2] attempts at using replay file for this stuff --- gno.land/cmd/gnoland/start.go | 3 ++ misc/walscan/walscan.go | 60 ++++++++++++++++++++++++++++ tm2/pkg/bft/consensus/replay_file.go | 35 ++++++++-------- 3 files changed, 82 insertions(+), 16 deletions(-) create mode 100644 misc/walscan/walscan.go diff --git a/gno.land/cmd/gnoland/start.go b/gno.land/cmd/gnoland/start.go index 21f0cb4b1a6..c260248bd11 100644 --- a/gno.land/cmd/gnoland/start.go +++ b/gno.land/cmd/gnoland/start.go @@ -254,6 +254,9 @@ func execStart(ctx context.Context, c *startCfg, io commands.IO) error { if err != nil { return fmt.Errorf("unable to create the Gnoland node, %w", err) } + if err := gnoNode.ConsensusState().ReplayFile("./wal", false); err != nil { + return err + } // Start the node (async) if err := gnoNode.Start(); err != nil { diff --git a/misc/walscan/walscan.go b/misc/walscan/walscan.go new file mode 100644 index 00000000000..4a21fbf8d5f --- /dev/null +++ b/misc/walscan/walscan.go @@ -0,0 +1,60 @@ +package main + +import ( + "errors" + "flag" + "fmt" + "io" + "log" + "os" + + "github.com/gnolang/gno/tm2/pkg/amino" + "github.com/gnolang/gno/tm2/pkg/bft/wal" + + // Amino, amino, why so peculiar? + _ "github.com/gnolang/gno/tm2/pkg/bft/consensus" +) + +const ( + maxWALSize = 1 << 20 // 1MB +) + +func main() { + flag.Parse() + + filename := flag.Arg(0) + if filename == "" { + log.Fatalf("usage: %s \n", flag.CommandLine.Name()) + } + f, err := os.Open(filename) + if err != nil { + log.Fatalf("error opening %q: %v", filename, err) + } + defer f.Close() + + amino.RegisterPackage(amino.NewPackage("github.com/gnolang/gno/tm2/pkg/bft/wal", "wal", amino.GetCallersDirname()).WithTypes( + &wal.MetaMessage{}, "MetaMessage", + &wal.TimedWALMessage{}, "TimedMessage", + )) + + rd := wal.NewWALReader(f, maxWALSize) + for { + msg, meta, err := rd.ReadMessage() + if err != nil { + if errors.Is(err, io.EOF) { + return + } + log.Fatalf("reading wal message: %v", err) + } + switch { + case msg != nil: + x := any(msg) + fmt.Printf("%s\n", amino.MustMarshalJSON(&x)) + case meta != nil: + x := any(meta) + fmt.Printf("%s\n", amino.MustMarshalJSON(&x)) + default: + panic("msg == meta == nil") + } + } +} diff --git a/tm2/pkg/bft/consensus/replay_file.go b/tm2/pkg/bft/consensus/replay_file.go index 701c2893053..d8be0942493 100644 --- a/tm2/pkg/bft/consensus/replay_file.go +++ b/tm2/pkg/bft/consensus/replay_file.go @@ -30,7 +30,7 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { if cs.IsRunning() { return errors.New("cs is already running, cannot replay") } - if cs.wal != nil { + if cs.wal != nil && (cs.wal != walm.NopWAL{}) { return errors.New("cs wal is open, cannot replay") } @@ -38,8 +38,13 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { // ensure all new step events are regenerated as expected - newStepSub := events.SubscribeToEvent(cs.evsw, subscriber, cstypes.EventNewRoundStep{}) - defer cs.evsw.RemoveListener(subscriber) + // XXX: + // This doesn't work synchronously (the default), and there seem to be tweaks + // to be made to make this work synchronously. + // + // ch := make(chan events.Event, 1) // asynchronous + // newStepSub := events.SubscribeToEventOn(cs.evsw, subscriber, cstypes.EventNewRoundStep{}, ch) + // defer cs.evsw.RemoveListener(subscriber) // just open the file for reading, no need to use wal fp, err := os.OpenFile(file, os.O_RDONLY, 0o600) @@ -65,7 +70,7 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { return err } - if err := pb.cs.readReplayMessage(msg, meta, newStepSub); err != nil { + if err := pb.cs.readReplayMessage(msg, meta, nil); err != nil { return err } @@ -142,18 +147,15 @@ func (pb *playback) replayReset(count int, newStepSub <-chan events.Event) error } func (cs *ConsensusState) startForReplay() { - cs.Logger.Error("Replay commands are disabled until someone updates them and writes tests") - /* TODO:! - // since we replay tocks we just ignore ticks - go func() { - for { - select { - case <-cs.tickChan: - case <-cs.Quit: - return - } + go func() { + for { + select { + //case <-cs.timeoutTicker.(*timeoutTicker).tickChan: + case <-cs.Quit(): + return } - }()*/ + } + }() } // console function for parsing input and running commands @@ -197,7 +199,8 @@ func (pb *playback) replayConsoleLoop() int { // ensure all new step events are regenerated as expected - newStepSub := events.SubscribeToEvent(pb.cs.evsw, subscriber, cstypes.EventNewRoundStep{}) + ch := make(chan events.Event, 1) // asynchronous + newStepSub := events.SubscribeToEventOn(pb.cs.evsw, subscriber, cstypes.EventNewRoundStep{}, ch) defer pb.cs.evsw.RemoveListener(subscriber) if len(tokens) == 1 { From db52967032a3bfb4a6f7af9736418a109dad0ff2 Mon Sep 17 00:00:00 2001 From: Morgan Bazalgette Date: Wed, 18 Sep 2024 19:20:38 +0200 Subject: [PATCH 2/2] add logline --- tm2/pkg/bft/consensus/replay_file.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tm2/pkg/bft/consensus/replay_file.go b/tm2/pkg/bft/consensus/replay_file.go index d8be0942493..0568bfed6e8 100644 --- a/tm2/pkg/bft/consensus/replay_file.go +++ b/tm2/pkg/bft/consensus/replay_file.go @@ -55,6 +55,8 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { pb := newPlayback(file, fp, cs, cs.state.Copy()) defer pb.fp.Close() //nolint: errcheck + cs.Logger.Debug("Replay: playing back from file", "filename", file) + var nextN int // apply N msgs in a row var msg *walm.TimedWALMessage var meta *walm.MetaMessage