Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: use wal file to replay chain #2885

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions gno.land/cmd/gnoland/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ func execStart(ctx context.Context, c *startCfg, io commands.IO) error {
if err != nil {
return fmt.Errorf("unable to create the Gnoland node, %w", err)
}
if err := gnoNode.ConsensusState().ReplayFile("./wal", false); err != nil {
return err
}

// Start the node (async)
if err := gnoNode.Start(); err != nil {
Expand Down
60 changes: 60 additions & 0 deletions misc/walscan/walscan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"errors"
"flag"
"fmt"
"io"
"log"
"os"

"github.com/gnolang/gno/tm2/pkg/amino"
"github.com/gnolang/gno/tm2/pkg/bft/wal"

// Amino, amino, why so peculiar?
_ "github.com/gnolang/gno/tm2/pkg/bft/consensus"
)

const (
maxWALSize = 1 << 20 // 1MB
)

func main() {
flag.Parse()

filename := flag.Arg(0)
if filename == "" {
log.Fatalf("usage: %s <FILE>\n", flag.CommandLine.Name())
}
f, err := os.Open(filename)
if err != nil {
log.Fatalf("error opening %q: %v", filename, err)
}
defer f.Close()

amino.RegisterPackage(amino.NewPackage("github.com/gnolang/gno/tm2/pkg/bft/wal", "wal", amino.GetCallersDirname()).WithTypes(
&wal.MetaMessage{}, "MetaMessage",
&wal.TimedWALMessage{}, "TimedMessage",
))

rd := wal.NewWALReader(f, maxWALSize)
for {
msg, meta, err := rd.ReadMessage()
if err != nil {
if errors.Is(err, io.EOF) {
return
}
log.Fatalf("reading wal message: %v", err)
}
switch {
case msg != nil:
x := any(msg)
fmt.Printf("%s\n", amino.MustMarshalJSON(&x))
case meta != nil:
x := any(meta)
fmt.Printf("%s\n", amino.MustMarshalJSON(&x))
default:
panic("msg == meta == nil")
}
}
}
37 changes: 21 additions & 16 deletions tm2/pkg/bft/consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,21 @@
if cs.IsRunning() {
return errors.New("cs is already running, cannot replay")
}
if cs.wal != nil {
if cs.wal != nil && (cs.wal != walm.NopWAL{}) {

Check warning on line 33 in tm2/pkg/bft/consensus/replay_file.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/bft/consensus/replay_file.go#L33

Added line #L33 was not covered by tests
return errors.New("cs wal is open, cannot replay")
}

cs.startForReplay()

// ensure all new step events are regenerated as expected

newStepSub := events.SubscribeToEvent(cs.evsw, subscriber, cstypes.EventNewRoundStep{})
defer cs.evsw.RemoveListener(subscriber)
// XXX:
// This doesn't work synchronously (the default), and there seem to be tweaks
// to be made to make this work synchronously.
//
// ch := make(chan events.Event, 1) // asynchronous
// newStepSub := events.SubscribeToEventOn(cs.evsw, subscriber, cstypes.EventNewRoundStep{}, ch)
// defer cs.evsw.RemoveListener(subscriber)

// just open the file for reading, no need to use wal
fp, err := os.OpenFile(file, os.O_RDONLY, 0o600)
Expand All @@ -50,6 +55,8 @@
pb := newPlayback(file, fp, cs, cs.state.Copy())
defer pb.fp.Close() //nolint: errcheck

cs.Logger.Debug("Replay: playing back from file", "filename", file)

Check warning on line 58 in tm2/pkg/bft/consensus/replay_file.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/bft/consensus/replay_file.go#L58

Added line #L58 was not covered by tests

var nextN int // apply N msgs in a row
var msg *walm.TimedWALMessage
var meta *walm.MetaMessage
Expand All @@ -65,7 +72,7 @@
return err
}

if err := pb.cs.readReplayMessage(msg, meta, newStepSub); err != nil {
if err := pb.cs.readReplayMessage(msg, meta, nil); err != nil {

Check warning on line 75 in tm2/pkg/bft/consensus/replay_file.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/bft/consensus/replay_file.go#L75

Added line #L75 was not covered by tests
return err
}

Expand Down Expand Up @@ -142,18 +149,15 @@
}

func (cs *ConsensusState) startForReplay() {
cs.Logger.Error("Replay commands are disabled until someone updates them and writes tests")
/* TODO:!
// since we replay tocks we just ignore ticks
go func() {
for {
select {
case <-cs.tickChan:
case <-cs.Quit:
return
}
go func() {
for {
select {

Check warning on line 154 in tm2/pkg/bft/consensus/replay_file.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/bft/consensus/replay_file.go#L152-L154

Added lines #L152 - L154 were not covered by tests
//case <-cs.timeoutTicker.(*timeoutTicker).tickChan:
case <-cs.Quit():
return

Check warning on line 157 in tm2/pkg/bft/consensus/replay_file.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/bft/consensus/replay_file.go#L156-L157

Added lines #L156 - L157 were not covered by tests
}
}()*/
}
}()
}

// console function for parsing input and running commands
Expand Down Expand Up @@ -197,7 +201,8 @@

// ensure all new step events are regenerated as expected

newStepSub := events.SubscribeToEvent(pb.cs.evsw, subscriber, cstypes.EventNewRoundStep{})
ch := make(chan events.Event, 1) // asynchronous
newStepSub := events.SubscribeToEventOn(pb.cs.evsw, subscriber, cstypes.EventNewRoundStep{}, ch)

Check warning on line 205 in tm2/pkg/bft/consensus/replay_file.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/bft/consensus/replay_file.go#L204-L205

Added lines #L204 - L205 were not covered by tests
defer pb.cs.evsw.RemoveListener(subscriber)

if len(tokens) == 1 {
Expand Down
Loading