From 7fc97b8e74a46fd28ba2e3a996c533633f612db4 Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 4 Dec 2018 08:12:07 +0100 Subject: [PATCH 1/2] swarm/network, swarm/pss: remove isproxbin bool from kad.Each* iterfunc --- swarm/network/discovery.go | 6 ++--- swarm/network/hive.go | 4 +-- swarm/network/hive_test.go | 2 +- swarm/network/kademlia.go | 30 +++++++--------------- swarm/network/kademlia_test.go | 2 +- swarm/network/networkid_test.go | 2 +- swarm/network/stream/delivery.go | 3 +-- swarm/network/stream/messages.go | 2 +- swarm/network/stream/snapshot_sync_test.go | 2 ++ swarm/pss/pss.go | 2 +- swarm/pss/pss_test.go | 6 ++--- 11 files changed, 25 insertions(+), 36 deletions(-) diff --git a/swarm/network/discovery.go b/swarm/network/discovery.go index c6f5224301e6..4c503047a5b4 100644 --- a/swarm/network/discovery.go +++ b/swarm/network/discovery.go @@ -65,7 +65,7 @@ func (d *Peer) HandleMsg(ctx context.Context, msg interface{}) error { // NotifyDepth sends a message to all connections if depth of saturation is changed func NotifyDepth(depth uint8, kad *Kademlia) { - f := func(val *Peer, po int, _ bool) bool { + f := func(val *Peer, po int) bool { val.NotifyDepth(depth) return true } @@ -74,7 +74,7 @@ func NotifyDepth(depth uint8, kad *Kademlia) { // NotifyPeer informs all peers about a newly added node func NotifyPeer(p *BzzAddr, k *Kademlia) { - f := func(val *Peer, po int, _ bool) bool { + f := func(val *Peer, po int) bool { val.NotifyPeer(p, uint8(po)) return true } @@ -160,7 +160,7 @@ func (d *Peer) handleSubPeersMsg(msg *subPeersMsg) error { if !d.sentPeers { d.setDepth(msg.Depth) var peers []*BzzAddr - d.kad.EachConn(d.Over(), 255, func(p *Peer, po int, isproxbin bool) bool { + d.kad.EachConn(d.Over(), 255, func(p *Peer, po int) bool { if pob, _ := Pof(d, d.kad.BaseAddr(), 0); pob > po { return false } diff --git a/swarm/network/hive.go b/swarm/network/hive.go index ebef5459297a..a0b6b988abe2 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -114,7 +114,7 @@ func (h *Hive) Stop() error { } } log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4])) - h.EachConn(nil, 255, func(p *Peer, _ int, _ bool) bool { + h.EachConn(nil, 255, func(p *Peer, _ int) bool { log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4])) p.Drop(nil) return true @@ -228,7 +228,7 @@ func (h *Hive) loadPeers() error { // savePeers, savePeer implement persistence callback/ func (h *Hive) savePeers() error { var peers []*BzzAddr - h.Kademlia.EachAddr(nil, 256, func(pa *BzzAddr, i int, _ bool) bool { + h.Kademlia.EachAddr(nil, 256, func(pa *BzzAddr, i int) bool { if pa == nil { log.Warn(fmt.Sprintf("empty addr: %v", i)) return true diff --git a/swarm/network/hive_test.go b/swarm/network/hive_test.go index 56adc5a8e9d2..a29e7308332f 100644 --- a/swarm/network/hive_test.go +++ b/swarm/network/hive_test.go @@ -103,7 +103,7 @@ func TestHiveStatePersistance(t *testing.T) { pp.Start(s1.Server) i := 0 - pp.Kademlia.EachAddr(nil, 256, func(addr *BzzAddr, po int, nn bool) bool { + pp.Kademlia.EachAddr(nil, 256, func(addr *BzzAddr, po int) bool { delete(peers, addr.String()) i++ return true diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index c5c2d79e3b96..c35d55e834b3 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -390,46 +390,42 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con // EachConn is an iterator with args (base, po, f) applies f to each live peer // that has proximity order po or less as measured from the base // if base is nil, kademlia base address is used -// It returns peers in order deepest to shallowest -func (k *Kademlia) EachConn(base []byte, o int, f func(*Peer, int, bool) bool) { +func (k *Kademlia) EachConn(base []byte, o int, f func(*Peer, int) bool) { k.lock.RLock() defer k.lock.RUnlock() k.eachConn(base, o, f) } -func (k *Kademlia) eachConn(base []byte, o int, f func(*Peer, int, bool) bool) { +func (k *Kademlia) eachConn(base []byte, o int, f func(*Peer, int) bool) { if len(base) == 0 { base = k.base } - depth := depthForPot(k.conns, k.MinProxBinSize, k.base) k.conns.EachNeighbour(base, Pof, func(val pot.Val, po int) bool { if po > o { return true } - return f(val.(*Peer), po, po >= depth) + return f(val.(*Peer), po) }) } // EachAddr called with (base, po, f) is an iterator applying f to each known peer // that has proximity order o or less as measured from the base // if base is nil, kademlia base address is used -// It returns peers in order deepest to shallowest -func (k *Kademlia) EachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) { +func (k *Kademlia) EachAddr(base []byte, o int, f func(*BzzAddr, int) bool) { k.lock.RLock() defer k.lock.RUnlock() k.eachAddr(base, o, f) } -func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) { +func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) { if len(base) == 0 { base = k.base } - depth := depthForPot(k.conns, k.MinProxBinSize, k.base) k.addrs.EachNeighbour(base, Pof, func(val pot.Val, po int) bool { if po > o { return true } - return f(val.(*entry).BzzAddr, po, po >= depth) + return f(val.(*entry).BzzAddr, po) }) } @@ -687,12 +683,8 @@ func (k *Kademlia) saturation() int { // TODO move to separate testing tools file func (k *Kademlia) knowNeighbours(addrs [][]byte) (got bool, n int, missing [][]byte) { pm := make(map[string]bool) - - // create a map with all peers at depth and deeper known in the kademlia - // in order deepest to shallowest compared to the kademlia base address - // all bins (except self) are included (0 <= bin <= 255) - depth := depthForPot(k.addrs, k.MinProxBinSize, k.base) - k.eachAddr(nil, 255, func(p *BzzAddr, po int, nn bool) bool { + depth := depthForPot(k.conns, k.MinProxBinSize, k.base) + k.eachAddr(nil, 255, func(p *BzzAddr, po int) bool { if po < depth { return false } @@ -724,12 +716,8 @@ func (k *Kademlia) knowNeighbours(addrs [][]byte) (got bool, n int, missing [][] // It is used in Healthy function for testing only func (k *Kademlia) connectedNeighbours(peers [][]byte) (got bool, n int, missing [][]byte) { pm := make(map[string]bool) - - // create a map with all peers at depth and deeper that are connected in the kademlia - // in order deepest to shallowest compared to the kademlia base address - // all bins (except self) are included (0 <= bin <= 255) depth := depthForPot(k.conns, k.MinProxBinSize, k.base) - k.eachConn(nil, 255, func(p *Peer, po int, nn bool) bool { + k.eachConn(nil, 255, func(p *Peer, po int) bool { if po < depth { return false } diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index 773f201ac0cb..8d320c172847 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -232,7 +232,7 @@ func assertHealth(t *testing.T, k *Kademlia, expectHealthy bool, expectSaturatio t.Helper() kid := common.Bytes2Hex(k.BaseAddr()) addrs := [][]byte{k.BaseAddr()} - k.EachAddr(nil, 255, func(addr *BzzAddr, po int, _ bool) bool { + k.EachAddr(nil, 255, func(addr *BzzAddr, po int) bool { addrs = append(addrs, addr.Address()) return true }) diff --git a/swarm/network/networkid_test.go b/swarm/network/networkid_test.go index 191d67e5b6fc..3cd683e1c9de 100644 --- a/swarm/network/networkid_test.go +++ b/swarm/network/networkid_test.go @@ -92,7 +92,7 @@ func TestNetworkID(t *testing.T) { if kademlias[node].addrs.Size() != len(netIDGroup)-1 { t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1) } - kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int, _ bool) bool { + kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int) bool { found := false for _, nd := range netIDGroup { if bytes.Equal(kademlias[nd].BaseAddr(), addr.Address()) { diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index c73298d9aa24..e1a13fe8d517 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -19,7 +19,6 @@ package stream import ( "context" "errors" - "fmt" "github.com/ethereum/go-ethereum/metrics" @@ -245,7 +244,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( return nil, nil, fmt.Errorf("source peer %v not found", spID.String()) } } else { - d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool { + d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool { id := p.ID() if p.LightNode { // skip light nodes diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index eb1b2983e123..b293724cc713 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -336,7 +336,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) // launch in go routine since GetBatch blocks until new hashes arrive go func() { if err := p.SendOfferedHashes(s, req.From, req.To); err != nil { - log.Warn("SendOfferedHashes error", "err", err) + log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) } }() // go p.SendOfferedHashes(s, req.From, req.To) diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 7a883644b9be..8ee7fd2edf5e 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -76,6 +76,7 @@ func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID, //they are expected to store based on the syncing protocol. //Number of chunks and nodes can be provided via commandline too. func TestSyncingViaGlobalSync(t *testing.T) { + t.Skip("not working") if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { t.Skip("Flaky on mac on travis") } @@ -107,6 +108,7 @@ func TestSyncingViaGlobalSync(t *testing.T) { } func TestSyncingViaDirectSubscribe(t *testing.T) { + t.Skip("not working") if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { t.Skip("Flaky on mac on travis") } diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 631d27f09594..bee64b0df544 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -964,7 +964,7 @@ func (p *Pss) forward(msg *PssMsg) error { onlySendOnce = true } - p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { + p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int) bool { if po < broadcastThreshold && sent > 0 { return false // stop iterating } diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index ec46504c236b..b0753ad17840 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -491,12 +491,12 @@ func TestAddressMatchProx(t *testing.T) { // meanwhile test regression for kademlia since we are compiling the test parameters from different packages var proxes int var conns int - kad.EachConn(nil, peerCount, func(p *network.Peer, po int, prox bool) bool { + depth := kad.NeighbourhoodDepth() + kad.EachConn(nil, peerCount, func(p *network.Peer, po int) bool { conns++ - if prox { + if po >= depth { proxes++ } - log.Trace("kadconn", "po", po, "peer", p, "prox", prox) return true }) if proxes != nnPeerCount { From 35cebf227c62626d7856648074107c8884933b73 Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 9 Jan 2019 10:27:16 +0100 Subject: [PATCH 2/2] swarm/network: restore comment and unskip snapshot sync tests --- swarm/network/kademlia.go | 3 +++ swarm/network/stream/snapshot_sync_test.go | 2 -- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index c35d55e834b3..3214e151d114 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -684,7 +684,10 @@ func (k *Kademlia) saturation() int { func (k *Kademlia) knowNeighbours(addrs [][]byte) (got bool, n int, missing [][]byte) { pm := make(map[string]bool) depth := depthForPot(k.conns, k.MinProxBinSize, k.base) + // create a map with all peers at depth and deeper known in the kademlia k.eachAddr(nil, 255, func(p *BzzAddr, po int) bool { + // in order deepest to shallowest compared to the kademlia base address + // all bins (except self) are included (0 <= bin <= 255) if po < depth { return false } diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 8ee7fd2edf5e..7a883644b9be 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -76,7 +76,6 @@ func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID, //they are expected to store based on the syncing protocol. //Number of chunks and nodes can be provided via commandline too. func TestSyncingViaGlobalSync(t *testing.T) { - t.Skip("not working") if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { t.Skip("Flaky on mac on travis") } @@ -108,7 +107,6 @@ func TestSyncingViaGlobalSync(t *testing.T) { } func TestSyncingViaDirectSubscribe(t *testing.T) { - t.Skip("not working") if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { t.Skip("Flaky on mac on travis") }