Skip to content

Commit

Permalink
Merge pull request #7578 from filecoin-project/rvagg/SimultaneousTran…
Browse files Browse the repository at this point in the history
…sfersForStoragePerClient

feat(graphsync): allow setting of per-peer incoming requests for miners
  • Loading branch information
magik6k authored Dec 17, 2021
2 parents 6806bff + 9e7d9af commit a4728d3
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 8 deletions.
11 changes: 11 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,17 @@
# env var: LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERSFORSTORAGE
#SimultaneousTransfersForStorage = 20

# The maximum number of simultaneous data transfers from any single client
# for storage deals.
# Unset by default (0), and values higher than SimultaneousTransfersForStorage
# will have no effect; i.e. the total number of simultaneous data transfers
# across all storage clients is bound by SimultaneousTransfersForStorage
# regardless of this number.
#
# type: uint64
# env var: LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERSFORSTORAGEPERCLIENT
#SimultaneousTransfersForStoragePerClient = 0

# The maximum number of parallel online data transfers for retrieval deals
#
# type: uint64
Expand Down
2 changes: 1 addition & 1 deletion itests/deals_concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestSimultanenousTransferLimit(t *testing.T) {
)
runTest := func(t *testing.T) {
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, graphsyncThrottle))),
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, 0, graphsyncThrottle))),
node.Override(new(dtypes.Graphsync), modules.Graphsync(graphsyncThrottle, graphsyncThrottle)),
))
ens.InterconnectAll().BeginMining(250 * time.Millisecond)
Expand Down
2 changes: 1 addition & 1 deletion node/builder_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func ConfigStorageMiner(c interface{}) Option {
If(cfg.Subsystems.EnableMarkets,
// Markets
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfersForStorage, cfg.Dealmaking.SimultaneousTransfersForRetrieval)),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfersForStorage, cfg.Dealmaking.SimultaneousTransfersForStoragePerClient, cfg.Dealmaking.SimultaneousTransfersForRetrieval)),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),

Expand Down
5 changes: 3 additions & 2 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,9 @@ func DefaultStorageMiner() *StorageMiner {
MaxDealsPerPublishMsg: 8,
MaxProviderCollateralMultiplier: 2,

SimultaneousTransfersForStorage: DefaultSimultaneousTransfers,
SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers,
SimultaneousTransfersForStorage: DefaultSimultaneousTransfers,
SimultaneousTransfersForStoragePerClient: 0,
SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers,

StartEpochSealingBuffer: 480, // 480 epochs buffer == 4 hours from adding deal to sector to sector being sealed

Expand Down
11 changes: 11 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ type DealmakingConfig struct {
MaxStagingDealsBytes int64
// The maximum number of parallel online data transfers for storage deals
SimultaneousTransfersForStorage uint64
// The maximum number of simultaneous data transfers from any single client
// for storage deals.
// Unset by default (0), and values higher than SimultaneousTransfersForStorage
// will have no effect; i.e. the total number of simultaneous data transfers
// across all storage clients is bound by SimultaneousTransfersForStorage
// regardless of this number.
SimultaneousTransfersForStoragePerClient uint64
// The maximum number of parallel online data transfers for retrieval deals
SimultaneousTransfersForRetrieval uint64
// Minimum start epoch buffer to give time for sealing of sector with deal.
Expand Down
8 changes: 4 additions & 4 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
graphsync "github.com/ipfs/go-graphsync/impl"
graphsyncimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
"github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -396,7 +395,7 @@ func StagingBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRe

// StagingGraphsync creates a graphsync instance which reads and writes blocks
// to the StagingBlockstore
func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForStoragePerPeer uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
lsys := storeutil.LinkSystemForBlockstore(ibs)
Expand All @@ -405,9 +404,10 @@ func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRe
lsys,
graphsync.RejectAllRequestsByDefault(),
graphsync.MaxInProgressIncomingRequests(parallelTransfersForRetrieval),
graphsync.MaxInProgressIncomingRequestsPerPeer(parallelTransfersForStoragePerPeer),
graphsync.MaxInProgressOutgoingRequests(parallelTransfersForStorage),
graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))
graphsync.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
graphsync.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))

graphsyncStats(mctx, lc, gs)

Expand Down
1 change: 1 addition & 0 deletions tools/packer/repo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ListenAddresses = ["/ip4/0.0.0.0/tcp/5678", "/ip6/::/tcp/5678"]
# IpfsMAddr = ""
# IpfsUseForRetrieval = false
# SimultaneousTransfersForStorage = 20
# SimultaneousTransfersForStoragePerClient = 0
# SimultaneousTransfersForRetrieval = 20
#
[Metrics]
Expand Down

0 comments on commit a4728d3

Please sign in to comment.