Skip to content

Commit

Permalink
GraphQL resolvers for LID (#1494)
Browse files Browse the repository at this point in the history
* wip

* rename

* sectorUnsealedCopies and SectorProvingState
  • Loading branch information
nonsense authored Jun 7, 2023
1 parent 3f79cda commit 4e14ce0
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 222 deletions.
5 changes: 4 additions & 1 deletion gql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/boost/node/modules/dtypes"
"github.com/filecoin-project/boost/piecedirectory"
"github.com/filecoin-project/boost/retrievalmarket/rtvllog"
"github.com/filecoin-project/boost/sectorstatemgr"
"github.com/filecoin-project/boost/storagemanager"
"github.com/filecoin-project/boost/storagemarket"
"github.com/filecoin-project/boost/storagemarket/sealingpipeline"
Expand Down Expand Up @@ -62,14 +63,15 @@ type resolver struct {
legacyProv gfm_storagemarket.StorageProvider
legacyDT dtypes.ProviderDataTransfer
ps piecestore.PieceStore
ssm *sectorstatemgr.SectorStateMgr
sa retrievalmarket.SectorAccessor
piecedirectory *piecedirectory.PieceDirectory
publisher *storageadapter.DealPublisher
spApi sealingpipeline.API
fullNode v1api.FullNode
}

func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, spApi sealingpipeline.API, provider *storagemarket.Provider, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, fullNode v1api.FullNode) *resolver {
func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, spApi sealingpipeline.API, provider *storagemarket.Provider, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, fullNode v1api.FullNode, ssm *sectorstatemgr.SectorStateMgr) *resolver {
return &resolver{
ctx: ctx,
cfg: cfg,
Expand All @@ -91,6 +93,7 @@ func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo
publisher: publisher,
spApi: spApi,
fullNode: fullNode,
ssm: ssm,
}
}

Expand Down
91 changes: 91 additions & 0 deletions gql/resolver_lid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package gql

import (
"context"
"time"

"github.com/filecoin-project/boost/db"
gqltypes "github.com/filecoin-project/boost/gql/types"
"github.com/filecoin-project/boost/sectorstatemgr"
)

type dealData struct {
Indexed gqltypes.Uint64
FlaggedUnsealed gqltypes.Uint64
FlaggedSealed gqltypes.Uint64
}

type pieces struct {
Indexed int32
FlaggedUnsealed int32
FlaggedSealed int32
}

type sectorUnsealedCopies struct {
Unsealed int32
Sealed int32
}

type sectorProvingState struct {
Active int32
Inactive int32
}

type lidState struct {
DealData dealData
Pieces pieces
SectorUnsealedCopies sectorUnsealedCopies
SectorProvingState sectorProvingState
FlaggedPieces int32
}

// query: lid: [LID]
func (r *resolver) LID(ctx context.Context) (*lidState, error) {
var lu *sectorstatemgr.SectorStateUpdates
for lu == nil {
r.ssm.LatestUpdateMu.Lock()
lu = r.ssm.LatestUpdate
r.ssm.LatestUpdateMu.Unlock()
if lu == nil {
time.Sleep(2 * time.Second)
}
}

var sealed, unsealed int32
for _, s := range lu.SectorStates { // TODO: consider adding this data directly in SSM
if s == db.SealStateUnsealed {
unsealed++
} else if s == db.SealStateSealed {
sealed++
}
}

fp, err := r.piecedirectory.FlaggedPiecesCount(ctx)
if err != nil {
return nil, err
}

ls := &lidState{
FlaggedPieces: int32(fp),
DealData: dealData{
Indexed: gqltypes.Uint64(12094627905536),
FlaggedUnsealed: gqltypes.Uint64(1094627905536),
FlaggedSealed: gqltypes.Uint64(18094627905536),
},
Pieces: pieces{
Indexed: 360,
FlaggedUnsealed: 33,
FlaggedSealed: 480,
},
SectorUnsealedCopies: sectorUnsealedCopies{
Sealed: sealed,
Unsealed: unsealed,
},
SectorProvingState: sectorProvingState{
Active: int32(len(lu.ActiveSectors)),
Inactive: int32(len(lu.SectorStates) - len(lu.ActiveSectors)), // TODO: add an explicit InactiveSectors in ssm
},
}

return ls, nil
}
33 changes: 33 additions & 0 deletions gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,36 @@ type SealingPipeline {
Workers: [Worker]!
}

type DealData {
Indexed: Uint64!
FlaggedUnsealed: Uint64!
FlaggedSealed: Uint64!
}

type Pieces {
Indexed: Int!
FlaggedUnsealed: Int!
FlaggedSealed: Int!
}

type SectorUnsealedCopies {
Unsealed: Int!
Sealed: Int!
}

