Skip to content

Commit

Permalink
protocols/kad/src/behaviour/test: Enable building single nodes
Browse files Browse the repository at this point in the history
Introduces the `build_node` function to build a single not connected
node. Along the way replace the notion of a `port_base` with returning
the actual `Multiaddr` of the node.
  • Loading branch information
mxinden committed Apr 14, 2020
1 parent 4bcb4b2 commit b8f85fc
Showing 1 changed file with 128 additions and 70 deletions.
198 changes: 128 additions & 70 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use libp2p_core::{
Transport,
identity,
transport::MemoryTransport,
multiaddr::{Protocol, multiaddr},
multiaddr::{Protocol, Multiaddr, multiaddr},
muxing::StreamMuxerBox,
upgrade
};
Expand All @@ -49,61 +49,83 @@ use multihash::{wrap, Code, Multihash};

type TestSwarm = Swarm<Kademlia<MemoryStore>>;

/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes(num: usize) -> (u64, Vec<TestSwarm>) {
build_nodes_with_config(num, Default::default())
fn build_node() -> (Multiaddr, TestSwarm) {
build_node_with_config(Default::default())
}

/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<TestSwarm>) {
let port_base = 1 + random::<u64>() % (u64::MAX - num as u64);
let mut result: Vec<Swarm<_, _>> = Vec::with_capacity(num);
fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(local_key))
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
.boxed();

for _ in 0 .. num {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(local_key))
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
.boxed();

let local_id = local_public_key.clone().into_peer_id();
let store = MemoryStore::new(local_id.clone());
let behaviour = Kademlia::with_config(local_id.clone(), store, cfg.clone());
result.push(Swarm::new(transport, behaviour, local_id));
}
let local_id = local_public_key.clone().into_peer_id();
let store = MemoryStore::new(local_id.clone());
let behaviour = Kademlia::with_config(local_id.clone(), store, cfg.clone());

for (i, s) in result.iter_mut().enumerate() {
Swarm::listen_on(s, Protocol::Memory(port_base + i as u64).into()).unwrap();
}
let mut swarm = Swarm::new(transport, behaviour, local_id);

let address: Multiaddr = Protocol::Memory(random::<u64>()).into();
Swarm::listen_on(&mut swarm, address.clone()).unwrap();

(port_base, result)
(address, swarm)
}

fn build_connected_nodes(total: usize, step: usize) -> (Vec<PeerId>, Vec<TestSwarm>) {
/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes(num: usize) -> Vec<(Multiaddr, TestSwarm)> {
build_nodes_with_config(num, Default::default())
}

/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> Vec<(Multiaddr, TestSwarm)> {
(0..num).map(|_| build_node_with_config(cfg.clone())).collect()
}

fn build_connected_nodes(total: usize, step: usize) -> Vec<(Multiaddr, TestSwarm)> {
build_connected_nodes_with_config(total, step, Default::default())
}

fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaConfig)
-> (Vec<PeerId>, Vec<TestSwarm>)
-> Vec<(Multiaddr, TestSwarm)>
{
let (port_base, mut swarms) = build_nodes_with_config(total, cfg);
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
let mut swarms = build_nodes_with_config(total, cfg);
let swarm_ids: Vec<_> = swarms.iter()
.map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone()))
.collect();

let mut i = 0;
for (j, peer) in swarm_ids.iter().enumerate().skip(1) {
for (j, (addr, peer_id)) in swarm_ids.iter().enumerate().skip(1) {
if i < swarm_ids.len() {
swarms[i].add_address(&peer, Protocol::Memory(port_base + j as u64).into());
swarms[i].1.add_address(peer_id, addr.clone());
}
if j % step == 0 {
i += step;
}
}

(swarm_ids, swarms)
swarms
}

fn build_fully_connected_nodes_with_config(total: usize, cfg: KademliaConfig)
-> Vec<(Multiaddr, TestSwarm)>
{
let mut swarms = build_nodes_with_config(total, cfg);
let swarm_addr_and_peer_id: Vec<_> = swarms.iter()
.map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone()))
.collect();

for (_addr, swarm) in swarms.iter_mut() {
for (addr, peer) in &swarm_addr_and_peer_id {
swarm.add_address(&peer, addr.clone());
}
}

swarms
}

