refactor: use common commp throttle for direct and regular deals (#1695)
* feat: watch sealing state until finalized

* fix: don't check deal id when watching sealing state for direct deals

* fix: checking sealing sector state

* refactor: use common commp throttle for direct and regular deals
dirkmc authored Sep 12, 2023
1 parent b4abb59 commit af3ad60
Showing 7 changed files with 66 additions and 39 deletions.
1 change: 1 addition & 0 deletions node/builder.go
@@ -519,6 +519,7 @@ func ConfigBoost(cfg *config.Boost) Option {
Override(new(*storagemarket.ChainDealManager), modules.NewChainDealManager),
Override(new(smtypes.CommpCalculator), From(new(lotus_modules.MinerStorageService))),

Override(new(storagemarket.CommpThrottle), modules.NewCommpThrottle(cfg)),
Override(new(*storagemarket.DirectDealsProvider), modules.NewDirectDealsProvider(walletMiner, cfg)),
Override(new(*storagemarket.Provider), modules.NewStorageMarketProvider(walletMiner, cfg)),
Override(new(*mpoolmonitor.MpoolMonitor), modules.NewMpoolMonitor(cfg)),
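For context, here is a minimal sketch of the wiring pattern above, using plain go.uber.org/fx rather than Boost's Override helper and with illustrative constructor names: providing the throttle once means every consumer is injected with the same underlying channel, which is what makes the limit common to direct and regular deals.

```go
package main

import (
	"fmt"

	"go.uber.org/fx"
)

// CommpThrottle mirrors the type added in this commit: a buffered channel
// used as a counting semaphore for local commp work.
type CommpThrottle chan struct{}

func main() {
	var a, b CommpThrottle
	app := fx.New(
		// Provided once, so every consumer receives the same channel value.
		fx.Provide(func() CommpThrottle { return make(chan struct{}, 1) }),
		fx.Invoke(func(t CommpThrottle) { a = t }), // stand-in for the direct deals provider
		fx.Invoke(func(t CommpThrottle) { b = t }), // stand-in for the regular storage provider
	)
	if err := app.Err(); err != nil {
		panic(err)
	}
	fmt.Println("same throttle shared by both consumers:", a == b) // true
}
```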
6 changes: 3 additions & 3 deletions node/modules/directdeals.go
@@ -24,10 +24,10 @@ import (
"go.uber.org/fx"
)

func NewDirectDealsProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, fullnodeApi v1api.FullNode, sqldb *sql.DB, directDealsDB *db.DirectDataDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, commpc types.CommpCalculator, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, lp gfm_storagemarket.StorageProvider, cdm *storagemarket.ChainDealManager) (*storagemarket.DirectDealsProvider, error) {
func NewDirectDealsProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, fullnodeApi v1api.FullNode, sqldb *sql.DB, directDealsDB *db.DirectDataDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, lp gfm_storagemarket.StorageProvider, cdm *storagemarket.ChainDealManager) (*storagemarket.DirectDealsProvider, error) {
return func(lc fx.Lifecycle, h host.Host, fullnodeApi v1api.FullNode, sqldb *sql.DB, directDealsDB *db.DirectDataDB,
fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks,
commpc types.CommpCalculator, sps sealingpipeline.API,
commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API,
df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB,
piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper,
lp gfm_storagemarket.StorageProvider, cdm *storagemarket.ChainDealManager) (*storagemarket.DirectDealsProvider, error) {
@@ -60,7 +60,7 @@ func NewDirectDealsProvider(provAddr address.Address, cfg *config.Boost) func(lc
RemoteCommp: cfg.Dealmaking.RemoteCommp,
}

prov := storagemarket.NewDirectDealsProvider(ddpCfg, fullnodeApi, secb, commpc, sps, directDealsDB, dl)
prov := storagemarket.NewDirectDealsProvider(ddpCfg, fullnodeApi, secb, commpc, commpt, sps, directDealsDB, dl)
return prov, nil
}
}
21 changes: 15 additions & 6 deletions node/modules/storageminer.go
@@ -596,18 +596,27 @@ func NewLegacyStorageProvider(cfg *config.Boost) func(minerAddress lotus_dtypes.
}
}

func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, commpc types.CommpCalculator, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, lp gfm_storagemarket.StorageProvider, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) {
func NewCommpThrottle(cfg *config.Boost) func() storagemarket.CommpThrottle {
return func() storagemarket.CommpThrottle {
size := uint64(1)
if cfg.Dealmaking.MaxConcurrentLocalCommp > 1 {
size = cfg.Dealmaking.MaxConcurrentLocalCommp
}
return make(chan struct{}, size)
}
}

func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, lp gfm_storagemarket.StorageProvider, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) {
return func(lc fx.Lifecycle, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB,
fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks,
commpc types.CommpCalculator, sps sealingpipeline.API,
commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API,
df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB,
piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper,
lp gfm_storagemarket.StorageProvider, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) {

prvCfg := storagemarket.Config{
MaxTransferDuration: time.Duration(cfg.Dealmaking.MaxTransferDuration),
RemoteCommp: cfg.Dealmaking.RemoteCommp,
MaxConcurrentLocalCommp: cfg.Dealmaking.MaxConcurrentLocalCommp,
MaxTransferDuration: time.Duration(cfg.Dealmaking.MaxTransferDuration),
RemoteCommp: cfg.Dealmaking.RemoteCommp,
TransferLimiter: storagemarket.TransferLimiterConfig{
MaxConcurrent: cfg.Dealmaking.HttpTransferMaxConcurrentDownloads,
StallCheckPeriod: time.Duration(cfg.Dealmaking.HttpTransferStallCheckPeriod),
@@ -619,7 +628,7 @@ func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(
}
dl := logs.NewDealLogger(logsDB)
tspt := httptransport.New(h, dl)
prov, err := storagemarket.NewProvider(prvCfg, sqldb, dealsDB, fundMgr, storageMgr, a, dp, provAddr, secb, commpc,
prov, err := storagemarket.NewProvider(prvCfg, sqldb, dealsDB, fundMgr, storageMgr, a, dp, provAddr, secb, commpc, commpt,
sps, cdm, df, logsSqlDB.db, logsDB, piecedirectory, ip, lp, &signatureVerifier{a}, dl, tspt)
if err != nil {
return nil, err
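NewCommpThrottle builds a buffered channel sized from cfg.Dealmaking.MaxConcurrentLocalCommp with a floor of one, so an unset or zero config value still allows a single local commp at a time. Below is a self-contained sketch of that sizing behaviour; the newThrottle helper and maxConcurrent parameter are illustrative stand-ins for the function above, not Boost code.

```go
package main

import "fmt"

// newThrottle mirrors NewCommpThrottle: the channel capacity is the number of
// commp computations that may run locally at the same time.
func newThrottle(maxConcurrent uint64) chan struct{} {
	size := uint64(1) // floor of 1: a zero/unset config value still permits one commp
	if maxConcurrent > 1 {
		size = maxConcurrent
	}
	return make(chan struct{}, size)
}

func main() {
	t := newThrottle(2)

	t <- struct{}{} // first reservation: succeeds immediately
	t <- struct{}{} // second reservation: succeeds immediately (capacity 2)

	select {
	case t <- struct{}{}:
		fmt.Println("unexpected: third reservation should have blocked")
	default:
		fmt.Println("third reservation would block until a slot is released")
	}

	<-t             // release one slot
	t <- struct{}{} // now a third reservation succeeds
	fmt.Println("reserved again after release")
}
```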
28 changes: 25 additions & 3 deletions storagemarket/deal_commp.go
@@ -59,9 +59,29 @@ func (p *Provider) generatePieceCommitment(filepath string, pieceSize abi.Padded
return pi.PieceCID, nil
}

// Throttle the number of concurrent local commp processes
type CommpThrottle chan struct{}

// reserve waits until a slot is available, or returns a context cancelled error
func (t CommpThrottle) reserve(ctx context.Context) *dealMakingError {
select {
case <-ctx.Done():
return &dealMakingError{
retry: types.DealRetryAuto,
error: fmt.Errorf("local commp cancelled: %w", ctx.Err()),
}
case t <- struct{}{}:
return nil
}
}

func (t CommpThrottle) release() {
<-t
}

// generatePieceCommitment generates commp either locally or remotely,
// depending on config, and pads it as necessary to match the piece size.
func generatePieceCommitment(ctx context.Context, commpCalc smtypes.CommpCalculator, throttle chan struct{}, filepath string, pieceSize abi.PaddedPieceSize, doRemoteCommP bool) (*abi.PieceInfo, *dealMakingError) {
func generatePieceCommitment(ctx context.Context, commpCalc smtypes.CommpCalculator, throttle CommpThrottle, filepath string, pieceSize abi.PaddedPieceSize, doRemoteCommP bool) (*abi.PieceInfo, *dealMakingError) {
// Check whether to send commp to a remote process or do it locally
var pi *abi.PieceInfo
if doRemoteCommP {
@@ -72,8 +92,10 @@ func generatePieceCommitment(ctx context.Context, commpCalc smtypes.CommpCalcula
return nil, err
}
} else {
throttle <- struct{}{}
defer func() { <-throttle }()
if cancelledErr := throttle.reserve(ctx); cancelledErr != nil {
return nil, cancelledErr
}
defer throttle.release()

var err error
pi, err = GenerateCommPLocally(filepath)
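From the caller's side, reserve either takes a slot or gives up as soon as the deal's context is cancelled, and release should be deferred so the slot is returned even when commp fails. The following is a rough, self-contained sketch of that calling pattern; runLocalCommp and generate are hypothetical stand-ins, not Boost functions.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type CommpThrottle chan struct{}

// reserve waits for a slot or gives up when the context is cancelled.
func (t CommpThrottle) reserve(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return fmt.Errorf("local commp cancelled: %w", ctx.Err())
	case t <- struct{}{}:
		return nil
	}
}

func (t CommpThrottle) release() { <-t }

// runLocalCommp is a hypothetical stand-in for GenerateCommPLocally.
func runLocalCommp(filepath string) error {
	time.Sleep(50 * time.Millisecond)
	return nil
}

func generate(ctx context.Context, throttle CommpThrottle, filepath string) error {
	// Wait for a slot, but give up promptly if the deal is cancelled.
	if err := throttle.reserve(ctx); err != nil {
		return err
	}
	defer throttle.release() // the slot is returned even if commp fails

	return runLocalCommp(filepath)
}

func main() {
	throttle := make(CommpThrottle, 1)

	// Normal path: a slot is available, commp runs.
	fmt.Println(generate(context.Background(), throttle, "/tmp/deal.car"))

	// Cancelled path: a cancelled context aborts the wait instead of blocking forever.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	throttle <- struct{}{} // occupy the only slot so reserve must wait
	fmt.Println(generate(ctx, throttle, "/tmp/deal.car"))
}
```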
26 changes: 13 additions & 13 deletions storagemarket/direct_deals_provider.go
@@ -40,10 +40,11 @@ type DirectDealsProvider struct {
config DDPConfig
ctx context.Context // context to be stopped when stopping boostd

fullnodeApi v1api.FullNode
pieceAdder types.PieceAdder
commpCalc smtypes.CommpCalculator
sps sealingpipeline.API
fullnodeApi v1api.FullNode
pieceAdder types.PieceAdder
commpCalc smtypes.CommpCalculator
commpThrottle CommpThrottle
sps sealingpipeline.API

//db *sql.DB
directDealsDB *db.DirectDataDB
@@ -56,13 +57,14 @@ type DirectDealsProvider struct {
running map[uuid.UUID]struct{}
}

func NewDirectDealsProvider(cfg DDPConfig, fullnodeApi v1api.FullNode, pieceAdder types.PieceAdder, commpCalc smtypes.CommpCalculator, sps sealingpipeline.API, directDealsDB *db.DirectDataDB, dealLogger *logs.DealLogger) *DirectDealsProvider {
func NewDirectDealsProvider(cfg DDPConfig, fullnodeApi v1api.FullNode, pieceAdder types.PieceAdder, commpCalc smtypes.CommpCalculator, commpt CommpThrottle, sps sealingpipeline.API, directDealsDB *db.DirectDataDB, dealLogger *logs.DealLogger) *DirectDealsProvider {
return &DirectDealsProvider{
config: cfg,
fullnodeApi: fullnodeApi,
pieceAdder: pieceAdder,
commpCalc: commpCalc,
sps: sps,
config: cfg,
fullnodeApi: fullnodeApi,
pieceAdder: pieceAdder,
commpCalc: commpCalc,
commpThrottle: commpt,
sps: sps,

//db: db,
directDealsDB: directDealsDB,
@@ -285,12 +287,10 @@ func (ddp *DirectDealsProvider) execDeal(ctx context.Context, entry *smtypes.Dir
ddp.dealLogger.Infow(dealUuid, "size of deal", "filepath", entry.InboundFilePath, "size", fstat.Size())
ddp.dealLogger.Infow(dealUuid, "generating commp")

// throttle for local commp
throttle := make(chan struct{}, 1)
// TODO: should we be passing pieceSize here ??!?
pieceSize := abi.UnpaddedPieceSize(fstat.Size())

generatedPieceInfo, dmErr := generatePieceCommitment(ctx, ddp.commpCalc, throttle, entry.InboundFilePath, pieceSize.Padded(), ddp.config.RemoteCommp)
generatedPieceInfo, dmErr := generatePieceCommitment(ctx, ddp.commpCalc, ddp.commpThrottle, entry.InboundFilePath, pieceSize.Padded(), ddp.config.RemoteCommp)
if dmErr != nil {
return &dealMakingError{
retry: types.DealRetryManual,
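Previously execDeal built a fresh make(chan struct{}, 1) for every deal, so each deal throttled only itself; passing the shared ddp.commpThrottle is what makes the limit apply across all deals (and across the regular storage provider too). Here is a rough sketch of the difference, measuring peak concurrency with a per-call channel versus a shared one; the worker body is illustrative only.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runDeals launches one goroutine per deal. getThrottle decides whether every
// deal shares one semaphore or builds its own, and the function reports the
// peak number of "commp" computations that ran at the same time.
func runDeals(deals int, getThrottle func() chan struct{}) int64 {
	var wg sync.WaitGroup
	var running, peak int64
	for i := 0; i < deals; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			throttle := getThrottle()
			throttle <- struct{}{}        // reserve
			defer func() { <-throttle }() // release

			n := atomic.AddInt64(&running, 1)
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			time.Sleep(20 * time.Millisecond) // stand-in for commp work
			atomic.AddInt64(&running, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	// Per-call throttle (old direct-deals behaviour): every deal gets its own
	// channel, so the "limit" of 1 never actually constrains anything.
	perCall := runDeals(8, func() chan struct{} { return make(chan struct{}, 1) })

	// Shared throttle (this commit): one channel for all deals, so at most one
	// local commp runs at a time.
	shared := make(chan struct{}, 1)
	sharedPeak := runDeals(8, func() chan struct{} { return shared })

	fmt.Println("peak concurrency, per-call throttle:", perCall)   // typically 8
	fmt.Println("peak concurrency, shared throttle:", sharedPeak)  // 1
}
```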
17 changes: 5 additions & 12 deletions storagemarket/provider.go
@@ -67,10 +67,8 @@ type Config struct {
// The maximum amount of time a transfer can take before it fails
MaxTransferDuration time.Duration
// Whether to do commp on the Boost node (local) or the sealing node (remote)
RemoteCommp bool
// The number of commp processes that can run in parallel
MaxConcurrentLocalCommp uint64
TransferLimiter TransferLimiterConfig
RemoteCommp bool
TransferLimiter TransferLimiterConfig
// Cleanup deal logs from DB older than this many number of days
DealLogDurationDays int
// Cache timeout for Sealing Pipeline status
@@ -120,7 +118,7 @@ type Provider struct {
transfers *dealTransfers

pieceAdder types.PieceAdder
commpThrottle chan struct{}
commpThrottle CommpThrottle
commpCalc smtypes.CommpCalculator
maxDealCollateralMultiplier uint64
chainDealManager types.ChainDealManager
@@ -139,7 +137,7 @@ type Provider struct {
}

func NewProvider(cfg Config, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager,
fullnodeApi v1api.FullNode, dp types.DealPublisher, addr address.Address, pa types.PieceAdder, commpCalc smtypes.CommpCalculator,
fullnodeApi v1api.FullNode, dp types.DealPublisher, addr address.Address, pa types.PieceAdder, commpCalc smtypes.CommpCalculator, commpThrottle CommpThrottle,
sps sealingpipeline.API, cm types.ChainDealManager, df dtypes.StorageDealFilter, logsSqlDB *sql.DB, logsDB *db.LogsDB,
piecedirectory *piecedirectory.PieceDirectory, ip types.IndexProvider, askGetter types.AskGetter,
sigVerifier types.SignatureVerifier, dl *logs.DealLogger, tspt transport.Transport) (*Provider, error) {
@@ -155,11 +153,6 @@ func NewProvider(cfg Config, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundma
}
ctx, cancel := context.WithCancel(context.Background())

// Make sure that max concurrent local commp is at least 1
if cfg.MaxConcurrentLocalCommp == 0 {
cfg.MaxConcurrentLocalCommp = 1
}

if cfg.SealingPipelineCacheTimeout < 0 {
cfg.SealingPipelineCacheTimeout = 30 * time.Second
}
@@ -191,7 +184,7 @@ func NewProvider(cfg Config, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundma
dealPublisher: dp,
fullnodeApi: fullnodeApi,
pieceAdder: pa,
commpThrottle: make(chan struct{}, cfg.MaxConcurrentLocalCommp),
commpThrottle: commpThrottle,
commpCalc: commpCalc,
chainDealManager: cm,
maxDealCollateralMultiplier: 2,
6 changes: 4 additions & 2 deletions storagemarket/provider_test.go
@@ -1639,7 +1639,8 @@ func NewHarness(t *testing.T, opts ...harnessOpt) *ProviderHarness {
SealingPipelineCacheTimeout: time.Second,
StorageFilter: "1",
}
prov, err := NewProvider(prvCfg, sqldb, dealsDB, fm, sm, fn, minerStub, minerAddr, minerStub, minerStub, sps, minerStub, df, sqldb,
commpThrottle := make(chan struct{}, 1)
prov, err := NewProvider(prvCfg, sqldb, dealsDB, fm, sm, fn, minerStub, minerAddr, minerStub, minerStub, commpThrottle, sps, minerStub, df, sqldb,
logsDB, pm, minerStub, askStore, &mockSignatureVerifier{true, nil}, dl, tspt)
require.NoError(t, err)
ph.Provider = prov
@@ -1706,8 +1707,9 @@ func (h *ProviderHarness) shutdownAndCreateNewProvider(t *testing.T, opts ...har
t.Cleanup(cancel)

// construct a new provider with pre-existing state
commpThrottle := make(chan struct{}, 1)
prov, err := NewProvider(h.Provider.config, h.Provider.db, h.Provider.dealsDB, h.Provider.fundManager,
h.Provider.storageManager, h.Provider.fullnodeApi, h.MinerStub, h.MinerAddr, h.MinerStub, h.MinerStub, h.MockSealingPipelineAPI, h.MinerStub,
h.Provider.storageManager, h.Provider.fullnodeApi, h.MinerStub, h.MinerAddr, h.MinerStub, h.MinerStub, commpThrottle, h.MockSealingPipelineAPI, h.MinerStub,
df, h.Provider.logsSqlDB, h.Provider.logsDB, pm, h.MinerStub, h.Provider.askGetter,
h.Provider.sigVerifier, h.Provider.dealLogger, h.Provider.Transport)