type SectorProvingState {
Active: Int!
Inactive: Int!
}

type LID {
DealData: DealData!
Pieces: Pieces!
SectorUnsealedCopies: SectorUnsealedCopies!
SectorProvingState: SectorProvingState!
FlaggedPieces: Int!
}

type FundsEscrow {
Available: BigInt!
Locked: BigInt!
Expand Down Expand Up @@ -476,6 +506,9 @@ type RootQuery {
"""Get sealing pipeline state"""
sealingpipeline: SealingPipeline!

"""Get LID state"""
lid: LID!

"""Get funds available"""
funds: Funds!

Expand Down
7 changes: 4 additions & 3 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/boost/piecedirectory"
brm "github.com/filecoin-project/boost/retrievalmarket/lib"
"github.com/filecoin-project/boost/retrievalmarket/rtvllog"
"github.com/filecoin-project/boost/sectorstatemgr"
"github.com/filecoin-project/boost/storagemanager"
"github.com/filecoin-project/boost/storagemarket"
"github.com/filecoin-project/boost/storagemarket/logs"
Expand Down Expand Up @@ -628,14 +629,14 @@ func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(
}
}

func NewGraphqlServer(cfg *config.Boost) func(lc fx.Lifecycle, r repo.LockedRepo, h host.Host, prov *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, publisher *storageadapter.DealPublisher, spApi sealingpipeline.API, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter) *gql.Server {
func NewGraphqlServer(cfg *config.Boost) func(lc fx.Lifecycle, r repo.LockedRepo, h host.Host, prov *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, publisher *storageadapter.DealPublisher, spApi sealingpipeline.API, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter, ssm *sectorstatemgr.SectorStateMgr) *gql.Server {
return func(lc fx.Lifecycle, r repo.LockedRepo, h host.Host, prov *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager,
storageMgr *storagemanager.StorageManager, publisher *storageadapter.DealPublisher, spApi sealingpipeline.API,
legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer,
ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter) *gql.Server {
ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter, ssm *sectorstatemgr.SectorStateMgr) *gql.Server {

resolverCtx, cancel := context.WithCancel(context.Background())
resolver := gql.NewResolver(resolverCtx, cfg, r, h, dealsDB, logsDB, retDB, plDB, fundsDB, fundMgr, storageMgr, spApi, prov, legacyProv, legacyDT, ps, sa, piecedirectory, publisher, fullNode)
resolver := gql.NewResolver(resolverCtx, cfg, r, h, dealsDB, logsDB, retDB, plDB, fundsDB, fundMgr, storageMgr, spApi, prov, legacyProv, legacyDT, ps, sa, piecedirectory, publisher, fullNode, ssm)
server := gql.NewServer(resolver, bg)

lc.Append(fx.Hook{
Expand Down
32 changes: 3 additions & 29 deletions piecedirectory/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"math/rand"
"sync"
"time"

"github.com/filecoin-project/boost/db"
Expand All @@ -27,9 +26,6 @@ var doclog = logging.Logger("piecedoc")
type Doctor struct {
store *bdclient.Store
ssm *sectorstatemgr.SectorStateMgr

latestUpdateMu sync.Mutex
latestUpdate *sectorstatemgr.SectorStateUpdates
}

func NewDoctor(store *bdclient.Store, ssm *sectorstatemgr.SectorStateMgr) *Doctor {
Expand All @@ -42,28 +38,6 @@ const avgCheckInterval = 30 * time.Second
func (d *Doctor) Run(ctx context.Context) {
doclog.Info("piece doctor: running")

go func() {
sub := d.ssm.PubSub.Subscribe()

for {
select {
case u, ok := <-sub:
if !ok {
log.Debugw("state updates subscription closed")
return
}
log.Debugw("got state updates from SectorStateMgr", "len(u.updates)", len(u.Updates), "len(u.active)", len(u.ActiveSectors), "u.updatedAt", u.UpdatedAt)

d.latestUpdateMu.Lock()
d.latestUpdate = u
d.latestUpdateMu.Unlock()

case <-ctx.Done():
return
}
}
}()

timer := time.NewTimer(0)
defer timer.Stop()

Expand All @@ -76,9 +50,9 @@ func (d *Doctor) Run(ctx context.Context) {

err := func() error {
var lu *sectorstatemgr.SectorStateUpdates
d.latestUpdateMu.Lock()
lu = d.latestUpdate
d.latestUpdateMu.Unlock()
d.ssm.LatestUpdateMu.Lock()
lu = d.ssm.LatestUpdate
d.ssm.LatestUpdateMu.Unlock()
if lu == nil {
doclog.Warn("sector state manager not yet updated")
return nil
Expand Down
Loading

0 comments on commit 4e14ce0

Please sign in to comment.