Skip to content

Commit

Permalink
Merge pull request ethereum#64 from lochjin/v1.14.11-qng
Browse files Browse the repository at this point in the history
feat:optimize p2p server for QNG
  • Loading branch information
dindinw authored Dec 12, 2024
2 parents 1042c1f + d0cf6b5 commit 8fadd4d
Showing 1 changed file with 126 additions and 12 deletions.
138 changes: 126 additions & 12 deletions p2p/server_qng.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,53 @@ package p2p

import (
"bytes"
"cmp"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"net"
"slices"
"sync"
"time"
)

type QngServer struct {
cfg *Config

ourHandshake *protoHandshake
peerFeed event.Feed
log log.Logger
peerFeed event.Feed

peers map[enode.ID]*Peer

lock *sync.RWMutex

s *Server
}

func (srv *QngServer) Start(s *Server) error {
srv.s = s
srv.cfg = &s.Config
srv.ourHandshake = s.ourHandshake
srv.log = s.log
return nil
}

func (srv *QngServer) Stop() {
srv.lock.Lock()
for _, p := range srv.peers {
p.Disconnect(DiscQuitting)
}
srv.lock.Unlock()

for srv.PeerCount() > 0 {
select {
case <-time.After(time.Second):
srv.s.log.Info("Waiting for all peers closed in QngServer")
}
}
srv.s.log.Info("QngServer stopped")
}

func (srv *QngServer) Connect(fd net.Conn, dialDest *enode.Node) (bool, error) {
flags := trustedConn
if dialDest == nil {
Expand Down Expand Up @@ -61,26 +83,26 @@ func (srv *QngServer) setupConn(c *conn, dialDest *enode.Node) (bool, error) {
dialPubkey := new(ecdsa.PublicKey)
if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
err = fmt.Errorf("%w: dial destination doesn't have a secp256k1 public key", errEncHandshakeError)
srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
srv.s.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
return false, err
}
}

// Run the RLPx handshake.
remotePubkey, err := c.doEncHandshake(srv.cfg.PrivateKey)
if err != nil {
srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
srv.s.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
return false, fmt.Errorf("%w: %v", errEncHandshakeError, err)
}
if dialDest != nil {
c.node = dialDest
} else {
c.node = nodeFromConn(remotePubkey, c.fd)
}
clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
clog := srv.s.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)

// Run the capability negotiation handshake.
phs, err := c.doProtoHandshake(srv.ourHandshake)
phs, err := c.doProtoHandshake(srv.s.ourHandshake)
if err != nil {
clog.Trace("Failed p2p handshake", "err", err)
return false, fmt.Errorf("%w: %v", errProtoHandshakeError, err)
Expand Down Expand Up @@ -108,7 +130,7 @@ func (srv *QngServer) addPeerChecks(c *conn) error {
}

func (srv *QngServer) launchPeer(c *conn) (*Peer, bool) {
p := newPeer(srv.log, c, srv.cfg.Protocols)
p := newPeer(srv.s.log, c, srv.cfg.Protocols)
if srv.cfg.EnableMsgEvents {
// If message events are enabled, pass the peerFeed
// to the peer.
Expand All @@ -119,6 +141,9 @@ func (srv *QngServer) launchPeer(c *conn) (*Peer, bool) {
}

func (srv *QngServer) runPeer(p *Peer) bool {
start := time.Now()
srv.addPeer(p)

srv.peerFeed.Send(&PeerEvent{
Type: PeerEventTypeAdd,
Peer: p.ID(),
Expand All @@ -137,10 +162,99 @@ func (srv *QngServer) runPeer(p *Peer) bool {
LocalAddress: p.LocalAddr().String(),
})

srv.delPeer(p, time.Since(start), remoteRequested, err)
return remoteRequested
}

func (srv *QngServer) addPeer(p *Peer) {
srv.lock.Lock()
defer srv.lock.Unlock()

srv.peers[p.ID()] = p
srv.s.log.Debug("Adding QNG p2p peer", "peercount", len(srv.peers), "id", p.ID(), "conn", p.rw.flags, "addr", p.RemoteAddr(), "name", p.Name())
}

func (srv *QngServer) delPeer(p *Peer, dur time.Duration, requested bool, err error) {
srv.lock.Lock()
defer srv.lock.Unlock()

delete(srv.peers, p.ID())
srv.s.log.Debug("Removing QNG p2p peer", "peercount", len(srv.peers), "id", p.ID(), "duration", dur, "req", requested, "err", err)
}

func (srv *QngServer) PeerCount() int {
srv.lock.RLock()
defer srv.lock.RUnlock()

return len(srv.peers)
}

func (srv *QngServer) Peers() []*Peer {
srv.lock.RLock()
defer srv.lock.RUnlock()

var ps []*Peer
for _, p := range srv.peers {
ps = append(ps, p)
}
return ps
}

func (srv *QngServer) GetPeer(id enode.ID) *Peer {
srv.lock.RLock()
defer srv.lock.RUnlock()

p, ok := srv.peers[id]
if !ok {
return nil
}
return p
}

func (srv *QngServer) NodeInfo() *NodeInfo {
node := srv.s.Self()
info := &NodeInfo{
Name: srv.s.Name,
Enode: node.URLv4(),
ID: node.ID().String(),
IP: node.IPAddr().String(),
ListenAddr: srv.s.ListenAddr,
Protocols: make(map[string]interface{}),
}
info.Ports.Discovery = node.UDP()
info.Ports.Listener = node.TCP()
info.ENR = node.String()

for _, proto := range srv.s.Protocols {
if _, ok := info.Protocols[proto.Name]; !ok {
nodeInfo := interface{}("unknown")
if query := proto.NodeInfo; query != nil {
nodeInfo = proto.NodeInfo()
}
info.Protocols[proto.Name] = nodeInfo
}
}
return info
}

func (srv *QngServer) PeersInfo() []*PeerInfo {
infos := make([]*PeerInfo, 0, srv.PeerCount())
for _, peer := range srv.Peers() {
if peer != nil {
infos = append(infos, peer.Info())
}
}
slices.SortFunc(infos, func(a, b *PeerInfo) int {
return cmp.Compare(a.ID, b.ID)
})

return infos
}

func NewQngServer() *QngServer {
qs := &QngServer{}
qs := &QngServer{
peers: make(map[enode.ID]*Peer),
lock: &sync.RWMutex{},
}
return qs
}

0 comments on commit 8fadd4d

Please sign in to comment.