fn random_multihash() -> Multihash {
Expand All @@ -115,7 +137,11 @@ fn bootstrap() {
fn run(rng: &mut impl Rng) {
let num_total = rng.gen_range(2, 20);
let num_group = rng.gen_range(1, num_total);
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, num_group);

let mut swarms = build_connected_nodes(num_total, num_group).into_iter()
.map(|(_a, s)| s)
.collect::<Vec<_>>();
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();

swarms[0].bootstrap();

Expand Down Expand Up @@ -166,7 +192,10 @@ fn query_iter() {

fn run(rng: &mut impl Rng) {
let num_total = rng.gen_range(2, 20);
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, 1);
let mut swarms = build_connected_nodes(num_total, 1).into_iter()
.map(|(_a, s)| s)
.collect::<Vec<_>>();
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();

// Ask the first peer in the list to search a random peer. The search should
// propagate forwards through the list of peers.
Expand Down Expand Up @@ -218,7 +247,9 @@ fn unresponsive_not_returned_direct() {
// Build one node. It contains fake addresses to non-existing nodes. We ask it to find a
// random peer. We make sure that no fake address is returned.

let (_, mut swarms) = build_nodes(1);
let mut swarms = build_nodes(1).into_iter()
.map(|(_a, s)| s)
.collect::<Vec<_>>();

// Add fake addresses.
for _ in 0 .. 10 {
Expand Down Expand Up @@ -258,16 +289,20 @@ fn unresponsive_not_returned_indirect() {
// non-existing nodes. We ask node #2 to find a random peer. We make sure that no fake address
// is returned.

let (port_base, mut swarms) = build_nodes(2);
let mut swarms = build_nodes(2);

// Add fake addresses to first.
let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
for _ in 0 .. 10 {
swarms[0].add_address(&PeerId::random(), multiaddr![Udp(10u16)]);
swarms[0].1.add_address(&PeerId::random(), multiaddr![Udp(10u16)]);
}

// Connect second to first.
swarms[1].add_address(&first_peer_id, Protocol::Memory(port_base).into());
let first_peer_id = Swarm::local_peer_id(&swarms[0].1).clone();
let first_address = swarms[0].0.clone();
swarms[1].1.add_address(&first_peer_id, first_address);

// Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>();

// Ask second to search a random value.
let search_target = PeerId::random();
Expand Down Expand Up @@ -299,12 +334,19 @@ fn unresponsive_not_returned_indirect() {

#[test]
fn get_record_not_found() {
let (port_base, mut swarms) = build_nodes(3);
let mut swarms = build_nodes(3);

let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
let swarm_ids: Vec<_> = swarms.iter()
.map(|(_addr, swarm)| Swarm::local_peer_id(swarm))
.cloned()
.collect();

swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
let (second, third) = (swarms[1].0.clone(), swarms[2].0.clone());
swarms[0].1.add_address(&swarm_ids[1], second);
swarms[1].1.add_address(&swarm_ids[2], third);

// Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>();

let target_key = record::Key::from(random_multihash());
swarms[0].get_record(&target_key, Quorum::One);
Expand Down Expand Up @@ -347,7 +389,8 @@ fn put_record() {

let mut config = KademliaConfig::default();
config.set_replication_factor(replication_factor);
let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config);
let mut swarms = build_connected_nodes_with_config(num_total, num_group, config);
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();

let records = records.into_iter()
.take(num_total)
Expand Down Expand Up @@ -408,10 +451,14 @@ fn put_record() {
assert_eq!(r.key, expected.key);
assert_eq!(r.value, expected.value);
assert_eq!(r.expires, expected.expires);
assert_eq!(r.publisher.as_ref(), Some(&swarm_ids[0]));
assert_eq!(r.publisher.as_ref(), Some(Swarm::local_peer_id(&swarms[0])));

This comment has been minimized.

Copy link
@romanb

romanb Apr 14, 2020

Contributor

Why not use swarm_ids? Of if swarm_ids is not used, then do we need to construct it?

This comment has been minimized.

Copy link
@mxinden

mxinden Apr 14, 2020

Author Member

let key = kbucket::Key::new(r.key.clone());
let mut expected = swarm_ids.clone().split_off(1);
let mut expected = swarms.iter()

This comment has been minimized.

Copy link
@romanb

romanb Apr 14, 2020

Contributor

Why not use swarm_ids? Of if swarm_ids is not used, then do we need to construct it?

This comment has been minimized.

Copy link
@mxinden

mxinden Apr 14, 2020

Author Member
.skip(1)
.map(Swarm::local_peer_id)
.cloned()
.collect::<Vec<_>>();
expected.sort_by(|id1, id2|
kbucket::Key::new(id1.clone()).distance(&key).cmp(
&kbucket::Key::new(id2.clone()).distance(&key)));
Expand All @@ -421,10 +468,11 @@ fn put_record() {
.take(replication_factor.get())
.collect::<HashSet<_>>();

let actual = swarms.iter().enumerate().skip(1)
.filter_map(|(i, s)|
if s.store.get(key.preimage()).is_some() {
Some(swarm_ids[i].clone())
let actual = swarms.iter()
.skip(1)
.filter_map(|swarm|
if swarm.store.get(key.preimage()).is_some() {
Some(Swarm::local_peer_id(swarm).clone())
} else {
None
})
Expand Down Expand Up @@ -457,12 +505,15 @@ fn put_record() {

#[test]
fn get_value() {
let (port_base, mut swarms) = build_nodes(3);
let mut swarms = build_nodes(3);

let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
for i in 1..3 {
let (peer_id, address) = (Swarm::local_peer_id(&swarms[i].1).clone(), swarms[i].0.clone());
swarms[0].1.add_address(&peer_id, address);

This comment has been minimized.

Copy link
@romanb

romanb Apr 14, 2020

Contributor

This lets the first node know about the other two, but if I see it correctly, the previous behaviour was to let the first know about the second and the second about the third? Is the change intentional?

This comment has been minimized.

Copy link
@mxinden

mxinden Apr 14, 2020

Author Member

Thanks for catching this. Fixed in f2d49d8.

}

swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
// Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>();

let record = Record::new(random_multihash(), vec![4,5,6]);

Expand Down Expand Up @@ -496,7 +547,9 @@ fn get_value() {
fn get_value_many() {
// TODO: Randomise
let num_nodes = 12;
let (_, mut swarms) = build_connected_nodes(num_nodes, num_nodes);
let mut swarms = build_connected_nodes(num_nodes, 3).into_iter()
.map(|(_addr, swarm)| swarm)
.collect::<Vec<_>>();
let num_results = 10;

let record = Record::new(random_multihash(), vec![4,5,6]);
Expand Down Expand Up @@ -540,7 +593,8 @@ fn add_provider() {
let mut config = KademliaConfig::default();
config.set_replication_factor(replication_factor);

let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config);
let mut swarms = build_connected_nodes_with_config(num_total, num_group, config);
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();

This comment has been minimized.

Copy link
@romanb

romanb Apr 14, 2020

Contributor

Is this actually used below?

This comment has been minimized.

Copy link
@mxinden

let keys: HashSet<_> = keys.into_iter().take(num_total).collect();

Expand Down Expand Up @@ -594,10 +648,10 @@ fn add_provider() {
// each key was published to the `replication_factor` closest peers.
while let Some(key) = results.pop() {
// Collect the nodes that have a provider record for `key`.
let actual = swarms.iter().enumerate().skip(1)
.filter_map(|(i, s)|
if s.store.providers(&key).len() == 1 {
Some(swarm_ids[i].clone())
let actual = swarms.iter().skip(1)
.filter_map(|swarm|
if swarm.store.providers(&key).len() == 1 {
Some(Swarm::local_peer_id(&swarm).clone())
} else {
None
})
Expand All @@ -609,7 +663,11 @@ fn add_provider() {
return Poll::Pending
}

let mut expected = swarm_ids.clone().split_off(1);
let mut expected = swarms.iter()
.skip(1)
.map(Swarm::local_peer_id)
.cloned()
.collect::<Vec<_>>();
let kbucket_key = kbucket::Key::new(key);
expected.sort_by(|id1, id2|
kbucket::Key::new(id1.clone()).distance(&kbucket_key).cmp(
Expand All @@ -625,8 +683,8 @@ fn add_provider() {

// One round of publishing is complete.
assert!(results.is_empty());
for s in &swarms {
assert_eq!(s.queries.size(), 0);
for swarm in &swarms {
assert_eq!(swarm.queries.size(), 0);
}

if republished {
Expand Down Expand Up @@ -656,19 +714,19 @@ fn add_provider() {
/// arithmetic overflow, see https://github.com/libp2p/rust-libp2p/issues/1290.
#[test]
fn exceed_jobs_max_queries() {
let (_, mut swarms) = build_nodes(1);
let (_addr, mut swarm) = build_node();
let num = JOBS_MAX_QUERIES + 1;
for _ in 0 .. num {
swarms[0].bootstrap();
swarm.bootstrap();
}

assert_eq!(swarms[0].queries.size(), num);
assert_eq!(swarm.queries.size(), num);

block_on(
poll_fn(move |ctx| {
for _ in 0 .. num {
// There are no other nodes, so the queries finish instantly.
if let Poll::Ready(Some(e)) = swarms[0].poll_next_unpin(ctx) {
if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) {
if let KademliaEvent::BootstrapResult(r) = e {
assert!(r.is_ok(), "Unexpected error")
} else {
Expand Down

0 comments on commit b8f85fc

Please sign in to comment.