Skip to content

Commit

Permalink
p2p/discover, p2p/enode: rework endpoint proof handling, packet loggi…
Browse files Browse the repository at this point in the history
…ng (#18963)

This change resolves multiple issues around handling of endpoint proofs.
The proof is now done separately for each IP and completing the proof
requires a matching ping hash.

Also remove waitping because it's equivalent to sleep. waitping was
slightly more efficient, but that may cause issues with findnode if
packets are reordered and the remote end sees findnode before pong.

Logging of received packets was hitherto done after handling the packet,
which meant that sent replies were logged before the packet that
generated them. This change splits up packet handling into 'preverify'
and 'handle'. The error from 'preverify' is logged, but 'handle' happens
after the message is logged. This fixes the order. Packet logs now
contain the node ID.
  • Loading branch information
fjl committed Jan 29, 2019
1 parent 1f3dfed commit 4cd90e0
Show file tree
Hide file tree
Showing 8 changed files with 595 additions and 332 deletions.
3 changes: 2 additions & 1 deletion p2p/discover/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
// The fields of Node may not be modified.
type node struct {
enode.Node
addedAt time.Time // time when the node was added to the table
addedAt time.Time // time when the node was added to the table
livenessChecks uint // how often liveness was checked
}

type encPubkey [64]byte
Expand Down
84 changes: 39 additions & 45 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ type Table struct {
net transport
refreshReq chan chan struct{}
initDone chan struct{}
closeReq chan struct{}
closed chan struct{}

closeOnce sync.Once
closeReq chan struct{}
closed chan struct{}

nodeAddedHook func(*node) // for testing
}
Expand Down Expand Up @@ -180,16 +182,14 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) {

// Close terminates the network listener and flushes the node database.
func (tab *Table) Close() {
if tab.net != nil {
tab.net.close()
}

select {
case <-tab.closed:
// already closed.
case tab.closeReq <- struct{}{}:
<-tab.closed // wait for refreshLoop to end.
}
tab.closeOnce.Do(func() {
if tab.net != nil {
tab.net.close()
}
// Wait for loop to end.
close(tab.closeReq)
<-tab.closed
})
}

// setFallbackNodes sets the initial points of contact. These nodes
Expand Down Expand Up @@ -290,31 +290,39 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
// we have asked all closest nodes, stop the search
break
}
// wait for the next reply
for _, n := range <-reply {
if n != nil && !seen[n.ID()] {
seen[n.ID()] = true
result.push(n, bucketSize)
select {
case nodes := <-reply:
for _, n := range nodes {
if n != nil && !seen[n.ID()] {
seen[n.ID()] = true
result.push(n, bucketSize)
}
}
case <-tab.closeReq:
return nil // shutdown, no need to continue.
}
pendingQueries--
}
return result.entries
}

func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
fails := tab.db.FindFails(n.ID())
fails := tab.db.FindFails(n.ID(), n.IP())
r, err := tab.net.findnode(n.ID(), n.addr(), targetKey)
if err != nil || len(r) == 0 {
if err == errClosed {
// Avoid recording failures on shutdown.
reply <- nil
return
} else if err != nil || len(r) == 0 {
fails++
tab.db.UpdateFindFails(n.ID(), fails)
tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
tab.delete(n)
}
} else if fails > 0 {
tab.db.UpdateFindFails(n.ID(), fails-1)
tab.db.UpdateFindFails(n.ID(), n.IP(), fails-1)
}

// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
Expand All @@ -329,7 +337,7 @@ func (tab *Table) refresh() <-chan struct{} {
done := make(chan struct{})
select {
case tab.refreshReq <- done:
case <-tab.closed:
case <-tab.closeReq:
close(done)
}
return done
Expand Down Expand Up @@ -433,7 +441,7 @@ func (tab *Table) loadSeedNodes() {
seeds = append(seeds, tab.nursery...)
for i := range seeds {
seed := seeds[i]
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID())) }}
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
tab.add(seed)
}
Expand All @@ -458,16 +466,17 @@ func (tab *Table) doRevalidate(done chan<- struct{}) {
b := tab.buckets[bi]
if err == nil {
// The node responded, move it to the front.
log.Debug("Revalidated node", "b", bi, "id", last.ID())
last.livenessChecks++
log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks)
b.bump(last)
return
}
// No reply received, pick a replacement or delete the node if there aren't
// any replacements.
if r := tab.replace(b, last); r != nil {
log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "r", r.ID(), "rip", r.IP())
log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks, "r", r.ID(), "rip", r.IP())
} else {
log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP())
log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks)
}
}

