Skip to content

Commit

Permalink
Upgrade/alphanet weight (#3536)
Browse files Browse the repository at this point in the history
* Integration test of new weight

* Upgrade weight function
- Introduce non-final arbitrary alphanet upgrade height
- IsHeavier handles upgrade selection

* Refactor weight to take state root

* Upgrade parent weight validation
- Fixed errors and got functional test passing
- Moved to selecting on versions from protocol table
  • Loading branch information
ZenGround0 authored Oct 10, 2019
1 parent f1d0cc1 commit 781f2ca
Show file tree
Hide file tree
Showing 20 changed files with 496 additions and 100 deletions.
66 changes: 55 additions & 11 deletions chain/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ type syncChainSelector interface {
// IsHeaver returns true if tipset a is heavier than tipset b and false if
// tipset b is heavier than tipset a.
IsHeavier(ctx context.Context, a, b types.TipSet, aStateID, bStateID cid.Cid) (bool, error)
// NewWeight returns the weight of a tipset after the upgrade to version 1
NewWeight(ctx context.Context, ts types.TipSet, stRoot cid.Cid) (uint64, error)
}

type syncStateEvaluator interface {
// RunStateTransition returns the state root CID resulting from applying the input ts to the
// prior `stateRoot`. It returns an error if the transition is invalid.
RunStateTransition(ctx context.Context, ts types.TipSet, tsMessages [][]*types.SignedMessage, tsReceipts [][]*types.MessageReceipt, ancestors []types.TipSet, stateID cid.Cid) (cid.Cid, error)
RunStateTransition(ctx context.Context, ts types.TipSet, tsMessages [][]*types.SignedMessage, tsReceipts [][]*types.MessageReceipt, ancestors []types.TipSet, parentWeight uint64, stateID cid.Cid) (cid.Cid, error)
}

// Syncer updates its chain.Store according to the methods of its
Expand Down Expand Up @@ -134,7 +136,7 @@ func NewSyncer(e syncStateEvaluator, cs syncChainSelector, s syncerChainReaderWr
//
// Precondition: the caller of syncOne must hold the syncer's lock (syncer.mu) to
// ensure head is not modified by another goroutine during run.
func (syncer *Syncer) syncOne(ctx context.Context, parent, next types.TipSet) error {
func (syncer *Syncer) syncOne(ctx context.Context, grandParent, parent, next types.TipSet) error {
priorHeadKey := syncer.chainStore.GetHead()

// if tipset is already priorHeadKey, we've been here before. do nothing.
Expand Down Expand Up @@ -162,6 +164,7 @@ func (syncer *Syncer) syncOne(ctx context.Context, parent, next types.TipSet) er
return err
}

// Gather tipset messages
var nextMessages [][]*types.SignedMessage
var nextReceipts [][]*types.MessageReceipt
for i := 0; i < next.Len(); i++ {
Expand All @@ -178,9 +181,15 @@ func (syncer *Syncer) syncOne(ctx context.Context, parent, next types.TipSet) er
nextReceipts = append(nextReceipts, rcpts)
}

// Gather validated parent weight
parentWeight, err := syncer.calculateParentWeight(ctx, parent, grandParent)
if err != nil {
return err
}

// Run a state transition to validate the tipset and compute
// a new state to add to the store.
root, err := syncer.stateEvaluator.RunStateTransition(ctx, next, nextMessages, nextReceipts, ancestors, stateRoot)
root, err := syncer.stateEvaluator.RunStateTransition(ctx, next, nextMessages, nextReceipts, ancestors, parentWeight, stateRoot)
if err != nil {
return err
}
Expand Down Expand Up @@ -233,6 +242,44 @@ func (syncer *Syncer) syncOne(ctx context.Context, parent, next types.TipSet) er
return nil
}

// TODO #3537 this should be stored the first time it is computed and retrieved
// from disk just like aggregate state roots.
func (syncer *Syncer) calculateParentWeight(ctx context.Context, parent, grandParent types.TipSet) (uint64, error) {
if grandParent.Equals(types.UndefTipSet) {
return syncer.chainSelector.NewWeight(ctx, parent, cid.Undef)
}
gpStRoot, err := syncer.chainStore.GetTipSetStateRoot(grandParent.Key())
if err != nil {
return 0, err
}
return syncer.chainSelector.NewWeight(ctx, parent, gpStRoot)
}

// ancestorsFromStore returns the parent and grandparent tipsets of `ts`
func (syncer *Syncer) ancestorsFromStore(ts types.TipSet) (types.TipSet, types.TipSet, error) {
parentCids, err := ts.Parents()
if err != nil {
return types.UndefTipSet, types.UndefTipSet, err
}
parent, err := syncer.chainStore.GetTipSet(parentCids)
if err != nil {
return types.UndefTipSet, types.UndefTipSet, err
}
grandParentCids, err := parent.Parents()
if err != nil {
return types.UndefTipSet, types.UndefTipSet, err
}
if grandParentCids.Empty() {
// parent == genesis ==> grandParent undef
return parent, types.UndefTipSet, nil
}
grandParent, err := syncer.chainStore.GetTipSet(grandParentCids)
if err != nil {
return types.UndefTipSet, types.UndefTipSet, err
}
return parent, grandParent, nil
}

func (syncer *Syncer) logReorg(ctx context.Context, curHead, newHead types.TipSet) {
curHeadIter := IterAncestors(ctx, syncer.chainStore, curHead)
newHeadIter := IterAncestors(ctx, syncer.chainStore, newHead)
Expand Down Expand Up @@ -329,7 +376,7 @@ func (syncer *Syncer) HandleNewTipSet(ctx context.Context, ci *types.ChainInfo,
span.AddAttributes(trace.StringAttribute("tipset", ci.Head.String()))
defer tracing.AddErrorEndSpan(ctx, span, &err)

// This lock could last a long time as we fetch all the blocks needed to block the chain.
// This lock could last a long time as we fetch all the blocks needed to sync the chain.
// This is justified because the app is pretty useless until it is synced.
// It's better for multiple calls to wait here than to try to fetch the chain independently.
syncer.mu.Lock()
Expand Down Expand Up @@ -379,11 +426,7 @@ func (syncer *Syncer) HandleNewTipSet(ctx context.Context, ci *types.ChainInfo,
// Fetcher returns chain in Traversal order, reverse it to height order
Reverse(chain)

parentCids, err := chain[0].Parents()
if err != nil {
return err
}
parent, err := syncer.chainStore.GetTipSet(parentCids)
parent, grandParent, err := syncer.ancestorsFromStore(chain[0])
if err != nil {
return err
}
Expand All @@ -401,7 +444,7 @@ func (syncer *Syncer) HandleNewTipSet(ctx context.Context, ci *types.ChainInfo,
}
if wts.Defined() {
logSyncer.Debug("attempt to sync after widen")
err = syncer.syncOne(ctx, parent, wts)
err = syncer.syncOne(ctx, grandParent, parent, wts)
if err != nil {
return err
}
Expand All @@ -414,7 +457,7 @@ func (syncer *Syncer) HandleNewTipSet(ctx context.Context, ci *types.ChainInfo,
// as a performance optimization, because this tipset cannot be heavier
// than the widened first tipset.
if !wts.Defined() || len(chain) > 1 {
err = syncer.syncOne(ctx, parent, ts)
err = syncer.syncOne(ctx, grandParent, parent, ts)
if err != nil {
// While `syncOne` can indeed fail for reasons other than consensus,
// adding to the badTipSets at this point is the simplest, since we
Expand All @@ -428,6 +471,7 @@ func (syncer *Syncer) HandleNewTipSet(ctx context.Context, ci *types.ChainInfo,
if i%500 == 0 {
logSyncer.Infof("processing block %d of %v for chain with head at %v", i, len(chain), ci.Head.String())
}
grandParent = parent
parent = ts
}
return nil
Expand Down
207 changes: 207 additions & 0 deletions chain/syncer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ import (
"time"

bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-hamt-ipld"
bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-exchange-offline"

"github.com/filecoin-project/go-filecoin/address"
"github.com/filecoin-project/go-filecoin/chain"
"github.com/filecoin-project/go-filecoin/consensus"
"github.com/filecoin-project/go-filecoin/repo"
"github.com/filecoin-project/go-filecoin/state"
th "github.com/filecoin-project/go-filecoin/testhelpers"
tf "github.com/filecoin-project/go-filecoin/testhelpers/testflags"
"github.com/filecoin-project/go-filecoin/types"
"github.com/filecoin-project/go-filecoin/version"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -107,3 +110,207 @@ func TestLoadFork(t *testing.T) {
// The left chain is ok without any fetching though.
assert.NoError(t, offlineSyncer.HandleNewTipSet(ctx, types.NewChainInfo("", left.Key(), heightFromTip(t, left)), true))
}

// Power table weight comparisons impact syncer's selection.
// One fork has more blocks but less total power.
// Verify that the heavier fork is the one with more power.
// All blocks in this test follow protocol version 1 upgrade weighting rules.
func TestSyncerWeighsPower(t *testing.T) {
cst := hamt.NewCborStore()
ctx := context.Background()
pvt, err := version.ConfigureProtocolVersions(version.TEST)
require.NoError(t, err)
isb := newIntegrationStateBuilder(t, cst, pvt)
builder := chain.NewBuilderWithState(t, address.Undef, isb)

// Construct genesis with readable state tree root
gen := builder.BuildOneOn(types.UndefTipSet, func(bb *chain.BlockBuilder) {})

// Builder constructs two different blocks with different state trees
// for building two forks.
split := builder.BuildOn(gen, 2, func(bb *chain.BlockBuilder, i int) {
if i == 1 {
keys := types.MustGenerateKeyInfo(1, 42)
mm := types.NewMessageMaker(t, keys)
addr := mm.Addresses()[0]
bb.AddMessages(
[]*types.SignedMessage{
mm.NewSignedMessage(addr, 1),
},
types.EmptyReceipts(1),
)
}
})
fork1 := types.RequireNewTipSet(t, split.At(0))
fork2 := types.RequireNewTipSet(t, split.At(1))

// Builder adds 3 blocks to fork 1 and total storage power 2^0
// 3 + 3*delta = 3 + 3[V*1 + bits(2^0)] = 3 + 3[2 + 1] = 3 + 9 = 12
head1 := builder.AppendManyOn(3, fork1)

// Builder adds 1 block to fork 2 and total storage power 2^9
// 3 + 1*delta = 3 + 1[V*1 + bits(2^9)] = 3 + 2 + 10 = 15
head2 := builder.AppendOn(fork2, 1)

// Verify that the syncer selects fork 2 (15 > 12)
as := newForkSnapshotGen(t, types.NewBytesAmount(1), types.NewBytesAmount(512), isb.c512)
dumpBlocksToCborStore(t, builder, cst, head1, head2)
store := chain.NewStore(repo.NewInMemoryRepo().ChainDatastore(), cst, &state.TreeStateLoader{}, chain.NewStatusReporter(), gen.At(0).Cid())
require.NoError(t, store.PutTipSetAndState(ctx, &chain.TipSetAndState{gen.At(0).StateRoot, gen}))
require.NoError(t, store.SetHead(ctx, gen))
syncer := chain.NewSyncer(&integrationStateEvaluator{c512: isb.c512}, consensus.NewChainSelector(cst, as, gen.At(0).Cid(), pvt), store, builder, builder, chain.NewStatusReporter(), th.NewFakeClock(time.Unix(1234567890, 0)))

// sync fork 1
assert.NoError(t, syncer.HandleNewTipSet(ctx, types.NewChainInfo("", head1.Key(), heightFromTip(t, head1)), true))
assert.Equal(t, head1.Key(), store.GetHead())
// sync fork 2
assert.NoError(t, syncer.HandleNewTipSet(ctx, types.NewChainInfo("", head2.Key(), heightFromTip(t, head1)), true))
assert.Equal(t, head2.Key(), store.GetHead())
}

// integrationStateBuilder is a chain/testing.go `StateBuilder` used for
// construction of a chain where the state root cids signify the total power
// in the power table without actually needing to construct a valid state
// state machine tree.
//
// All blocks with at least one message are assigned a special cid: c512.
// In TestSyncerWeighsPower this state root is interpreted as having
// 512 bytes of power.
//
// integrationStateBuilder also weighs the chain according to the protocol
// version 1 upgrade.
type integrationStateBuilder struct {
t *testing.T
c512 cid.Cid
cGen cid.Cid
cst *hamt.CborIpldStore
pvt *version.ProtocolVersionTable
}

func newIntegrationStateBuilder(t *testing.T, cst *hamt.CborIpldStore, pvt *version.ProtocolVersionTable) *integrationStateBuilder {
return &integrationStateBuilder{
t: t,
c512: cid.Undef,
cst: cst,
cGen: cid.Undef,
}
}

func (isb *integrationStateBuilder) ComputeState(prev cid.Cid, blocksMessages [][]*types.SignedMessage) (cid.Cid, error) {
// setup genesis with a state we can fetch from cborstor
if prev.Equals(types.CidFromString(isb.t, "null")) {
treeGen := state.TreeFromString(isb.t, "1Power", isb.cst)
genRoot, err := treeGen.Flush(context.Background())
require.NoError(isb.t, err)
return genRoot, nil
}
// setup fork with state we associate with more power
if len(blocksMessages[0]) > 0 {
treeFork := state.TreeFromString(isb.t, "512Power", isb.cst)
forkRoot, err := treeFork.Flush(context.Background())
require.NoError(isb.t, err)
isb.c512 = forkRoot
return forkRoot, nil
}
return prev, nil
}

func (isb *integrationStateBuilder) Weigh(tip types.TipSet, pstate cid.Cid) (uint64, error) {
if tip.Equals(types.UndefTipSet) {
return uint64(0), nil
}
if isb.cGen.Equals(cid.Undef) && tip.Len() == 1 {
isb.cGen = tip.At(0).Cid()
}

if tip.At(0).Cid().Equals(isb.cGen) {
return uint64(0), nil
}
as := newForkSnapshotGen(isb.t, types.NewBytesAmount(1), types.NewBytesAmount(512), isb.c512)
sel := consensus.NewChainSelector(isb.cst, as, isb.cGen, isb.pvt)
return sel.NewWeight(context.Background(), tip, pstate)
}

// integrationStateEvaluator returns the parent state root. If there are multiple
// parent blocks and any contain state root c512 then it will return c512.
type integrationStateEvaluator struct {
c512 cid.Cid
}

func (n *integrationStateEvaluator) RunStateTransition(_ context.Context, ts types.TipSet, _ [][]*types.SignedMessage, _ [][]*types.MessageReceipt, _ []types.TipSet, _ uint64, stateID cid.Cid) (cid.Cid, error) {
for i := 0; i < ts.Len(); i++ {
if ts.At(i).StateRoot.Equals(n.c512) {
return n.c512, nil
}
}
return ts.At(0).StateRoot, nil
}

// forkSnapshotGen reads power from fake state tree root cids. It reads
// power of `forkPower` from cid `forkRoot` and `defaultPower` from all others.
type forkSnapshotGen struct {
forkPower *types.BytesAmount
defaultPower *types.BytesAmount
forkRoot cid.Cid
t *testing.T
}

func newForkSnapshotGen(t *testing.T, dp, fp *types.BytesAmount, root cid.Cid) *forkSnapshotGen {
return &forkSnapshotGen{
t: t,
defaultPower: dp,
forkPower: fp,
forkRoot: root,
}
}

func (fs *forkSnapshotGen) StateTreeSnapshot(st state.Tree, bh *types.BlockHeight) consensus.ActorStateSnapshot {
totalPower := fs.defaultPower

root, err := st.Flush(context.Background())
require.NoError(fs.t, err)
if root.Equals(fs.forkRoot) {
totalPower = fs.forkPower
}

return &consensus.FakePowerTableViewSnapshot{
MinerPower: types.NewBytesAmount(0),
TotalPower: totalPower,
MinerToWorker: make(map[address.Address]address.Address),
}
}

// dumpBlocksToCborStore is a helper method that
// TODO #3078 we can avoid this byte shuffling by creating a simple testing type
// that implements the needed interface and grabs blocks from the builder as
// needed. Once #3078 is in place we will have the flexibility to use a
// testing type as the cbor store.
func dumpBlocksToCborStore(t *testing.T, builder *chain.Builder, cst *hamt.CborIpldStore, heads ...types.TipSet) {
cids := make(map[cid.Cid]struct{})
// traverse builder frontier adding cids to the map. Traverse
// duplicates over doing anything clever.
var err error
for _, head := range heads {
it := chain.IterAncestors(context.Background(), builder, head)
for ; !it.Complete(); err = it.Next() {
require.NoError(t, err)
for i := 0; i < it.Value().Len(); i++ {
blk := head.At(i)
c := blk.Cid()
cids[c] = struct{}{}
}
}
}

// get all blocks corresponding to the cids and put to the cst
var searchKey []cid.Cid
for c := range cids {
searchKey = append(searchKey, c)
}
blocks, err := builder.GetBlocks(context.Background(), searchKey)
require.NoError(t, err)
for _, blk := range blocks {
_, err = cst.Put(context.Background(), blk)
require.NoError(t, err)
}
}
Loading

0 comments on commit 781f2ca

Please sign in to comment.