Skip to content

Commit

Permalink
revert #7646 from v1.14.0 - this is not concensus breaking change and…
Browse files Browse the repository at this point in the history
… needs more testing before landing

remove ctx datastore from autobatch

testplans go mod tidy

fix ctx
  • Loading branch information
jennijuju committed Jan 14, 2022
1 parent 07847c7 commit 3f6c235
Show file tree
Hide file tree
Showing 5 changed files with 375 additions and 472 deletions.
42 changes: 21 additions & 21 deletions blockstore/autobatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int)
return bs
}

func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error {
func (bs *AutobatchBlockstore) Put(blk block.Block) error {
bs.stateLock.Lock()
defer bs.stateLock.Unlock()

Expand Down Expand Up @@ -94,33 +94,33 @@ func (bs *AutobatchBlockstore) flushWorker(ctx context.Context) {
case <-bs.flushCh:
// TODO: check if we _should_ actually flush. We could get a spurious wakeup
// here.
putErr := bs.doFlush(ctx, false)
putErr := bs.doFlush(false)
for putErr != nil {
select {
case <-ctx.Done():
return
case <-time.After(bs.flushRetryDelay):
autolog.Errorf("FLUSH ERRORED: %w, retrying after %v", putErr, bs.flushRetryDelay)
putErr = bs.doFlush(ctx, true)
putErr = bs.doFlush(true)
}
}
case <-ctx.Done():
// Do one last flush.
_ = bs.doFlush(ctx, false)
_ = bs.doFlush(false)
return
}
}
}

// caller must NOT hold stateLock
// set retryOnly to true to only retry a failed flush and not flush anything new.
func (bs *AutobatchBlockstore) doFlush(ctx context.Context, retryOnly bool) error {
func (bs *AutobatchBlockstore) doFlush(retryOnly bool) error {
bs.doFlushLock.Lock()
defer bs.doFlushLock.Unlock()

// If we failed to flush last time, try flushing again.
if bs.flushErr != nil {
bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList)
bs.flushErr = bs.backingBs.PutMany(bs.flushingBatch.blockList)
}

// If we failed, or we're _only_ retrying, bail.
Expand All @@ -137,7 +137,7 @@ func (bs *AutobatchBlockstore) doFlush(ctx context.Context, retryOnly bool) erro
bs.stateLock.Unlock()

// And try to flush it.
bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList)
bs.flushErr = bs.backingBs.PutMany(bs.flushingBatch.blockList)

