From be49281e53ded47f7a8a8bcc9d74c7240e4b6c95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 20 Jan 2021 18:18:12 +0100 Subject: [PATCH] storagefsm: Add rest of checks in WaitDeals --- extern/storage-sealing/input.go | 40 +++++++++++++++++++------------ extern/storage-sealing/sealing.go | 7 +----- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 5fa2043b1fa..89cd27176f7 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -19,9 +19,6 @@ import ( ) func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error { - // if full / oldish / has oldish deals goto seal - // ^ also per sector deal limit - m.inputLk.Lock() now := time.Now() @@ -35,12 +32,28 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e } } - maxDeals, err := getDealPerSectorLimit(sector.SectorType) + ssize, err := sector.SectorType.SectorSize() + if err != nil { + return xerrors.Errorf("getting sector size") + } + + maxDeals, err := getDealPerSectorLimit(ssize) if err != nil { return xerrors.Errorf("getting per-sector deal limit: %w", err) } if len(sector.dealIDs()) >= maxDeals { + // can't accept more deals + return ctx.Send(SectorStartPacking{}) + } + + var used abi.UnpaddedPieceSize + for _, piece := range sector.Pieces { + used += piece.Piece.Size.Unpadded() + } + + if used.Padded() == abi.PaddedPieceSize(ssize) { + // sector full return ctx.Send(SectorStartPacking{}) } @@ -51,7 +64,9 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e return xerrors.Errorf("getting storage config: %w", err) } + // todo check deal age, start sealing if any deal has less than X (configurable) to start deadline sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay) + if now.After(sealTime) { m.inputLk.Unlock() return ctx.Send(SectorStartPacking{}) @@ -64,17 +79,10 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e }) } - var used abi.UnpaddedPieceSize - for _, piece := range sector.Pieces { - used += piece.Piece.Size.Unpadded() - } - m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{ used: used, maybeAccept: func(cid cid.Cid) error { - // todo double check space - - // todo check deal expiration + // todo check deal start deadline (configurable) sid := m.minerSectorID(sector.SectorNumber) m.assignedPieces[sid] = append(m.assignedPieces[sid], cid) @@ -120,7 +128,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er offset += p.Piece.Size.Unpadded() } - maxDeals, err := getDealPerSectorLimit(sector.SectorType) + maxDeals, err := getDealPerSectorLimit(ssize) if err != nil { return xerrors.Errorf("getting per-sector deal limit: %w", err) } @@ -134,7 +142,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er } if len(sector.dealIDs())+(i+1) > maxDeals { - // shouldn't happen, but just in case + // todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("too many deals assigned to sector %d, dropping deal", sector.SectorNumber)) continue } @@ -142,7 +150,9 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded()) if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) { - return xerrors.Errorf("piece assigned to a sector with not enough space") + // todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it + deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("piece %s assigned to sector %d with not enough space", piece, sector.SectorNumber)) + continue } offset += padLength.Unpadded() diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index aa1a73e6c87..6b8bb04d7a6 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -243,12 +243,7 @@ func (m *Sealing) Address() address.Address { return m.maddr } -func getDealPerSectorLimit(spt abi.RegisteredSealProof) (int, error) { - size, err := spt.SectorSize() - if err != nil { - return 0, err - } - +func getDealPerSectorLimit(size abi.SectorSize) (int, error) { if size < 64<<30 { return 256, nil }