Skip to content

Commit

Permalink
refactor: build indexes for legacy deals (#1539)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc authored and LexLuthr committed Jul 20, 2023
1 parent 5f093e2 commit 4ad47b4
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 57 deletions.
5 changes: 3 additions & 2 deletions gql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/fundmanager"
gqltypes "github.com/filecoin-project/boost/gql/types"
"github.com/filecoin-project/boost/indexprovider"
"github.com/filecoin-project/boost/lib/legacy"
"github.com/filecoin-project/boost/lib/mpoolmonitor"
"github.com/filecoin-project/boost/markets/storageadapter"
Expand Down Expand Up @@ -64,6 +63,7 @@ type resolver struct {
fundMgr *fundmanager.FundManager
storageMgr *storagemanager.StorageManager
provider *storagemarket.Provider
legacyDeals *legacy.LegacyDealsManager
legacyProv gfm_storagemarket.StorageProvider
legacyDT dtypes.ProviderDataTransfer
ps piecestore.PieceStore
Expand All @@ -75,7 +75,7 @@ type resolver struct {
fullNode v1api.FullNode
}

func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, spApi sealingpipeline.API, provider *storagemarket.Provider, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, fullNode v1api.FullNode, ssm *sectorstatemgr.SectorStateMgr) *resolver {
func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, spApi sealingpipeline.API, provider *storagemarket.Provider, legacyDeals *legacy.LegacyDealsManager, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, fullNode v1api.FullNode, ssm *sectorstatemgr.SectorStateMgr, mpool *mpoolmonitor.MpoolMonitor) *resolver {
return &resolver{
ctx: ctx,
cfg: cfg,
Expand All @@ -89,6 +89,7 @@ func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo
fundMgr: fundMgr,
storageMgr: storageMgr,
provider: provider,
legacyDeals: legacyDeals,
legacyProv: legacyProv,
legacyDT: legacyDT,
ps: ps,
Expand Down
72 changes: 41 additions & 31 deletions gql/resolver_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package gql
import (
"context"
"fmt"
"sort"
"time"

"github.com/filecoin-project/boost-gfm/retrievalmarket"
"github.com/filecoin-project/boost-gfm/storagemarket"
gqltypes "github.com/filecoin-project/boost/gql/types"
pdtypes "github.com/filecoin-project/boost/piecedirectory/types"
"github.com/filecoin-project/boostd-data/svc/types"
Expand All @@ -15,6 +15,7 @@ import (
"github.com/graph-gophers/graphql-go"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"golang.org/x/sync/errgroup"
)

type IndexStatus string
Expand Down Expand Up @@ -65,8 +66,10 @@ type pieceResolver struct {
}

type flaggedPieceResolver struct {
Piece *pieceResolver
CreatedAt graphql.Time
PieceCid string
IndexStatus *indexStatus
DealCount int32
CreatedAt graphql.Time
}

type piecesFlaggedArgs struct {
Expand Down Expand Up @@ -117,22 +120,40 @@ func (r *resolver) PiecesFlagged(ctx context.Context, args piecesFlaggedArgs) (*
return nil, err
}

allLegacyDeals, err := r.legacyProv.ListLocalDeals()
if err != nil {
return nil, err
}

var eg errgroup.Group
flaggedPieceResolvers := make([]*flaggedPieceResolver, 0, len(flaggedPieces))
for _, flaggedPiece := range flaggedPieces {
pieceResolver, err := r.pieceStatus(ctx, flaggedPiece.PieceCid, allLegacyDeals)
if err != nil {
return nil, err
}
flaggedPieceResolvers = append(flaggedPieceResolvers, &flaggedPieceResolver{
Piece: pieceResolver,
CreatedAt: graphql.Time{Time: flaggedPiece.CreatedAt},
flaggedPiece := flaggedPiece
eg.Go(func() error {
// Get piece info from local index directory
pieceInfo, pmErr := r.piecedirectory.GetPieceMetadata(ctx, flaggedPiece.PieceCid)
if pmErr != nil && !types.IsNotFound(pmErr) {
return pmErr
}

// Get the state of the piece's index
idxStatus, err := r.getIndexStatus(pieceInfo, pmErr)
if err != nil {
return err
}

flaggedPieceResolvers = append(flaggedPieceResolvers, &flaggedPieceResolver{
PieceCid: flaggedPiece.PieceCid.String(),
IndexStatus: idxStatus,
DealCount: int32(len(pieceInfo.Deals)),
CreatedAt: graphql.Time{Time: flaggedPiece.CreatedAt},
})
return nil
})
}
err = eg.Wait()
if err != nil {
return nil, err
}

sort.Slice(flaggedPieceResolvers, func(i, j int) bool {
return flaggedPieceResolvers[i].CreatedAt.After(flaggedPieces[j].CreatedAt)
})

return &flaggedPieceListResolver{
TotalCount: int32(count),
Expand Down Expand Up @@ -255,15 +276,6 @@ func (r *resolver) PieceStatus(ctx context.Context, args struct{ PieceCid string
return nil, fmt.Errorf("%s is not a valid piece cid", args.PieceCid)
}

allLegacyDeals, err := r.legacyProv.ListLocalDeals()
if err != nil {
return nil, err
}

return r.pieceStatus(ctx, pieceCid, allLegacyDeals)
}

func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyDeals []storagemarket.MinerDeal) (*pieceResolver, error) {
// Get piece info from local index directory
pieceInfo, pmErr := r.piecedirectory.GetPieceMetadata(ctx, pieceCid)
if pmErr != nil && !types.IsNotFound(pmErr) {
Expand All @@ -277,11 +289,9 @@ func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyD
}

// Get legacy markets deals by piece Cid
var legacyDeals []storagemarket.MinerDeal
for _, dl := range allLegacyDeals {
if dl.Ref.PieceCid != nil && *dl.Ref.PieceCid == pieceCid {
legacyDeals = append(legacyDeals, dl)
}
legacyDeals, err := r.legacyDeals.ByPieceCid(ctx, pieceCid)
if err != nil {
return nil, err
}

// Convert local index directory deals to graphQL format
Expand Down Expand Up @@ -362,7 +372,7 @@ func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyD
}

// Get the state of the piece's index
idxStatus, err := r.getIndexStatus(ctx, pieceCid, pieceInfo, pmErr, deals)
idxStatus, err := r.getIndexStatus(pieceInfo, pmErr)
if err != nil {
return nil, err
}
Expand All @@ -375,7 +385,7 @@ func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyD
}, nil
}

func (r *resolver) getIndexStatus(ctx context.Context, pieceCid cid.Cid, md pdtypes.PieceDirMetadata, mdErr error, deals []*pieceDealResolver) (*indexStatus, error) {
func (r *resolver) getIndexStatus(md pdtypes.PieceDirMetadata, mdErr error) (*indexStatus, error) {
var idxst IndexStatus
idxerr := ""

Expand Down
4 changes: 3 additions & 1 deletion gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ type PieceStatus {

type FlaggedPieceStatus {
CreatedAt: Time!
Piece: PieceStatus!
PieceCid: String!
IndexStatus: IndexStatus!
DealCount: Int!
}

type FlaggedPiecesList {
Expand Down
2 changes: 2 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/filecoin-project/boost/fundmanager"
"github.com/filecoin-project/boost/gql"
"github.com/filecoin-project/boost/indexprovider"
"github.com/filecoin-project/boost/lib/legacy"
"github.com/filecoin-project/boost/lib/mpoolmonitor"
"github.com/filecoin-project/boost/markets/idxprov"
"github.com/filecoin-project/boost/markets/retrievaladapter"
storageadapter "github.com/filecoin-project/boost/markets/storageadapter"
Expand Down
24 changes: 20 additions & 4 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,22 @@ func NewChainDealManager(a v1api.FullNode) *storagemarket.ChainDealManager {
return storagemarket.NewChainDealManager(a, cdmCfg)
}

func NewLegacyDealsManager(lc fx.Lifecycle, legacyProv gfm_storagemarket.StorageProvider) *legacy.LegacyDealsManager {
ctx, cancel := context.WithCancel(context.Background())
mgr := legacy.NewLegacyDealsManager(legacyProv)
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
go mgr.Run(ctx)
return nil
},
OnStop: func(_ context.Context) error {
cancel()
return nil
},
})
return mgr
}

func NewStorageAsk(ctx helpers.MetricsCtx, fapi v1api.FullNode, ds lotus_dtypes.MetadataDS, minerAddress lotus_dtypes.MinerAddress, spn gfm_storagemarket.StorageProviderNode) (*storedask.StoredAsk, error) {
mi, err := fapi.StateMinerInfo(ctx, address.Address(minerAddress), ltypes.EmptyTSK)
if err != nil {
Expand Down Expand Up @@ -630,14 +646,14 @@ func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(
}
}

func NewGraphqlServer(cfg *config.Boost) func(lc fx.Lifecycle, r repo.LockedRepo, h host.Host, prov *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, publisher *storageadapter.DealPublisher, spApi sealingpipeline.API, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter, ssm *sectorstatemgr.SectorStateMgr) *gql.Server {
func NewGraphqlServer(cfg *config.Boost) func(lc fx.Lifecycle, r repo.LockedRepo, h host.Host, prov *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, publisher *storageadapter.DealPublisher, spApi sealingpipeline.API, legacyDeals *legacy.LegacyDealsManager, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter, ssm *sectorstatemgr.SectorStateMgr, mpool *mpoolmonitor.MpoolMonitor) *gql.Server {
return func(lc fx.Lifecycle, r repo.LockedRepo, h host.Host, prov *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager,
storageMgr *storagemanager.StorageManager, publisher *storageadapter.DealPublisher, spApi sealingpipeline.API,
legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer,
ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter, ssm *sectorstatemgr.SectorStateMgr) *gql.Server {
legacyDeals *legacy.LegacyDealsManager, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer,
ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter, ssm *sectorstatemgr.SectorStateMgr, mpool *mpoolmonitor.MpoolMonitor) *gql.Server {

resolverCtx, cancel := context.WithCancel(context.Background())
resolver := gql.NewResolver(resolverCtx, cfg, r, h, dealsDB, logsDB, retDB, plDB, fundsDB, fundMgr, storageMgr, spApi, prov, legacyProv, legacyDT, ps, sa, piecedirectory, publisher, fullNode, ssm)
resolver := gql.NewResolver(resolverCtx, cfg, r, h, dealsDB, logsDB, retDB, plDB, fundsDB, fundMgr, storageMgr, spApi, prov, legacyDeals, legacyProv, legacyDT, ps, sa, piecedirectory, publisher, fullNode, ssm, mpool)
server := gql.NewServer(resolver, bg)

lc.Append(fx.Hook{
Expand Down
10 changes: 5 additions & 5 deletions react/src/LID.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,8 @@ function FlaggedPieces({setSearchQuery}) {

{rows.map(piece => (
<FlaggedPieceRow
key={piece.Piece.PieceCid}
piece={piece.Piece}
key={piece.PieceCid}
piece={piece}
setSearchQuery={setSearchQuery}
/>
))}
Expand Down Expand Up @@ -348,7 +348,7 @@ function FlaggedPieceRow({piece}) {
</Link>
</td>
<td>{piece.IndexStatus.Status}</td>
<td>{piece.Deals.length}</td>
<td>{piece.DealCount}</td>
</tr>
}

Expand Down Expand Up @@ -436,8 +436,8 @@ function NoUnsealedSectorPieces() {

{rows.map(piece => (
<FlaggedPieceRow
key={piece.Piece.PieceCid}
piece={piece.Piece}
key={piece.PieceCid}
piece={piece}
/>
))}
</tbody>
Expand Down
19 changes: 5 additions & 14 deletions react/src/gql.js
Original file line number Diff line number Diff line change
Expand Up @@ -402,21 +402,12 @@ const FlaggedPiecesQuery = gql`
piecesFlagged(hasUnsealedCopy: $hasUnsealedCopy, cursor: $cursor, offset: $offset, limit: $limit) {
pieces {
CreatedAt
Piece {
PieceCid
IndexStatus {
Status
Error
}
Deals {
Deal {
ID
IsLegacy
CreatedAt
DealDataRoot
}
}
PieceCid
IndexStatus {
Status
Error
}
DealCount
}
totalCount
more
Expand Down

0 comments on commit 4ad47b4

Please sign in to comment.