Skip to content

Commit

Permalink
Merge pull request #5375 from filecoin-project/feat/refactor-fsm-input
Browse files Browse the repository at this point in the history
storagefsm: Rewrite input handling
  • Loading branch information
magik6k authored Feb 25, 2021
2 parents 247a086 + d8c9712 commit e49a412
Show file tree
Hide file tree
Showing 11 changed files with 630 additions and 387 deletions.
2 changes: 2 additions & 0 deletions cmd/lotus-storage-miner/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ var stateList = []stateMeta{

{col: color.FgBlue, state: sealing.Empty},
{col: color.FgBlue, state: sealing.WaitDeals},
{col: color.FgBlue, state: sealing.AddPiece},

{col: color.FgRed, state: sealing.UndefinedSectorState},
{col: color.FgYellow, state: sealing.Packing},
Expand All @@ -306,6 +307,7 @@ var stateList = []stateMeta{
{col: color.FgCyan, state: sealing.Removed},

{col: color.FgRed, state: sealing.FailedUnrecoverable},
{col: color.FgRed, state: sealing.AddPieceFailed},
{col: color.FgRed, state: sealing.SealPreCommit1Failed},
{col: color.FgRed, state: sealing.SealPreCommit2Failed},
{col: color.FgRed, state: sealing.PreCommitFailed},
Expand Down
50 changes: 49 additions & 1 deletion extern/storage-sealing/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

156 changes: 64 additions & 92 deletions extern/storage-sealing/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,22 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
// Sealing

UndefinedSectorState: planOne(
on(SectorStart{}, Empty),
on(SectorStart{}, WaitDeals),
on(SectorStartCC{}, Packing),
),
Empty: planOne(on(SectorAddPiece{}, WaitDeals)),
Empty: planOne( // deprecated
on(SectorAddPiece{}, AddPiece),
on(SectorStartPacking{}, Packing),
),
WaitDeals: planOne(
on(SectorAddPiece{}, WaitDeals),
on(SectorAddPiece{}, AddPiece),
on(SectorStartPacking{}, Packing),
),
AddPiece: planOne(
on(SectorPieceAdded{}, WaitDeals),
apply(SectorStartPacking{}),
on(SectorAddPieceFailed{}, AddPieceFailed),
),
Packing: planOne(on(SectorPacked{}, GetTicket)),
GetTicket: planOne(
on(SectorTicket{}, PreCommit1),
Expand Down Expand Up @@ -97,6 +105,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto

// Sealing errors

AddPieceFailed: planOne(),
SealPreCommit1Failed: planOne(
on(SectorRetrySealPreCommit1{}, PreCommit1),
),
Expand Down Expand Up @@ -238,12 +247,11 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta

/*
* Empty <- incoming deals
| |
| v
*<- WaitDeals <- incoming deals
| |
| v
UndefinedSectorState (start)
v |
*<- WaitDeals <-> AddPiece |
| | /--------------------/
| v v
*<- Packing <- incoming committed capacity
| |
| v
Expand Down Expand Up @@ -282,10 +290,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
v
FailedUnrecoverable
UndefinedSectorState <- ¯\_(ツ)_/¯
| ^
*---------------------/
*/

m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State)
Expand All @@ -295,7 +299,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
case Empty:
fallthrough
case WaitDeals:
log.Infof("Waiting for deals %d", state.SectorNumber)
return m.handleWaitDeals, processed, nil
case AddPiece:
return m.handleAddPiece, processed, nil
case Packing:
return m.handlePacking, processed, nil
case GetTicket:
Expand Down Expand Up @@ -418,60 +424,10 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
log.Errorf("loading sector list: %+v", err)
}

cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting the sealing delay: %w", err)
}

spt, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting current seal proof: %w", err)
}
ssize, err := spt.SectorSize()
if err != nil {
return err
}

// m.unsealedInfoMap.lk.Lock() taken early in .New to prevent races
defer m.unsealedInfoMap.lk.Unlock()

for _, sector := range trackedSectors {
if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil {
log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err)
}

