Skip to content

Commit

Permalink
cmd/devp2p, p2p: dial using node iterator, discovery crawler (#20132)
Browse files Browse the repository at this point in the history
* p2p/enode: add Iterator and associated utilities

* p2p/discover: add RandomNodes iterator

* p2p: dial using iterator

* cmd/devp2p: add discv4 crawler

* cmd/devp2p: WIP nodeset filter

* cmd/devp2p: fixup lesFilter

* core/forkid: add NewStaticFilter

* cmd/devp2p: make -eth-network filter actually work

* cmd/devp2p: improve crawl timestamp handling

* cmd/devp2p: fix typo

* p2p/enode: fix comment typos

* p2p/discover: fix comment typos

* p2p/discover: rename lookup.next to 'advance'

* p2p: lower discovery mixer timeout

* p2p/enode: implement dynamic FairMix timeouts

* cmd/devp2p: add ropsten support in -eth-network filter

* cmd/devp2p: tweak crawler log message
  • Loading branch information
fjl authored and karalabe committed Oct 29, 2019
1 parent b0b2775 commit 2c37142
Show file tree
Hide file tree
Showing 19 changed files with 1,558 additions and 413 deletions.
152 changes: 152 additions & 0 deletions cmd/devp2p/crawl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.

package main

import (
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
)

type crawler struct {
input nodeSet
output nodeSet
disc *discover.UDPv4
iters []enode.Iterator
inputIter enode.Iterator
ch chan *enode.Node
closed chan struct{}

// settings
revalidateInterval time.Duration
}

func newCrawler(input nodeSet, disc *discover.UDPv4, iters ...enode.Iterator) *crawler {
c := &crawler{
input: input,
output: make(nodeSet, len(input)),
disc: disc,
iters: iters,
inputIter: enode.IterNodes(input.nodes()),
ch: make(chan *enode.Node),
closed: make(chan struct{}),
}
c.iters = append(c.iters, c.inputIter)
// Copy input to output initially. Any nodes that fail validation
// will be dropped from output during the run.
for id, n := range input {
c.output[id] = n
}
return c
}

func (c *crawler) run(timeout time.Duration) nodeSet {
var (
timeoutTimer = time.NewTimer(timeout)
timeoutCh <-chan time.Time
doneCh = make(chan enode.Iterator, len(c.iters))
liveIters = len(c.iters)
)
for _, it := range c.iters {
go c.runIterator(doneCh, it)
}

loop:
for {
select {
case n := <-c.ch:
c.updateNode(n)
case it := <-doneCh:
if it == c.inputIter {
// Enable timeout when we're done revalidating the input nodes.
log.Info("Revalidation of input set is done", "len", len(c.input))
if timeout > 0 {
timeoutCh = timeoutTimer.C
}
}
if liveIters--; liveIters == 0 {
break loop
}
case <-timeoutCh:
break loop
}
}

close(c.closed)
for _, it := range c.iters {
it.Close()
}
for ; liveIters > 0; liveIters-- {
<-doneCh
}
return c.output
}

func (c *crawler) runIterator(done chan<- enode.Iterator, it enode.Iterator) {
defer func() { done <- it }()
for it.Next() {
select {
case c.ch <- it.Node():
case <-c.closed:
return
}
}
}

func (c *crawler) updateNode(n *enode.Node) {
node, ok := c.output[n.ID()]

// Skip validation of recently-seen nodes.
if ok && time.Since(node.LastCheck) < c.revalidateInterval {
return
}

// Request the node record.
nn, err := c.disc.RequestENR(n)
node.LastCheck = truncNow()
if err != nil {
if node.Score == 0 {
// Node doesn't implement EIP-868.
log.Debug("Skipping node", "id", n.ID())
return
}
node.Score /= 2
} else {
node.N = nn
node.Seq = nn.Seq()
node.Score++
if node.FirstResponse.IsZero() {
node.FirstResponse = node.LastCheck
}
node.LastResponse = node.LastCheck
}

// Store/update node in output set.
if node.Score <= 0 {
log.Info("Removing node", "id", n.ID())
delete(c.output, n.ID())
} else {
log.Info("Updating node", "id", n.ID(), "seq", n.Seq(), "score", node.Score)
c.output[n.ID()] = node
}
}

func truncNow() time.Time {
return time.Now().UTC().Truncate(1 * time.Second)
}
70 changes: 51 additions & 19 deletions cmd/devp2p/discv4cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
discv4RequestRecordCommand,
discv4ResolveCommand,
discv4ResolveJSONCommand,
discv4CrawlCommand,
},
}
discv4PingCommand = cli.Command{
Expand Down Expand Up @@ -67,12 +68,25 @@ var (
Flags: []cli.Flag{bootnodesFlag},
ArgsUsage: "<nodes.json file>",
}
discv4CrawlCommand = cli.Command{
Name: "crawl",
Usage: "Updates a nodes.json file with random nodes found in the DHT",
Action: discv4Crawl,
Flags: []cli.Flag{bootnodesFlag, crawlTimeoutFlag},
}
)

var bootnodesFlag = cli.StringFlag{
Name: "bootnodes",
Usage: "Comma separated nodes used for bootstrapping",
}
var (
bootnodesFlag = cli.StringFlag{
Name: "bootnodes",
Usage: "Comma separated nodes used for bootstrapping",
}
crawlTimeoutFlag = cli.DurationFlag{
Name: "timeout",
Usage: "Time limit for the crawl.",
Value: 30 * time.Minute,
}
)

