diff --git a/cmd/epik-storage-miner/info.go b/cmd/epik-storage-miner/info.go index ae553068842..cea9223c65b 100644 --- a/cmd/epik-storage-miner/info.go +++ b/cmd/epik-storage-miner/info.go @@ -327,6 +327,7 @@ var stateList = []stateMeta{ {col: color.FgBlue, state: sealing.Empty}, {col: color.FgBlue, state: sealing.WaitDeals}, + {col: color.FgBlue, state: sealing.AddPiece}, {col: color.FgRed, state: sealing.UndefinedSectorState}, {col: color.FgYellow, state: sealing.Packing}, @@ -349,6 +350,7 @@ var stateList = []stateMeta{ {col: color.FgCyan, state: sealing.Removed}, {col: color.FgRed, state: sealing.FailedUnrecoverable}, + {col: color.FgRed, state: sealing.AddPieceFailed}, {col: color.FgRed, state: sealing.SealPreCommit1Failed}, {col: color.FgRed, state: sealing.SealPreCommit2Failed}, {col: color.FgRed, state: sealing.PreCommitFailed}, diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index 5bcc0a30431..33355f25191 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -471,7 +471,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{184, 24}); err != nil { + if _, err := w.Write([]byte{184, 25}); err != nil { return err } @@ -538,6 +538,28 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } + // t.CreationTime (int64) (int64) + if len("CreationTime") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"CreationTime\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("CreationTime"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("CreationTime")); err != nil { + return err + } + + if t.CreationTime >= 0 { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.CreationTime)); err != nil { + return err + } + } else { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.CreationTime-1)); err != nil { + return err + } + } + // t.Pieces ([]sealing.Piece) (slice) if len("Pieces") > cbg.MaxLength { return xerrors.Errorf("Value in field \"Pieces\" was too long") @@ -1087,6 +1109,32 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.SectorType = abi.RegisteredSealProof(extraI) } + // t.CreationTime (int64) (int64) + case "CreationTime": + { + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + var extraI int64 + if err != nil { + return err + } + switch maj { + case cbg.MajUnsignedInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 positive overflow") + } + case cbg.MajNegativeInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 negative oveflow") + } + extraI = -1 - extraI + default: + return fmt.Errorf("wrong type for int64 field: %d", maj) + } + + t.CreationTime = int64(extraI) + } // t.Pieces ([]sealing.Piece) (slice) case "Pieces": diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index c989d02967b..c38101e6cf3 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -37,14 +37,22 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto // Sealing UndefinedSectorState: planOne( - on(SectorStart{}, Empty), + on(SectorStart{}, WaitDeals), on(SectorStartCC{}, Packing), ), - Empty: planOne(on(SectorAddPiece{}, WaitDeals)), + Empty: planOne( // deprecated + on(SectorAddPiece{}, AddPiece), + on(SectorStartPacking{}, Packing), + ), WaitDeals: planOne( - on(SectorAddPiece{}, WaitDeals), + on(SectorAddPiece{}, AddPiece), on(SectorStartPacking{}, Packing), ), + AddPiece: planOne( + on(SectorPieceAdded{}, WaitDeals), + apply(SectorStartPacking{}), + on(SectorAddPieceFailed{}, AddPieceFailed), + ), Packing: planOne(on(SectorPacked{}, GetTicket)), GetTicket: planOne( on(SectorTicket{}, PreCommit1), @@ -97,6 +105,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto // Sealing errors + AddPieceFailed: planOne(), SealPreCommit1Failed: planOne( on(SectorRetrySealPreCommit1{}, PreCommit1), ), @@ -238,12 +247,11 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta /* - * Empty <- incoming deals - | | - | v - *<- WaitDeals <- incoming deals - | | - | v + UndefinedSectorState (start) + v | + *<- WaitDeals <-> AddPiece | + | | /--------------------/ + | v v *<- Packing <- incoming committed capacity | | | v @@ -282,10 +290,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta v FailedUnrecoverable - UndefinedSectorState <- ¯\_(ツ)_/¯ - | ^ - *---------------------/ - */ m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State) @@ -295,7 +299,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta case Empty: fallthrough case WaitDeals: - log.Infof("Waiting for deals %d", state.SectorNumber) + return m.handleWaitDeals, processed, nil + case AddPiece: + return m.handleAddPiece, processed, nil case Packing: return m.handlePacking, processed, nil case GetTicket: @@ -418,60 +424,10 @@ func (m *Sealing) restartSectors(ctx context.Context) error { log.Errorf("loading sector list: %+v", err) } - cfg, err := m.getConfig() - if err != nil { - return xerrors.Errorf("getting the sealing delay: %w", err) - } - - spt, err := m.currentSealProof(ctx) - if err != nil { - return xerrors.Errorf("getting current seal proof: %w", err) - } - ssize, err := spt.SectorSize() - if err != nil { - return err - } - - // m.unsealedInfoMap.lk.Lock() taken early in .New to prevent races - defer m.unsealedInfoMap.lk.Unlock() - for _, sector := range trackedSectors { if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil { log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err) } - - if sector.State == WaitDeals { - - // put the sector in the unsealedInfoMap - if _, ok := m.unsealedInfoMap.infos[sector.SectorNumber]; ok { - // something's funky here, but probably safe to move on - log.Warnf("sector %v was already in the unsealedInfoMap when restarting", sector.SectorNumber) - } else { - ui := UnsealedSectorInfo{ - ssize: ssize, - } - for _, p := range sector.Pieces { - if p.DealInfo != nil { - ui.numDeals++ - } - ui.stored += p.Piece.Size - ui.pieceSizes = append(ui.pieceSizes, p.Piece.Size.Unpadded()) - } - - m.unsealedInfoMap.infos[sector.SectorNumber] = ui - } - - // start a fresh timer for the sector - if cfg.WaitDealsDelay > 0 { - timer := time.NewTimer(cfg.WaitDealsDelay) - go func() { - <-timer.C - if err := m.StartPacking(sector.SectorNumber); err != nil { - log.Errorf("starting sector %d: %+v", sector.SectorNumber, err) - } - }() - } - } } // TODO: Grab on-chain sector set and diff with trackedSectors @@ -494,56 +450,72 @@ func final(events []statemachine.Event, state *SectorInfo) (uint64, error) { return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events) } -func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) { - return func() (mutator, func(*SectorInfo) error) { - return mut, func(state *SectorInfo) error { +func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) (bool, error)) { + return func() (mutator, func(*SectorInfo) (bool, error)) { + return mut, func(state *SectorInfo) (bool, error) { state.State = next - return nil + return false, nil + } + } +} + +// like `on`, but doesn't change state +func apply(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) { + return func() (mutator, func(*SectorInfo) (bool, error)) { + return mut, func(state *SectorInfo) (bool, error) { + return true, nil } } } -func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) { - return func() (mutator, func(*SectorInfo) error) { - return mut, func(state *SectorInfo) error { +func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) { + return func() (mutator, func(*SectorInfo) (bool, error)) { + return mut, func(state *SectorInfo) (bool, error) { if state.Return == "" { - return xerrors.Errorf("return state not set") + return false, xerrors.Errorf("return state not set") } state.State = SectorState(state.Return) state.Return = "" - return nil + return false, nil } } } -func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) { +func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) { return func(events []statemachine.Event, state *SectorInfo) (uint64, error) { - if gm, ok := events[0].User.(globalMutator); ok { - gm.applyGlobal(state) - return 1, nil - } + for i, event := range events { + if gm, ok := event.User.(globalMutator); ok { + gm.applyGlobal(state) + return uint64(i + 1), nil + } - for _, t := range ts { - mut, next := t() + for _, t := range ts { + mut, next := t() - if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) { - continue - } + if reflect.TypeOf(event.User) != reflect.TypeOf(mut) { + continue + } + + if err, iserr := event.User.(error); iserr { + log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, event.User, err) + } - if err, iserr := events[0].User.(error); iserr { - log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, events[0].User, err) + event.User.(mutator).apply(state) + more, err := next(state) + if err != nil || !more { + return uint64(i + 1), err + } } - events[0].User.(mutator).apply(state) - return 1, next(state) - } + _, ok := event.User.(Ignorable) + if ok { + continue + } - _, ok := events[0].User.(Ignorable) - if ok { - return 1, nil + return uint64(i + 1), xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, event.User, event) } - return 0, xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0]) + return uint64(len(events)), nil } } diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 4c3958d66d2..160278d8782 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -1,12 +1,15 @@ package sealing import ( - "github.com/EpiK-Protocol/go-epik/chain/actors/builtin/miner" + "time" + "github.com/ipfs/go-cid" "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-storage/storage" + + "github.com/EpiK-Protocol/go-epik/chain/actors/builtin/miner" ) type mutator interface { @@ -75,14 +78,27 @@ func (evt SectorStartCC) apply(state *SectorInfo) { state.SectorType = evt.SectorType } -type SectorAddPiece struct { - NewPiece Piece -} +type SectorAddPiece struct{} func (evt SectorAddPiece) apply(state *SectorInfo) { - state.Pieces = append(state.Pieces, evt.NewPiece) + if state.CreationTime == 0 { + state.CreationTime = time.Now().Unix() + } } +type SectorPieceAdded struct { + NewPieces []Piece +} + +func (evt SectorPieceAdded) apply(state *SectorInfo) { + state.Pieces = append(state.Pieces, evt.NewPieces...) +} + +type SectorAddPieceFailed struct{ error } + +func (evt SectorAddPieceFailed) FormatError(xerrors.Printer) (next error) { return evt.error } +func (evt SectorAddPieceFailed) apply(si *SectorInfo) {} + type SectorStartPacking struct{} func (evt SectorStartPacking) apply(*SectorInfo) {} diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go new file mode 100644 index 00000000000..fe28f69858d --- /dev/null +++ b/extern/storage-sealing/input.go @@ -0,0 +1,424 @@ +package sealing + +import ( + "context" + "sort" + "time" + + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-statemachine" + "github.com/filecoin-project/specs-storage/storage" + + sectorstorage "github.com/EpiK-Protocol/go-epik/extern/sector-storage" + "github.com/EpiK-Protocol/go-epik/extern/sector-storage/ffiwrapper" +) + +func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error { + var used abi.UnpaddedPieceSize + for _, piece := range sector.Pieces { + used += piece.Piece.Size.Unpadded() + } + + m.inputLk.Lock() + + started, err := m.maybeStartSealing(ctx, sector, used) + if err != nil || started { + delete(m.openSectors, m.minerSectorID(sector.SectorNumber)) + + m.inputLk.Unlock() + + return err + } + + m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{ + used: used, + maybeAccept: func(cid cid.Cid) error { + // todo check deal start deadline (configurable) + + sid := m.minerSectorID(sector.SectorNumber) + m.assignedPieces[sid] = append(m.assignedPieces[sid], cid) + + return ctx.Send(SectorAddPiece{}) + }, + } + + go func() { + defer m.inputLk.Unlock() + if err := m.updateInput(ctx.Context(), sector.SectorType); err != nil { + log.Errorf("%+v", err) + } + }() + + return nil +} + +func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, used abi.UnpaddedPieceSize) (bool, error) { + now := time.Now() + st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)] + if st != nil { + if !st.Stop() { // timer expired, SectorStartPacking was/is being sent + // we send another SectorStartPacking in case one was sent in the handleAddPiece state + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout") + return true, ctx.Send(SectorStartPacking{}) + } + } + + ssize, err := sector.SectorType.SectorSize() + if err != nil { + return false, xerrors.Errorf("getting sector size") + } + + maxDeals, err := getDealPerSectorLimit(ssize) + if err != nil { + return false, xerrors.Errorf("getting per-sector deal limit: %w", err) + } + + if len(sector.dealIDs()) >= maxDeals { + // can't accept more deals + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "maxdeals") + return true, ctx.Send(SectorStartPacking{}) + } + + if used.Padded() == abi.PaddedPieceSize(ssize) { + // sector full + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "filled") + return true, ctx.Send(SectorStartPacking{}) + } + + if sector.CreationTime != 0 { + cfg, err := m.getConfig() + if err != nil { + m.inputLk.Unlock() + return false, xerrors.Errorf("getting storage config: %w", err) + } + + // todo check deal age, start sealing if any deal has less than X (configurable) to start deadline + sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay) + + if now.After(sealTime) { + m.inputLk.Unlock() + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout") + return true, ctx.Send(SectorStartPacking{}) + } + + m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() { + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timer") + + if err := ctx.Send(SectorStartPacking{}); err != nil { + log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err) + } + }) + } + + return false, nil +} + +func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error { + ssize, err := sector.SectorType.SectorSize() + if err != nil { + return err + } + + res := SectorPieceAdded{} + + m.inputLk.Lock() + + pending, ok := m.assignedPieces[m.minerSectorID(sector.SectorNumber)] + if ok { + delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber)) + } + m.inputLk.Unlock() + if !ok { + // nothing to do here (might happen after a restart in AddPiece) + return ctx.Send(res) + } + + var offset abi.UnpaddedPieceSize + pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces)) + for i, p := range sector.Pieces { + pieceSizes[i] = p.Piece.Size.Unpadded() + offset += p.Piece.Size.Unpadded() + } + + maxDeals, err := getDealPerSectorLimit(ssize) + if err != nil { + return xerrors.Errorf("getting per-sector deal limit: %w", err) + } + + for i, piece := range pending { + m.inputLk.Lock() + deal, ok := m.pendingPieces[piece] + m.inputLk.Unlock() + if !ok { + return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber) + } + + if len(sector.dealIDs())+(i+1) > maxDeals { + // todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it + deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("too many deals assigned to sector %d, dropping deal", sector.SectorNumber)) + continue + } + + pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded()) + + if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) { + // todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it + deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("piece %s assigned to sector %d with not enough space", piece, sector.SectorNumber)) + continue + } + + offset += padLength.Unpadded() + + for _, p := range pads { + ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority), + m.minerSector(sector.SectorType, sector.SectorNumber), + pieceSizes, + p.Unpadded(), + NewNullReader(p.Unpadded())) + if err != nil { + err = xerrors.Errorf("writing padding piece: %w", err) + deal.accepted(sector.SectorNumber, offset, err) + return ctx.Send(SectorAddPieceFailed{err}) + } + + pieceSizes = append(pieceSizes, p.Unpadded()) + res.NewPieces = append(res.NewPieces, Piece{ + Piece: ppi, + }) + } + + ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority), + m.minerSector(sector.SectorType, sector.SectorNumber), + pieceSizes, + deal.size, + deal.data) + if err != nil { + err = xerrors.Errorf("writing piece: %w", err) + deal.accepted(sector.SectorNumber, offset, err) + return ctx.Send(SectorAddPieceFailed{err}) + } + + log.Infow("deal added to a sector", "deal", deal.deal.DealID, "sector", sector.SectorNumber, "piece", ppi.PieceCID) + + deal.accepted(sector.SectorNumber, offset, nil) + + offset += deal.size + pieceSizes = append(pieceSizes, deal.size) + + res.NewPieces = append(res.NewPieces, Piece{ + Piece: ppi, + DealInfo: &deal.deal, + }) + } + + return ctx.Send(res) +} + +func (m *Sealing) handleAddPieceFailed(ctx statemachine.Context, sector SectorInfo) error { + log.Errorf("No recovery plan for AddPiece failing") + // todo: cleanup sector / just go retry (requires adding offset param to AddPiece in sector-storage for this to be safe) + return nil +} + +func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { + log.Infof("Adding piece for deal %d (publish msg: %s)", deal.DealID, deal.PublishCid) + if (padreader.PaddedSize(uint64(size))) != size { + return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") + } + + sp, err := m.currentSealProof(ctx) + if err != nil { + return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err) + } + + ssize, err := sp.SectorSize() + if err != nil { + return 0, 0, err + } + + if size > abi.PaddedPieceSize(ssize).Unpadded() { + return 0, 0, xerrors.Errorf("piece cannot fit into a sector") + } + + if _, err := deal.DealProposal.Cid(); err != nil { + return 0, 0, xerrors.Errorf("getting proposal CID: %w", err) + } + + m.inputLk.Lock() + if _, exist := m.pendingPieces[proposalCID(deal)]; exist { + m.inputLk.Unlock() + return 0, 0, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal)) + } + + resCh := make(chan struct { + sn abi.SectorNumber + offset abi.UnpaddedPieceSize + err error + }, 1) + + m.pendingPieces[proposalCID(deal)] = &pendingPiece{ + size: size, + deal: deal, + data: data, + assigned: false, + accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) { + resCh <- struct { + sn abi.SectorNumber + offset abi.UnpaddedPieceSize + err error + }{sn: sn, offset: offset, err: err} + }, + } + + go func() { + defer m.inputLk.Unlock() + if err := m.updateInput(ctx, sp); err != nil { + log.Errorf("%+v", err) + } + }() + + res := <-resCh + + return res.sn, res.offset.Padded(), res.err +} + +// called with m.inputLk +func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error { + ssize, err := sp.SectorSize() + if err != nil { + return err + } + + type match struct { + sector abi.SectorID + deal cid.Cid + + size abi.UnpaddedPieceSize + padding abi.UnpaddedPieceSize + } + + var matches []match + toAssign := map[cid.Cid]struct{}{} // used to maybe create new sectors + + // todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners + // (unlikely to be a problem now) + for proposalCid, piece := range m.pendingPieces { + if piece.assigned { + continue // already assigned to a sector, skip + } + + toAssign[proposalCid] = struct{}{} + + for id, sector := range m.openSectors { + avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used + + if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding) + matches = append(matches, match{ + sector: id, + deal: proposalCid, + + size: piece.size, + padding: avail % piece.size, + }) + } + } + } + sort.Slice(matches, func(i, j int) bool { + if matches[i].padding != matches[j].padding { // less padding is better + return matches[i].padding < matches[j].padding + } + + if matches[i].size != matches[j].size { // larger pieces are better + return matches[i].size < matches[j].size + } + + return matches[i].sector.Number < matches[j].sector.Number // prefer older sectors + }) + + var assigned int + for _, mt := range matches { + if m.pendingPieces[mt.deal].assigned { + assigned++ + continue + } + + if _, found := m.openSectors[mt.sector]; !found { + continue + } + + err := m.openSectors[mt.sector].maybeAccept(mt.deal) + if err != nil { + m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece + } + + m.pendingPieces[mt.deal].assigned = true + delete(toAssign, mt.deal) + + if err != nil { + log.Errorf("sector %d rejected deal %s: %+v", mt.sector, mt.deal, err) + continue + } + + delete(m.openSectors, mt.sector) + } + + if len(toAssign) > 0 { + if err := m.tryCreateDealSector(ctx, sp); err != nil { + log.Errorw("Failed to create a new sector for deals", "error", err) + } + } + + return nil +} + +func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error { + cfg, err := m.getConfig() + if err != nil { + return xerrors.Errorf("getting storage config: %w", err) + } + + if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals { + return nil + } + + if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors { + return nil + } + + // Now actually create a new sector + + sid, err := m.sc.Next() + if err != nil { + return xerrors.Errorf("getting sector number: %w", err) + } + + err = m.sealer.NewSector(ctx, m.minerSector(sp, sid)) + if err != nil { + return xerrors.Errorf("initializing sector: %w", err) + } + + log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) + return m.sectors.Send(uint64(sid), SectorStart{ + ID: sid, + SectorType: sp, + }) +} + +func (m *Sealing) StartPacking(sid abi.SectorNumber) error { + return m.sectors.Send(uint64(sid), SectorStartPacking{}) +} + +func proposalCID(deal DealInfo) cid.Cid { + pc, err := deal.DealProposal.Cid() + if err != nil { + log.Errorf("DealProposal.Cid error: %+v", err) + return cid.Undef + } + + return pc +} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index bca5d5a6455..4ea9a4f5402 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -3,8 +3,6 @@ package sealing import ( "context" "errors" - "io" - "math" "sync" "time" @@ -15,7 +13,6 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - padreader "github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/dline" @@ -89,8 +86,11 @@ type Sealing struct { sc SectorIDCounter verif ffiwrapper.Verifier - /* pcp PreCommitPolicy */ - unsealedInfoMap UnsealedSectorMap + inputLk sync.Mutex + openSectors map[abi.SectorID]*openSector + sectorTimers map[abi.SectorID]*time.Timer + pendingPieces map[cid.Cid]*pendingPiece + assignedPieces map[abi.SectorID][]cid.Cid /* upgradeLk sync.Mutex toUpgrade map[abi.SectorNumber]struct{} */ @@ -112,17 +112,20 @@ type FeeConfig struct { MaxTerminateGasFee abi.TokenAmount } -type UnsealedSectorMap struct { - infos map[abi.SectorNumber]UnsealedSectorInfo - lk sync.Mutex +type openSector struct { + used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors + + maybeAccept func(cid.Cid) error // called with inputLk } -type UnsealedSectorInfo struct { - numDeals uint64 - // stored should always equal sum of pieceSizes.Padded() - stored abi.PaddedPieceSize - pieceSizes []abi.UnpaddedPieceSize - ssize abi.SectorSize +type pendingPiece struct { + size abi.UnpaddedPieceSize + deal DealInfo + + data storage.Data + + assigned bool // assigned to a sector? + accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error) } func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier /* , pcp PreCommitPolicy */, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing { @@ -135,13 +138,12 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds sealer: sealer, sc: sc, verif: verif, - /* pcp: pcp, */ - unsealedInfoMap: UnsealedSectorMap{ - infos: make(map[abi.SectorNumber]UnsealedSectorInfo), - lk: sync.Mutex{}, - }, - /* toUpgrade: map[abi.SectorNumber]struct{}{}, */ + openSectors: map[abi.SectorID]*openSector{}, + sectorTimers: map[abi.SectorID]*time.Timer{}, + pendingPieces: map[cid.Cid]*pendingPiece{}, + assignedPieces: map[abi.SectorID][]cid.Cid{}, + // toUpgrade: map[abi.SectorNumber]struct{}{}, notifee: notifee, addrSel: as, @@ -158,8 +160,6 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) - s.unsealedInfoMap.lk.Lock() // released after initialized in .Run() - return s } @@ -183,104 +183,6 @@ func (m *Sealing) Stop(ctx context.Context) error { return nil } -func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { - log.Infof("Adding piece for deal %d (publish msg: %s)", d.DealID, d.PublishCid) - if (padreader.PaddedSize(uint64(size))) != size { - return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") - } - - sp, err := m.currentSealProof(ctx) - if err != nil { - return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err) - } - - ssize, err := sp.SectorSize() - if err != nil { - return 0, 0, err - } - - if size > abi.PaddedPieceSize(ssize).Unpadded() { - return 0, 0, xerrors.Errorf("piece cannot fit into a sector") - } - - m.unsealedInfoMap.lk.Lock() - - sid, pads, err := m.getSectorAndPadding(ctx, size) - if err != nil { - m.unsealedInfoMap.lk.Unlock() - return 0, 0, xerrors.Errorf("getting available sector: %w", err) - } - - for _, p := range pads { - err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil) - if err != nil { - m.unsealedInfoMap.lk.Unlock() - return 0, 0, xerrors.Errorf("writing pads: %w", err) - } - } - - offset := m.unsealedInfoMap.infos[sid].stored - err = m.addPiece(ctx, sid, size, r, &d) - - if err != nil { - m.unsealedInfoMap.lk.Unlock() - return 0, 0, xerrors.Errorf("adding piece to sector: %w", err) - } - - startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(ssize) - - m.unsealedInfoMap.lk.Unlock() - - if startPacking { - if err := m.StartPacking(sid); err != nil { - return 0, 0, xerrors.Errorf("start packing: %w", err) - } - } - - return sid, offset, nil -} - -// Caller should hold m.unsealedInfoMap.lk -func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error { - log.Infof("Adding piece to sector %d", sectorID) - sp, err := m.currentSealProof(ctx) - if err != nil { - return xerrors.Errorf("getting current seal proof type: %w", err) - } - ssize, err := sp.SectorSize() - if err != nil { - return err - } - - ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sp, sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r) - if err != nil { - return xerrors.Errorf("writing piece: %w", err) - } - piece := Piece{ - Piece: ppi, - DealInfo: di, - } - - err = m.sectors.Send(uint64(sectorID), SectorAddPiece{NewPiece: piece}) - if err != nil { - return err - } - - ui := m.unsealedInfoMap.infos[sectorID] - num := m.unsealedInfoMap.infos[sectorID].numDeals - if di != nil { - num = num + 1 - } - m.unsealedInfoMap.infos[sectorID] = UnsealedSectorInfo{ - numDeals: num, - stored: ui.stored + piece.Piece.Size, - pieceSizes: append(ui.pieceSizes, piece.Piece.Size.Unpadded()), - ssize: ssize, - } - - return nil -} - func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { return m.sectors.Send(uint64(sid), SectorRemove{}) } @@ -297,168 +199,6 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) return m.terminator.Pending(ctx) } -// Caller should NOT hold m.unsealedInfoMap.lk -func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { - // locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else - m.unsealedInfoMap.lk.Lock() - defer m.unsealedInfoMap.lk.Unlock() - - // cannot send SectorStartPacking to sectors that have already been packed, otherwise it will cause the state machine to exit - if _, ok := m.unsealedInfoMap.infos[sectorID]; !ok { - log.Warnf("call start packing, but sector %v not in unsealedInfoMap.infos, maybe have called", sectorID) - return nil - } - log.Infof("Starting packing sector %d", sectorID) - err := m.sectors.Send(uint64(sectorID), SectorStartPacking{}) - if err != nil { - return err - } - log.Infof("send Starting packing event success sector %d", sectorID) - - delete(m.unsealedInfoMap.infos, sectorID) - - return nil -} - -// Caller should hold m.unsealedInfoMap.lk -func (m *Sealing) getSectorAndPadding(ctx context.Context, size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) { - for tries := 0; tries < 100; tries++ { - for k, v := range m.unsealedInfoMap.infos { - pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded()) - - if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) { - return k, pads, nil - } - } - - if len(m.unsealedInfoMap.infos) > 0 { - log.Infow("tried to put a piece into an open sector, found none with enough space", "open", len(m.unsealedInfoMap.infos), "size", size, "tries", tries) - } - - ns, ssize, err := m.newDealSector(ctx) - switch err { - case nil: - m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{ - numDeals: 0, - stored: 0, - pieceSizes: nil, - ssize: ssize, - } - case errTooManySealing: - m.unsealedInfoMap.lk.Unlock() - - select { - case <-time.After(2 * time.Second): - case <-ctx.Done(): - m.unsealedInfoMap.lk.Lock() - return 0, nil, xerrors.Errorf("getting sector for piece: %w", ctx.Err()) - } - - m.unsealedInfoMap.lk.Lock() - continue - default: - return 0, nil, xerrors.Errorf("creating new sector: %w", err) - } - - return ns, nil, nil - } - - return 0, nil, xerrors.Errorf("failed to allocate piece to a sector") -} - -var errTooManySealing = errors.New("too many sectors sealing") - -// newDealSector creates a new sector for deal storage -func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.SectorSize, error) { - // First make sure we don't have too many 'open' sectors - - cfg, err := m.getConfig() - if err != nil { - return 0, 0, xerrors.Errorf("getting config: %w", err) - } - - if cfg.MaxSealingSectorsForDeals > 0 { - if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { - return 0, 0, ErrTooManySectorsSealing - } - } - - if cfg.MaxWaitDealsSectors > 0 && uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors { - // Too many sectors are sealing in parallel. Start sealing one, and retry - // allocating the piece to a sector (we're dropping the lock here, so in - // case other goroutines are also trying to create a sector, we retry in - // getSectorAndPadding instead of here - otherwise if we have lots of - // parallel deals in progress, we can start creating a ton of sectors - // with just a single deal in them) - var mostStored abi.PaddedPieceSize = math.MaxUint64 - var best abi.SectorNumber = math.MaxUint64 - - for sn, info := range m.unsealedInfoMap.infos { - if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0 - best = sn - } - } - - if best != math.MaxUint64 { - m.unsealedInfoMap.lk.Unlock() - err := m.StartPacking(best) - m.unsealedInfoMap.lk.Lock() - - if err != nil { - log.Errorf("newDealSector StartPacking error: %+v", err) - // let's pretend this is fine - } - } - - return 0, 0, errTooManySealing // will wait a bit and retry - } - - spt, err := m.currentSealProof(ctx) - if err != nil { - return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err) - } - - // Now actually create a new sector - - sid, err := m.sc.Next() - if err != nil { - return 0, 0, xerrors.Errorf("getting sector number: %w", err) - } - - err = m.sealer.NewSector(context.TODO(), m.minerSector(spt, sid)) - if err != nil { - return 0, 0, xerrors.Errorf("initializing sector: %w", err) - } - - log.Infof("Creating sector %d", sid) - err = m.sectors.Send(uint64(sid), SectorStart{ - ID: sid, - SectorType: spt, - }) - - if err != nil { - return 0, 0, xerrors.Errorf("starting the sector fsm: %w", err) - } - - cf, err := m.getConfig() - if err != nil { - return 0, 0, xerrors.Errorf("getting the sealing delay: %w", err) - } - - if cf.WaitDealsDelay > 0 { - timer := time.NewTimer(cf.WaitDealsDelay) - go func() { - <-timer.C - if err := m.StartPacking(sid); err != nil { - log.Errorf("starting sector %d: %+v", sid, err) - } - }() - } - - ssize, err := spt.SectorSize() - return sid, ssize, err -} - // newSectorCC accepts a slice of pieces with no deal (junk data) func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error { spt, err := m.currentSealProof(ctx) @@ -511,9 +251,9 @@ func (m *Sealing) Address() address.Address { return m.maddr } -func getDealPerSectorLimit(size abi.SectorSize) uint64 { +func getDealPerSectorLimit(size abi.SectorSize) (int, error) { if size < 64<<30 { - return 256 + return 256, nil } - return 512 + return 512, nil } diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index 49a60795895..b636614d1e8 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -6,6 +6,8 @@ var ExistSectorStateList = map[SectorState]struct{}{ Empty: {}, WaitDeals: {}, Packing: {}, + AddPiece: {}, + AddPieceFailed: {}, GetTicket: {}, PreCommit1: {}, PreCommit2: {}, @@ -43,8 +45,9 @@ const ( UndefinedSectorState SectorState = "" // happy path - Empty SectorState = "Empty" + Empty SectorState = "Empty" // deprecated WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector + AddPiece SectorState = "AddPiece" // put deal data (and padding if required) into the sector Packing SectorState = "Packing" // sector not in sealStore, and not on chain GetTicket SectorState = "GetTicket" // generate ticket PreCommit1 SectorState = "PreCommit1" // do PreCommit1 @@ -59,6 +62,7 @@ const ( Proving SectorState = "Proving" // error modes FailedUnrecoverable SectorState = "FailedUnrecoverable" + AddPieceFailed SectorState = "AddPieceFailed" SealPreCommit1Failed SectorState = "SealPreCommit1Failed" SealPreCommit2Failed SectorState = "SealPreCommit2Failed" PreCommitFailed SectorState = "PreCommitFailed" @@ -85,7 +89,9 @@ const ( func toStatState(st SectorState) statSectorState { switch st { - case Empty, WaitDeals, Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector: + case UndefinedSectorState, Empty, WaitDeals, AddPiece: + return sstStaging + case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector: return sstSealing case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed: return sstProving diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 576d22f0de9..c45abb4c3eb 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -24,6 +24,24 @@ var DealSectorPriority = 1024 var MaxTicketAge = abi.ChainEpoch(builtin.EpochsInDay * 2) func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error { + m.inputLk.Lock() + // make sure we not accepting deals into this sector + for _, c := range m.assignedPieces[m.minerSectorID(sector.SectorNumber)] { + pp := m.pendingPieces[c] + delete(m.pendingPieces, c) + if pp == nil { + log.Errorf("nil assigned pending piece %s", c) + continue + } + + // todo: return to the sealing queue (this is extremely unlikely to happen) + pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector entered packing state early")) + } + + delete(m.openSectors, m.minerSectorID(sector.SectorNumber)) + delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber)) + m.inputLk.Unlock() + log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber) var allocated abi.UnpaddedPieceSize diff --git a/extern/storage-sealing/stats.go b/extern/storage-sealing/stats.go index 78630c216be..10852937572 100644 --- a/extern/storage-sealing/stats.go +++ b/extern/storage-sealing/stats.go @@ -9,7 +9,8 @@ import ( type statSectorState int const ( - sstSealing statSectorState = iota + sstStaging statSectorState = iota + sstSealing sstFailed sstProving nsst @@ -41,5 +42,13 @@ func (ss *SectorStats) curSealing() uint64 { ss.lk.Lock() defer ss.lk.Unlock() - return ss.totals[sstSealing] + ss.totals[sstFailed] + return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed] +} + +// return the number of sectors waiting to enter the sealing pipeline +func (ss *SectorStats) curStaging() uint64 { + ss.lk.Lock() + defer ss.lk.Unlock() + + return ss.totals[sstStaging] } diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index bde4e1c8172..72faf851262 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -71,7 +71,8 @@ type SectorInfo struct { SectorType abi.RegisteredSealProof // Packing - Pieces []Piece + CreationTime int64 // unix seconds + Pieces []Piece // PreCommit1 TicketValue abi.SealRandomness diff --git a/node/config/def.go b/node/config/def.go index af08d8cf7b8..2ff139108e4 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -74,6 +74,11 @@ type SealingConfig struct { WaitDealsDelay Duration AlwaysKeepUnsealedCopy bool + + // Keep this many sectors in sealing pipeline, start CC if needed + // todo TargetSealingSectors uint64 + + // todo TargetSectors - stop auto-pleding new sectors after this many sectors are sealed, default CC upgrade for deals sectors if above } type MinerFeeConfig struct {