diff --git a/chain/blocksync/client.go b/chain/blocksync/client.go index 38e1f6d2c2d..893759f6a0f 100644 --- a/chain/blocksync/client.go +++ b/chain/blocksync/client.go @@ -11,6 +11,7 @@ import ( inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "go.opencensus.io/trace" + "go.uber.org/fx" "golang.org/x/xerrors" cborutil "github.com/filecoin-project/go-cbor-util" @@ -36,12 +37,13 @@ type BlockSync struct { } func NewClient( + lc fx.Lifecycle, host host.Host, pmgr peermgr.MaybePeerMgr, ) *BlockSync { return &BlockSync{ host: host, - peerTracker: newPeerTracker(pmgr.Mgr), + peerTracker: newPeerTracker(lc, host, pmgr.Mgr), } } @@ -360,6 +362,7 @@ func (client *BlockSync) sendRequestToPeer( supported, err := client.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID) if err != nil { + client.RemovePeer(peer) return nil, xerrors.Errorf("failed to get protocols for peer: %w", err) } if len(supported) == 0 || supported[0] != BlockSyncProtocolID { @@ -385,7 +388,7 @@ func (client *BlockSync) sendRequestToPeer( _ = stream.SetWriteDeadline(time.Now().Add(WRITE_REQ_DEADLINE)) if err := cborutil.WriteCborRPC(stream, req); err != nil { _ = stream.SetWriteDeadline(time.Time{}) - client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) + client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length) // FIXME: Should we also remove peer here? return nil, err } @@ -398,7 +401,7 @@ func (client *BlockSync) sendRequestToPeer( bufio.NewReader(incrt.New(stream, READ_RES_MIN_SPEED, READ_RES_DEADLINE)), &res) if err != nil { - client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) + client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length) return nil, xerrors.Errorf("failed to read blocksync response: %w", err) } @@ -412,7 +415,7 @@ func (client *BlockSync) sendRequestToPeer( ) } - client.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart)) + client.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart), uint64(len(res.Chain))) // FIXME: We should really log a success only after we validate the response. // It might be a bit hard to do. return &res, nil diff --git a/chain/blocksync/peer_tracker.go b/chain/blocksync/peer_tracker.go index f1f6ede07ac..bb350aa5115 100644 --- a/chain/blocksync/peer_tracker.go +++ b/chain/blocksync/peer_tracker.go @@ -3,11 +3,14 @@ package blocksync // FIXME: This needs to be reviewed. import ( + "context" "sort" "sync" "time" + host "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "go.uber.org/fx" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/lib/peermgr" @@ -29,11 +32,30 @@ type bsPeerTracker struct { pmgr *peermgr.PeerMgr } -func newPeerTracker(pmgr *peermgr.PeerMgr) *bsPeerTracker { - return &bsPeerTracker{ +func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeerTracker { + bsPt := &bsPeerTracker{ peers: make(map[peer.ID]*peerStats), pmgr: pmgr, } + + sub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer)) + if err != nil { + panic(err) + } + + go func() { + for newPeer := range sub.Out() { + bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id) + } + }() + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return sub.Close() + }, + }) + + return bsPt } func (bpt *bsPeerTracker) addPeer(p peer.ID) { @@ -72,16 +94,7 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID { var costI, costJ float64 getPeerInitLat := func(p peer.ID) float64 { - var res float64 - if bpt.pmgr != nil { - if lat, ok := bpt.pmgr.GetPeerLatency(p); ok { - res = float64(lat) - } - } - if res == 0 { - res = float64(bpt.avgGlobalTime) - } - return res * newPeerMul + return float64(bpt.avgGlobalTime) * newPeerMul } if pi.successes+pi.failures > 0 { @@ -107,8 +120,8 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID { const ( // xInvAlpha = (N+1)/2 - localInvAlpha = 5 // 86% of the value is the last 9 - globalInvAlpha = 20 // 86% of the value is the last 39 + localInvAlpha = 10 // 86% of the value is the last 19 + globalInvAlpha = 25 // 86% of the value is the last 49 ) func (bpt *bsPeerTracker) logGlobalSuccess(dur time.Duration) { @@ -133,7 +146,7 @@ func logTime(pi *peerStats, dur time.Duration) { } -func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) { +func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration, reqSize uint64) { bpt.lk.Lock() defer bpt.lk.Unlock() @@ -145,10 +158,13 @@ func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) { } pi.successes++ - logTime(pi, dur) + if reqSize == 0 { + reqSize = 1 + } + logTime(pi, dur/time.Duration(reqSize)) } -func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration) { +func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration, reqSize uint64) { bpt.lk.Lock() defer bpt.lk.Unlock() @@ -160,7 +176,10 @@ func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration) { } pi.failures++ - logTime(pi, dur) + if reqSize == 0 { + reqSize = 1 + } + logTime(pi, dur/time.Duration(reqSize)) } func (bpt *bsPeerTracker) removePeer(p peer.ID) { diff --git a/lib/peermgr/peermgr.go b/lib/peermgr/peermgr.go index 80b05e8ce0a..2f9d3467499 100644 --- a/lib/peermgr/peermgr.go +++ b/lib/peermgr/peermgr.go @@ -10,7 +10,10 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" "go.opencensus.io/stats" "go.uber.org/fx" + "go.uber.org/multierr" + "golang.org/x/xerrors" + "github.com/libp2p/go-libp2p-core/event" host "github.com/libp2p/go-libp2p-core/host" net "github.com/libp2p/go-libp2p-core/network" peer "github.com/libp2p/go-libp2p-core/peer" @@ -50,12 +53,17 @@ type PeerMgr struct { h host.Host dht *dht.IpfsDHT - notifee *net.NotifyBundle + notifee *net.NotifyBundle + filPeerEmitter event.Emitter done chan struct{} } -func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) *PeerMgr { +type NewFilPeer struct { + Id peer.ID +} + +func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) { pm := &PeerMgr{ h: h, dht: dht, @@ -69,10 +77,18 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes done: make(chan struct{}), } + emitter, err := h.EventBus().Emitter(new(NewFilPeer)) + if err != nil { + return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err) + } + pm.filPeerEmitter = emitter lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { - return pm.Stop(ctx) + return multierr.Combine( + pm.filPeerEmitter.Close(), + pm.Stop(ctx), + ) }, }) @@ -84,10 +100,11 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes h.Network().Notify(pm.notifee) - return pm + return pm, nil } func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { + _ = pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() pmgr.peers[p] = time.Duration(0)