Skip to content

Commit

Permalink
Merge pull request #6358 from filecoin-project/feat/limit-stream-dead…
Browse files Browse the repository at this point in the history
…lines

feat: libp2p: stream cleanup
  • Loading branch information
simlecode authored Jun 19, 2024
2 parents 70ff456 + f852d90 commit a8d2410
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pkg/net/exchange/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {

var costI, costJ float64

getPeerInitLat := func(p peer.ID) float64 {
getPeerInitLat := func(_ peer.ID) float64 {
return float64(bpt.avgGlobalTime) * newPeerMul
}

Expand Down
1 change: 1 addition & 0 deletions pkg/net/exchange/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
ReadResMinSpeed = 50 << 10
ShufflePeersPrefix = 16
WriteResDeadline = 60 * time.Second
streamReadDeadline = 10 * time.Second
)

// `Request` processed and validated to query the tipsets needed.
Expand Down
4 changes: 4 additions & 0 deletions pkg/net/exchange/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,14 @@ func (s *server) handleStream(stream inet.Stream) {
defer stream.Close() //nolint:errcheck

var req exchange.Request
_ = stream.SetReadDeadline(time.Now().Add(streamReadDeadline))
if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil {
_ = stream.SetReadDeadline(time.Time{})
exchangeServerLog.Warnf("failed to read block sync request: %s", err)
return
}
_ = stream.SetReadDeadline(time.Time{})

exchangeServerLog.Debugw("block sync request", "start", req.Head, "len", req.Length, "remote peer", stream.Conn().RemotePeer())

resp, err := s.processRequest(ctx, &req)
Expand Down
5 changes: 3 additions & 2 deletions pkg/net/helloprotocol/hello_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (h *HelloProtocolHandler) handleNewStream(s net.Stream) {
// can't process a hello received in error, but leave this connection
// open because we connections are innocent until proven guilty
// (with bad genesis)
_ = s.Conn().Close()
return
}
latencyMsg := &LatencyMessage{TArrival: time.Now().UnixNano()}
Expand Down Expand Up @@ -230,13 +231,13 @@ func (h *HelloProtocolHandler) loadLocalFullTipset(ctx context.Context, tsk type
// ErrBadGenesis is the error returned when a mismatch in genesis blocks happens.
var ErrBadGenesis = fmt.Errorf("bad genesis block")

func (h *HelloProtocolHandler) receiveHello(ctx context.Context, s net.Stream) (*HelloMessage, error) {
func (h *HelloProtocolHandler) receiveHello(_ context.Context, s net.Stream) (*HelloMessage, error) {
var hello HelloMessage
err := hello.UnmarshalCBOR(s)
return &hello, err
}

func (h *HelloProtocolHandler) receiveLatency(ctx context.Context, s net.Stream) (*LatencyMessage, error) {
func (h *HelloProtocolHandler) receiveLatency(_ context.Context, s net.Stream) (*LatencyMessage, error) {
var latency LatencyMessage
err := latency.UnmarshalCBOR(s)
if err != nil {
Expand Down

0 comments on commit a8d2410

Please sign in to comment.