Expand Down Expand Up @@ -502,7 +511,7 @@ func (tab *Table) copyLiveNodes() {
now := time.Now()
for _, b := range &tab.buckets {
for _, n := range b.entries {
if now.Sub(n.addedAt) >= seedMinTableTime {
if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime {
tab.db.UpdateNode(unwrapNode(n))
}
}
Expand All @@ -518,7 +527,9 @@ func (tab *Table) closest(target enode.ID, nresults int) *nodesByDistance {
close := &nodesByDistance{target: target}
for _, b := range &tab.buckets {
for _, n := range b.entries {
close.push(n, nresults)
if n.livenessChecks > 0 {
close.push(n, nresults)
}
}
}
return close
Expand Down Expand Up @@ -572,23 +583,6 @@ func (tab *Table) addThroughPing(n *node) {
tab.add(n)
}

// stuff adds nodes the table to the end of their corresponding bucket
// if the bucket is not full. The caller must not hold tab.mutex.
func (tab *Table) stuff(nodes []*node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()

for _, n := range nodes {
if n.ID() == tab.self().ID() {
continue // don't add self
}
b := tab.bucket(n.ID())
if len(b.entries) < bucketSize {
tab.bumpOrAdd(b, n)
}
}
}

// delete removes an entry from the node table. It is used to evacuate dead nodes.
func (tab *Table) delete(node *node) {
tab.mutex.Lock()
Expand Down
34 changes: 22 additions & 12 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func TestTable_pingReplace(t *testing.T) {
func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding bool) {
transport := newPingRecorder()
tab, db := newTestTable(transport)
defer tab.Close()
defer db.Close()
defer tab.Close()

<-tab.initDone

Expand Down Expand Up @@ -137,8 +137,8 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
func TestTable_IPLimit(t *testing.T) {
transport := newPingRecorder()
tab, db := newTestTable(transport)
defer tab.Close()
defer db.Close()
defer tab.Close()

for i := 0; i < tableIPLimit+1; i++ {
n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)})
Expand All @@ -153,8 +153,8 @@ func TestTable_IPLimit(t *testing.T) {
func TestTable_BucketIPLimit(t *testing.T) {
transport := newPingRecorder()
tab, db := newTestTable(transport)
defer tab.Close()
defer db.Close()
defer tab.Close()

d := 3
for i := 0; i < bucketIPLimit+1; i++ {
Expand All @@ -173,9 +173,9 @@ func TestTable_closest(t *testing.T) {
// for any node table, Target and N
transport := newPingRecorder()
tab, db := newTestTable(transport)
defer tab.Close()
defer db.Close()
tab.stuff(test.All)
defer tab.Close()
fillTable(tab, test.All)

// check that closest(Target, N) returns nodes
result := tab.closest(test.Target, test.N).entries
Expand Down Expand Up @@ -234,13 +234,13 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) {
test := func(buf []*enode.Node) bool {
transport := newPingRecorder()
tab, db := newTestTable(transport)
defer tab.Close()
defer db.Close()
defer tab.Close()
<-tab.initDone

for i := 0; i < len(buf); i++ {
ld := cfg.Rand.Intn(len(tab.buckets))
tab.stuff([]*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))})
fillTable(tab, []*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))})
}
gotN := tab.ReadRandomNodes(buf)
if gotN != tab.len() {
Expand Down Expand Up @@ -272,25 +272,29 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value {
N: rand.Intn(bucketSize),
}
for _, id := range gen([]enode.ID{}, rand).([]enode.ID) {
n := enode.SignNull(new(enr.Record), id)
t.All = append(t.All, wrapNode(n))
r := new(enr.Record)
r.Set(enr.IP(genIP(rand)))
n := wrapNode(enode.SignNull(r, id))
n.livenessChecks = 1
t.All = append(t.All, n)
}
return reflect.ValueOf(t)
}

func TestTable_Lookup(t *testing.T) {
tab, db := newTestTable(lookupTestnet)
defer tab.Close()
defer db.Close()
defer tab.Close()

// lookup on empty table returns no nodes
if results := tab.lookup(lookupTestnet.target, false); len(results) > 0 {
t.Fatalf("lookup on empty table returned %d results: %#v", len(results), results)
}
// seed table with initial node (otherwise lookup will terminate immediately)
seedKey, _ := decodePubkey(lookupTestnet.dists[256][0])
seed := wrapNode(enode.NewV4(seedKey, net.IP{}, 0, 256))
tab.stuff([]*node{seed})
seed := wrapNode(enode.NewV4(seedKey, net.IP{127, 0, 0, 1}, 0, 256))
seed.livenessChecks = 1
fillTable(tab, []*node{seed})

results := tab.lookup(lookupTestnet.target, true)
t.Logf("results:")
Expand Down Expand Up @@ -578,6 +582,12 @@ func gen(typ interface{}, rand *rand.Rand) interface{} {
return v.Interface()
}

func genIP(rand *rand.Rand) net.IP {
ip := make(net.IP, 4)
rand.Read(ip)
return ip
}

func quickcfg() *quick.Config {
return &quick.Config{
MaxCount: 5000,
Expand Down
21 changes: 17 additions & 4 deletions p2p/discover/table_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,23 @@ func fillBucket(tab *Table, n *node) (last *node) {
return b.entries[bucketSize-1]
}

// fillTable adds nodes the table to the end of their corresponding bucket
// if the bucket is not full. The caller must not hold tab.mutex.
func fillTable(tab *Table, nodes []*node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()

for _, n := range nodes {
if n.ID() == tab.self().ID() {
continue // don't add self
}
b := tab.bucket(n.ID())
if len(b.entries) < bucketSize {
tab.bumpOrAdd(b, n)
}
}
}

type pingRecorder struct {
mu sync.Mutex
dead, pinged map[enode.ID]bool
Expand All @@ -109,10 +126,6 @@ func (t *pingRecorder) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPu
return nil, nil
}

func (t *pingRecorder) waitping(from enode.ID) error {
return nil // remote always pings
}

func (t *pingRecorder) ping(toid enode.ID, toaddr *net.UDPAddr) error {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
Loading

0 comments on commit 4cd90e0

Please sign in to comment.