if sector.State == WaitDeals {

// put the sector in the unsealedInfoMap
if _, ok := m.unsealedInfoMap.infos[sector.SectorNumber]; ok {
// something's funky here, but probably safe to move on
log.Warnf("sector %v was already in the unsealedInfoMap when restarting", sector.SectorNumber)
} else {
ui := UnsealedSectorInfo{
ssize: ssize,
}
for _, p := range sector.Pieces {
if p.DealInfo != nil {
ui.numDeals++
}
ui.stored += p.Piece.Size
ui.pieceSizes = append(ui.pieceSizes, p.Piece.Size.Unpadded())
}

m.unsealedInfoMap.infos[sector.SectorNumber] = ui
}

// start a fresh timer for the sector
if cfg.WaitDealsDelay > 0 {
timer := time.NewTimer(cfg.WaitDealsDelay)
go func() {
<-timer.C
if err := m.StartPacking(sector.SectorNumber); err != nil {
log.Errorf("starting sector %d: %+v", sector.SectorNumber, err)
}
}()
}
}
}

// TODO: Grab on-chain sector set and diff with trackedSectors
Expand All @@ -494,56 +450,72 @@ func final(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
}

func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) {
return func() (mutator, func(*SectorInfo) error) {
return mut, func(state *SectorInfo) error {
func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
state.State = next
return nil
return false, nil
}
}
}

// like `on`, but doesn't change state
func apply(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
return true, nil
}
}
}

func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) {
return func() (mutator, func(*SectorInfo) error) {
return mut, func(state *SectorInfo) error {
func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
if state.Return == "" {
return xerrors.Errorf("return state not set")
return false, xerrors.Errorf("return state not set")
}

state.State = SectorState(state.Return)
state.Return = ""
return nil
return false, nil
}
}
}

func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
if gm, ok := events[0].User.(globalMutator); ok {
gm.applyGlobal(state)
return 1, nil
}
for i, event := range events {
if gm, ok := event.User.(globalMutator); ok {
gm.applyGlobal(state)
return uint64(i + 1), nil
}

for _, t := range ts {
mut, next := t()
for _, t := range ts {
mut, next := t()

if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) {
continue
}
if reflect.TypeOf(event.User) != reflect.TypeOf(mut) {
continue
}

if err, iserr := event.User.(error); iserr {
log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, event.User, err)
}

if err, iserr := events[0].User.(error); iserr {
log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, events[0].User, err)
event.User.(mutator).apply(state)
more, err := next(state)
if err != nil || !more {
return uint64(i + 1), err
}
}

events[0].User.(mutator).apply(state)
return 1, next(state)
}
_, ok := event.User.(Ignorable)
if ok {
continue
}

_, ok := events[0].User.(Ignorable)
if ok {
return 1, nil
return uint64(i + 1), xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, event.User, event)
}

return 0, xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
return uint64(len(events)), nil
}
}
26 changes: 21 additions & 5 deletions extern/storage-sealing/fsm_events.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package sealing

import (
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"time"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
)

type mutator interface {
Expand Down Expand Up @@ -76,14 +79,27 @@ func (evt SectorStartCC) apply(state *SectorInfo) {
state.SectorType = evt.SectorType
}

type SectorAddPiece struct {
NewPiece Piece
}
type SectorAddPiece struct{}

func (evt SectorAddPiece) apply(state *SectorInfo) {
state.Pieces = append(state.Pieces, evt.NewPiece)
if state.CreationTime == 0 {
state.CreationTime = time.Now().Unix()
}
}

type SectorPieceAdded struct {
NewPieces []Piece
}

func (evt SectorPieceAdded) apply(state *SectorInfo) {
state.Pieces = append(state.Pieces, evt.NewPieces...)
}

type SectorAddPieceFailed struct{ error }

func (evt SectorAddPieceFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorAddPieceFailed) apply(si *SectorInfo) {}

type SectorStartPacking struct{}

func (evt SectorStartPacking) apply(*SectorInfo) {}
Expand Down
Loading

0 comments on commit e49a412

Please sign in to comment.