// If we succeeded, reset the batch. Otherwise, we'll try again next time.
if bs.flushErr == nil {
Expand All @@ -151,7 +151,7 @@ func (bs *AutobatchBlockstore) doFlush(ctx context.Context, retryOnly bool) erro

// caller must NOT hold stateLock
func (bs *AutobatchBlockstore) Flush(ctx context.Context) error {
return bs.doFlush(ctx, false)
return bs.doFlush(false)
}

func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error {
Expand All @@ -169,9 +169,9 @@ func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error {
return bs.flushErr
}

func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
func (bs *AutobatchBlockstore) Get(c cid.Cid) (block.Block, error) {
// may seem backward to check the backingBs first, but that is the likeliest case
blk, err := bs.backingBs.Get(ctx, c)
blk, err := bs.backingBs.Get(c)
if err == nil {
return blk, nil
}
Expand All @@ -192,10 +192,10 @@ func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block,
return v, nil
}

return bs.Get(ctx, c)
return bs.Get(c)
}

func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error {
func (bs *AutobatchBlockstore) DeleteBlock(cid.Cid) error {
// if we wanted to support this, we would have to:
// - flush
// - delete from the backingBs (if present)
Expand All @@ -204,13 +204,13 @@ func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error {
return xerrors.New("deletion is unsupported")
}

func (bs *AutobatchBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
func (bs *AutobatchBlockstore) DeleteMany(cids []cid.Cid) error {
// see note in DeleteBlock()
return xerrors.New("deletion is unsupported")
}

func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
_, err := bs.Get(ctx, c)
func (bs *AutobatchBlockstore) Has(c cid.Cid) (bool, error) {
_, err := bs.Get(c)
if err == nil {
return true, nil
}
Expand All @@ -221,18 +221,18 @@ func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error)
return false, err
}

func (bs *AutobatchBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
blk, err := bs.Get(ctx, c)
func (bs *AutobatchBlockstore) GetSize(c cid.Cid) (int, error) {
blk, err := bs.Get(c)
if err != nil {
return 0, err
}

return len(blk.RawData()), nil
}

func (bs *AutobatchBlockstore) PutMany(ctx context.Context, blks []block.Block) error {
func (bs *AutobatchBlockstore) PutMany(blks []block.Block) error {
for _, blk := range blks {
if err := bs.Put(ctx, blk); err != nil {
if err := bs.Put(blk); err != nil {
return err
}
}
Expand All @@ -252,8 +252,8 @@ func (bs *AutobatchBlockstore) HashOnRead(enabled bool) {
bs.backingBs.HashOnRead(enabled)
}

func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error {
blk, err := bs.Get(ctx, cid)
func (bs *AutobatchBlockstore) View(cid cid.Cid, callback func([]byte) error) error {
blk, err := bs.Get(cid)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ var migrationsCmd = &cli.Command{

cache := nv15.NewMemMigrationCache()

blk, err := cs.GetBlock(ctx, blkCid)
blk, err := cs.GetBlock(blkCid)
if err != nil {
return err
}

migrationTs, err := cs.LoadTipSet(ctx, types.NewTipSetKey(blk.Parents...))
migrationTs, err := cs.LoadTipSet(types.NewTipSetKey(blk.Parents...))
if err != nil {
return err
}
Expand Down
181 changes: 181 additions & 0 deletions cmd/lotus-shed/terminations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package main

import (
"bytes"
"context"
"fmt"
"io"
"strconv"

"github.com/filecoin-project/lotus/chain/actors/builtin"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"

"github.com/filecoin-project/lotus/chain/actors/builtin/market"

"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/node/repo"
miner2 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/urfave/cli/v2"
)

var terminationsCmd = &cli.Command{
Name: "terminations",
Description: "Lists terminated deals from the past 2 days",
ArgsUsage: "[block to look back from] [lookback period (epochs)]",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
},
},
Action: func(cctx *cli.Context) error {
ctx := context.TODO()

if cctx.NArg() != 2 {
return fmt.Errorf("must pass block cid && lookback period")
}

blkCid, err := cid.Decode(cctx.Args().First())
if err != nil {
return fmt.Errorf("failed to parse input: %w", err)
}

fsrepo, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return err
}

lkrepo, err := fsrepo.Lock(repo.FullNode)
if err != nil {
return err
}

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}

defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()

mds, err := lkrepo.Datastore(context.Background(), "/metadata")
if err != nil {
return err
}

cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)

blk, err := cs.GetBlock(blkCid)
if err != nil {
return err
}

lbp, err := strconv.Atoi(cctx.Args().Get(1))
if err != nil {
return fmt.Errorf("failed to parse input: %w", err)
}

cutoff := blk.Height - abi.ChainEpoch(lbp)

for blk.Height > cutoff {
pts, err := cs.LoadTipSet(types.NewTipSetKey(blk.Parents...))
if err != nil {
return err
}

blk = pts.Blocks()[0]

msgs, err := cs.MessagesForTipset(pts)
if err != nil {
return err
}

for _, v := range msgs {
msg := v.VMMessage()
if msg.Method != miner.Methods.TerminateSectors {
continue
}

tree, err := state.LoadStateTree(cst, blk.ParentStateRoot)
if err != nil {
return err
}

minerAct, err := tree.GetActor(msg.To)
if err != nil {
return err
}

if !builtin.IsStorageMinerActor(minerAct.Code) {
continue
}

minerSt, err := miner.Load(store, minerAct)
if err != nil {
return err
}

marketAct, err := tree.GetActor(market.Address)
if err != nil {
return err
}

marketSt, err := market.Load(store, marketAct)
if err != nil {
return err
}

proposals, err := marketSt.Proposals()
if err != nil {
return err
}

var termParams miner2.TerminateSectorsParams
err = termParams.UnmarshalCBOR(bytes.NewBuffer(msg.Params))
if err != nil {
return err
}

for _, t := range termParams.Terminations {
sectors, err := minerSt.LoadSectors(&t.Sectors)
if err != nil {
return err
}

for _, sector := range sectors {
for _, deal := range sector.DealIDs {
prop, find, err := proposals.Get(deal)
if err != nil {
return err
}
if find {
fmt.Printf("%s, %d, %d, %s, %s, %s\n", msg.To, sector.SectorNumber, deal, prop.Client, prop.PieceCID, prop.Label)
}
}
}
}
}
}

return nil
},
}
4 changes: 2 additions & 2 deletions testplans/lotus-soup/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/drand/drand v1.2.1
github.com/filecoin-project/go-address v0.0.6
github.com/filecoin-project/go-data-transfer v1.11.4
github.com/filecoin-project/go-fil-markets v1.13.3
github.com/filecoin-project/go-fil-markets v1.13.4
github.com/filecoin-project/go-jsonrpc v0.1.5
github.com/filecoin-project/go-state-types v0.1.3
github.com/filecoin-project/go-storedcounter v0.1.0
Expand All @@ -20,7 +20,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/influxdata/influxdb v1.9.4 // indirect
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-datastore v0.4.6
github.com/ipfs/go-datastore v0.5.1
github.com/ipfs/go-ipfs-files v0.0.9
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.3.0
Expand Down
Loading

0 comments on commit 3f6c235

Please sign in to comment.