func discv4Ping(ctx *cli.Context) error {
n := getNodeArg(ctx)
Expand Down Expand Up @@ -113,30 +127,48 @@ func discv4ResolveJSON(ctx *cli.Context) error {
if ctx.NArg() < 1 {
return fmt.Errorf("need nodes file as argument")
}
disc := startV4(ctx)
defer disc.Close()
file := ctx.Args().Get(0)

// Load existing nodes in file.
var nodes []*enode.Node
if common.FileExist(file) {
nodes = loadNodesJSON(file).nodes()
nodesFile := ctx.Args().Get(0)
inputSet := make(nodeSet)
if common.FileExist(nodesFile) {
inputSet = loadNodesJSON(nodesFile)
}
// Add nodes from command line arguments.

// Add extra nodes from command line arguments.
var nodeargs []*enode.Node
for i := 1; i < ctx.NArg(); i++ {
n, err := parseNode(ctx.Args().Get(i))
if err != nil {
exit(err)
}
nodes = append(nodes, n)
nodeargs = append(nodeargs, n)
}

result := make(nodeSet, len(nodes))
for _, n := range nodes {
n = disc.Resolve(n)
result[n.ID()] = nodeJSON{Seq: n.Seq(), N: n}
// Run the crawler.
disc := startV4(ctx)
defer disc.Close()
c := newCrawler(inputSet, disc, enode.IterNodes(nodeargs))
c.revalidateInterval = 0
output := c.run(0)
writeNodesJSON(nodesFile, output)
return nil
}

func discv4Crawl(ctx *cli.Context) error {
if ctx.NArg() < 1 {
return fmt.Errorf("need nodes file as argument")
}
nodesFile := ctx.Args().First()
var inputSet nodeSet
if common.FileExist(nodesFile) {
inputSet = loadNodesJSON(nodesFile)
}
writeNodesJSON(file, result)

disc := startV4(ctx)
defer disc.Close()
c := newCrawler(inputSet, disc, disc.RandomNodes())
c.revalidateInterval = 10 * time.Minute
output := c.run(ctx.Duration(crawlTimeoutFlag.Name))
writeNodesJSON(nodesFile, output)
return nil
}

Expand Down
23 changes: 13 additions & 10 deletions cmd/devp2p/dnscmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func dnsSync(ctx *cli.Context) error {
}
def := treeToDefinition(url, t)
def.Meta.LastModified = time.Now()
writeTreeDefinition(outdir, def)
writeTreeMetadata(outdir, def)
writeTreeNodes(outdir, def)
return nil
}

Expand Down Expand Up @@ -151,7 +152,7 @@ func dnsSign(ctx *cli.Context) error {

def = treeToDefinition(url, t)
def.Meta.LastModified = time.Now()
writeTreeDefinition(defdir, def)
writeTreeMetadata(defdir, def)
return nil
}

Expand Down Expand Up @@ -315,26 +316,28 @@ func ensureValidTreeSignature(t *dnsdisc.Tree, pubkey *ecdsa.PublicKey, sig stri
return nil
}

// writeTreeDefinition writes a DNS node tree definition to the given directory.
func writeTreeDefinition(directory string, def *dnsDefinition) {
// writeTreeMetadata writes a DNS node tree metadata file to the given directory.
func writeTreeMetadata(directory string, def *dnsDefinition) {
metaJSON, err := json.MarshalIndent(&def.Meta, "", jsonIndent)
if err != nil {
exit(err)
}
// Convert nodes.
nodes := make(nodeSet, len(def.Nodes))
nodes.add(def.Nodes...)
// Write.
if err := os.Mkdir(directory, 0744); err != nil && !os.IsExist(err) {
exit(err)
}
metaFile, nodesFile := treeDefinitionFiles(directory)
writeNodesJSON(nodesFile, nodes)
metaFile, _ := treeDefinitionFiles(directory)
if err := ioutil.WriteFile(metaFile, metaJSON, 0644); err != nil {
exit(err)
}
}

func writeTreeNodes(directory string, def *dnsDefinition) {
ns := make(nodeSet, len(def.Nodes))
ns.add(def.Nodes...)
_, nodesFile := treeDefinitionFiles(directory)
writeNodesJSON(nodesFile, ns)
}

func treeDefinitionFiles(directory string) (string, string) {
meta := filepath.Join(directory, "enrtree-info.json")
nodes := filepath.Join(directory, "nodes.json")
Expand Down
1 change: 1 addition & 0 deletions cmd/devp2p/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func init() {
enrdumpCommand,
discv4Command,
dnsCommand,
nodesetCommand,
}
}

Expand Down
15 changes: 15 additions & 0 deletions cmd/devp2p/nodeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sort"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
Expand All @@ -36,6 +38,15 @@ type nodeSet map[enode.ID]nodeJSON
type nodeJSON struct {
Seq uint64 `json:"seq"`
N *enode.Node `json:"record"`

// The score tracks how many liveness checks were performed. It is incremented by one
// every time the node passes a check, and halved every time it doesn't.
Score int `json:"score,omitempty"`
// These two track the time of last successful contact.
FirstResponse time.Time `json:"firstResponse,omitempty"`
LastResponse time.Time `json:"lastResponse,omitempty"`
// This one tracks the time of our last attempt to contact the node.
LastCheck time.Time `json:"lastCheck,omitempty"`
}

func loadNodesJSON(file string) nodeSet {
Expand All @@ -51,6 +62,10 @@ func writeNodesJSON(file string, nodes nodeSet) {
if err != nil {
exit(err)
}
if file == "-" {
os.Stdout.Write(nodesJSON)
return
}
if err := ioutil.WriteFile(file, nodesJSON, 0644); err != nil {
exit(err)
}
Expand Down
Loading

0 comments on commit 2c37142

Please sign in to comment.