Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p/discover, p2p/enode: rework endpoint proof handling, packet logging #18963

Merged
merged 6 commits into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion p2p/discover/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
// The fields of Node may not be modified.
type node struct {
enode.Node
addedAt time.Time // time when the node was added to the table
addedAt time.Time // time when the node was added to the table
livenessChecks int // how often liveness was checked
fjl marked this conversation as resolved.
Show resolved Hide resolved
}

type encPubkey [64]byte
Expand Down
67 changes: 39 additions & 28 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ type Table struct {
net transport
refreshReq chan chan struct{}
initDone chan struct{}
closeReq chan struct{}
closed chan struct{}

closeOnce sync.Once
closeReq chan struct{}
closed chan struct{}

nodeAddedHook func(*node) // for testing
}
Expand Down Expand Up @@ -180,16 +182,14 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) {

// Close terminates the network listener and flushes the node database.
func (tab *Table) Close() {
if tab.net != nil {
tab.net.close()
}

select {
case <-tab.closed:
// already closed.
case tab.closeReq <- struct{}{}:
<-tab.closed // wait for refreshLoop to end.
}
tab.closeOnce.Do(func() {
if tab.net != nil {
tab.net.close()
}
// Wait for loop to end.
close(tab.closeReq)
<-tab.closed
})
}

// setFallbackNodes sets the initial points of contact. These nodes
Expand Down Expand Up @@ -290,31 +290,39 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
// we have asked all closest nodes, stop the search
break
}
// wait for the next reply
for _, n := range <-reply {
if n != nil && !seen[n.ID()] {
seen[n.ID()] = true
result.push(n, bucketSize)
select {
case nodes := <-reply:
for _, n := range nodes {
if n != nil && !seen[n.ID()] {
seen[n.ID()] = true
result.push(n, bucketSize)
}
}
case <-tab.closeReq:
return nil // shutdown, no need to continue.
}
pendingQueries--
}
return result.entries
}

func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
fails := tab.db.FindFails(n.ID())
fails := tab.db.FindFails(n.ID(), n.IP())
r, err := tab.net.findnode(n.ID(), n.addr(), targetKey)
if err != nil || len(r) == 0 {
if err == errClosed {
// Avoid recording failures on shutdown.
reply <- nil
return
} else if err != nil || len(r) == 0 {
fails++
tab.db.UpdateFindFails(n.ID(), fails)
tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
tab.delete(n)
}
} else if fails > 0 {
tab.db.UpdateFindFails(n.ID(), fails-1)
tab.db.UpdateFindFails(n.ID(), n.IP(), fails-1)
}

// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
Expand All @@ -329,7 +337,7 @@ func (tab *Table) refresh() <-chan struct{} {
done := make(chan struct{})
select {
case tab.refreshReq <- done:
case <-tab.closed:
case <-tab.closeReq:
close(done)
}
return done
Expand Down Expand Up @@ -433,7 +441,7 @@ func (tab *Table) loadSeedNodes() {
seeds = append(seeds, tab.nursery...)
for i := range seeds {
seed := seeds[i]
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID())) }}
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
tab.add(seed)
}
Expand All @@ -458,16 +466,17 @@ func (tab *Table) doRevalidate(done chan<- struct{}) {
b := tab.buckets[bi]
if err == nil {
// The node responded, move it to the front.
log.Debug("Revalidated node", "b", bi, "id", last.ID())
last.livenessChecks++
log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks)
b.bump(last)
return
}
// No reply received, pick a replacement or delete the node if there aren't
// any replacements.
if r := tab.replace(b, last); r != nil {
log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "r", r.ID(), "rip", r.IP())
log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks, "r", r.ID(), "rip", r.IP())
} else {
log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP())
log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks)
}
}

Expand Down Expand Up @@ -502,7 +511,7 @@ func (tab *Table) copyLiveNodes() {
now := time.Now()
for _, b := range &tab.buckets {
for _, n := range b.entries {
if now.Sub(n.addedAt) >= seedMinTableTime {
if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime {
tab.db.UpdateNode(unwrapNode(n))
}
}
Expand All @@ -518,7 +527,9 @@ func (tab *Table) closest(target enode.ID, nresults int) *nodesByDistance {
close := &nodesByDistance{target: target}
for _, b := range &tab.buckets {
for _, n := range b.entries {
close.push(n, nresults)
if n.livenessChecks > 0 {
close.push(n, nresults)
}
}
}
return close
Expand Down
12 changes: 10 additions & 2 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,9 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value {
N: rand.Intn(bucketSize),
}
for _, id := range gen([]enode.ID{}, rand).([]enode.ID) {
n := enode.SignNull(new(enr.Record), id)
r := new(enr.Record)
r.Set(enr.IP(genIP(rand)))
n := enode.SignNull(r, id)
t.All = append(t.All, wrapNode(n))
}
return reflect.ValueOf(t)
Expand All @@ -289,7 +291,7 @@ func TestTable_Lookup(t *testing.T) {
}
// seed table with initial node (otherwise lookup will terminate immediately)
seedKey, _ := decodePubkey(lookupTestnet.dists[256][0])
seed := wrapNode(enode.NewV4(seedKey, net.IP{}, 0, 256))
seed := wrapNode(enode.NewV4(seedKey, net.IP{127, 0, 0, 1}, 0, 256))
tab.stuff([]*node{seed})

results := tab.lookup(lookupTestnet.target, true)
Expand Down Expand Up @@ -578,6 +580,12 @@ func gen(typ interface{}, rand *rand.Rand) interface{} {
return v.Interface()
}

func genIP(rand *rand.Rand) net.IP {
ip := make(net.IP, 4)
rand.Read(ip)
return ip
}

func quickcfg() *quick.Config {
return &quick.Config{
MaxCount: 5000,
Expand Down
Loading