Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use unidirectional streams v0.3.x #79

Merged
merged 2 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"err-code": "^2.0.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.0.1",
"libp2p-pubsub": "~0.4.2",
"libp2p-pubsub": "~0.4.5",
"p-map": "^4.0.0",
"peer-id": "~0.13.3",
"peer-info": "~0.17.0",
Expand Down
45 changes: 34 additions & 11 deletions test/2-nodes.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -299,28 +299,51 @@ describe('2 nodes', () => {
} = await createGossipsubNodes(2, true))
})

after(() => Promise.all(nodes.map((n) => n.stop())))

it('existing subscriptions are sent upon peer connection', async function () {
this.timeout(5000)
// Make subscriptions prior to new nodes
before(() => {
nodes[0].subscribe('Za')
nodes[1].subscribe('Zb')

expect(nodes[0].peers.size).to.equal(0)
expectSet(nodes[0].subscriptions, ['Za'])
expect(nodes[1].peers.size).to.equal(0)
expectSet(nodes[1].subscriptions, ['Zb'])
})

// Connect nodes
const onConnect0 = registrarRecords[0][multicodec].onConnect
const onConnect1 = registrarRecords[1][multicodec].onConnect
after(() => Promise.all(nodes.map((n) => n.stop())))

// Notice peers of connection
const [d0, d1] = ConnectionPair()
onConnect0(nodes[1].peerInfo, d0)
onConnect1(nodes[0].peerInfo, d1)
it('existing subscriptions are sent upon peer connection', async function () {
this.timeout(5000)

const dial = async () => {
// Connect nodes
const onConnect0 = registrarRecords[0][multicodec].onConnect
const onConnect1 = registrarRecords[1][multicodec].onConnect
const handle0 = registrarRecords[0][multicodec].handler
const handle1 = registrarRecords[1][multicodec].handler

// Notice peers of connection
const [d0, d1] = ConnectionPair()
await onConnect0(nodes[1].peerInfo, d0)
await handle1({
protocol: multicodec,
stream: d1.stream,
connection: {
remotePeer: nodes[0].peerInfo.id
}
})
await onConnect1(nodes[0].peerInfo, d1)
await handle0({
protocol: multicodec,
stream: d0.stream,
connection: {
remotePeer: nodes[1].peerInfo.id
}
})
}

await Promise.all([
dial(),
new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)),
new Promise((resolve) => nodes[1].once('pubsub:subscription-change', resolve))
])
Expand Down
56 changes: 52 additions & 4 deletions test/floodsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,27 @@ describe('gossipsub fallbacks to floodsub', () => {

const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
const handleGs = registrarRecords[0][floodsubMulticodec].handler
const handleFs = registrarRecords[1][floodsubMulticodec].handler

// Notice peers of connection
const [d0, d1] = ConnectionPair()
onConnectGs(nodeFs.peerInfo, d0)
onConnectFs(nodeGs.peerInfo, d1)
await onConnectGs(nodeFs.peerInfo, d0)
await handleFs({
protocol: floodsubMulticodec,
stream: d1.stream,
connection: {
remotePeer: nodeGs.peerInfo.id
}
})
await onConnectFs(nodeGs.peerInfo, d1)
await handleGs({
protocol: floodsubMulticodec,
stream: d0.stream,
connection: {
remotePeer: nodeFs.peerInfo.id
}
})
})

after(async function () {
Expand Down Expand Up @@ -167,11 +183,27 @@ describe('gossipsub fallbacks to floodsub', () => {

const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
const handleGs = registrarRecords[0][floodsubMulticodec].handler
const handleFs = registrarRecords[1][floodsubMulticodec].handler

// Notice peers of connection
const [d0, d1] = ConnectionPair()
onConnectGs(nodeFs.peerInfo, d0)
onConnectFs(nodeGs.peerInfo, d1)
await onConnectGs(nodeFs.peerInfo, d0)
await handleFs({
protocol: floodsubMulticodec,
stream: d1.stream,
connection: {
remotePeer: nodeGs.peerInfo.id
}
})
await onConnectFs(nodeGs.peerInfo, d1)
await handleGs({
protocol: floodsubMulticodec,
stream: d0.stream,
connection: {
remotePeer: nodeFs.peerInfo.id
}
})

nodeGs.subscribe(topic)
nodeFs.subscribe(topic)
Expand Down Expand Up @@ -288,11 +320,27 @@ describe('gossipsub fallbacks to floodsub', () => {

const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
const handleGs = registrarRecords[0][floodsubMulticodec].handler
const handleFs = registrarRecords[1][floodsubMulticodec].handler

// Notice peers of connection
const [d0, d1] = ConnectionPair()
await onConnectGs(nodeFs.peerInfo, d0)
await handleFs({
protocol: floodsubMulticodec,
stream: d1.stream,
connection: {
remotePeer: nodeGs.peerInfo.id
}
})
await onConnectFs(nodeGs.peerInfo, d1)
await handleGs({
protocol: floodsubMulticodec,
stream: d0.stream,
connection: {
remotePeer: nodeFs.peerInfo.id
}
})

nodeGs.subscribe(topic)
nodeFs.subscribe(topic)
Expand Down
4 changes: 2 additions & 2 deletions test/gossip.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('gossip', () => {
// add subscriptions to each node
nodes.forEach((n) => n.subscribe(topic))

connectGossipsubNodes(nodes, registrarRecords, multicodec)
await connectGossipsubNodes(nodes, registrarRecords, multicodec)

await new Promise((resolve) => setTimeout(resolve, 1000))

Expand Down Expand Up @@ -69,7 +69,7 @@ describe('gossip', () => {
nodes.forEach((n) => n.subscribe(topic))

// every node connected to every other
connectGossipsubNodes(nodes, registrarRecords, multicodec)
await connectGossipsubNodes(nodes, registrarRecords, multicodec)
await new Promise((resolve) => setTimeout(resolve, 500))
// await mesh rebalancing
await Promise.all(nodes.map((n) => new Promise((resolve) => n.once('gossipsub:heartbeat', resolve))))
Expand Down
20 changes: 18 additions & 2 deletions test/mesh.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,31 @@ describe('mesh overlay', () => {
// connect N (< GossipsubD) nodes to node0
const N = 4
const onConnect0 = registrarRecords[0][multicodec].onConnect
const handle0 = registrarRecords[0][multicodec].handler

for (let i = nodes.length; i > nodes.length - N; i--) {
const n = i - 1
const onConnectN = registrarRecords[n][multicodec].onConnect
const handleN = registrarRecords[n][multicodec].handler

// Notice peers of connection
const [d0, d1] = ConnectionPair()
onConnect0(nodes[n].peerInfo, d0)
onConnectN(nodes[0].peerInfo, d1)
await onConnect0(nodes[n].peerInfo, d0)
await handleN({
protocol: multicodec,
stream: d1.stream,
connection: {
remotePeer: nodes[0].peerInfo.id
}
})
await onConnectN(nodes[0].peerInfo, d1)
await handle0({
protocol: multicodec,
stream: d0.stream,
connection: {
remotePeer: nodes[n].peerInfo.id
}
})
}

// await mesh rebalancing
Expand Down
Loading