Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(@lib2p/transport-webrtc): add webRTC transport compliance tests #2053

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
521c8e6
refactor(@libp2p/interface-compliance-tests): for listening addrs (#1…
maschad Sep 11, 2023
a375b6d
wip
maschad Sep 12, 2023
4acaea4
Merge branch 'master' into test/add-webrtc-transport-tests
maschad Sep 13, 2023
bcc0c49
wip
maschad Sep 14, 2023
86f7f26
Merge branch 'master' into test/add-webrtc-transport-tests
maschad Sep 14, 2023
8d262f8
wip
maschad Sep 15, 2023
fa05fe1
wip
maschad Sep 18, 2023
b7fa73c
Merge branch 'master' into test/add-webrtc-transport-tests
maschad Sep 20, 2023
6022a52
test: update test setup for webRTC (#1836)
maschad Sep 21, 2023
30b2513
fix: close webrtc streams
achingbrain Sep 23, 2023
7d1ff01
fix: add FIN_ACK message to WebRTC streams to prevent early closing
achingbrain Sep 23, 2023
005ddbf
chore: revert aegir change
achingbrain Sep 23, 2023
d4ef6ca
chore: fix up tests
achingbrain Sep 26, 2023
772cc10
Merge branch 'master' into fix/webrtc-stream-closing
achingbrain Sep 26, 2023
7fada6c
Merge branch 'fix/webrtc-stream-closing' into fix/add-webrtc-fin-ack
achingbrain Sep 26, 2023
7a6e0b4
chore: fix deps
achingbrain Sep 26, 2023
ae2bdf6
chore: fix linting
achingbrain Sep 26, 2023
35cc13b
Merge branch 'fix/webrtc-stream-closing' into fix/add-webrtc-fin-ack
achingbrain Sep 26, 2023
519e472
chore: fix deps
achingbrain Sep 26, 2023
0425b20
chore: add comment
achingbrain Sep 26, 2023
e8f9730
chore: update dep
achingbrain Sep 26, 2023
08a9d13
Merge branch 'fix/webrtc-stream-closing' into fix/add-webrtc-fin-ack
achingbrain Sep 26, 2023
9ef3fcb
chore: send/wait for FIN_ACK on close
achingbrain Sep 26, 2023
565b91e
chore: restore status
achingbrain Sep 26, 2023
3d99139
Merge branch 'master' into test/add-webrtc-transport-tests
maschad Sep 29, 2023
52c814b
Merge branch 'fix/add-webrtc-fin-ack' into test/add-webrtc-transport-…
maschad Sep 29, 2023
bfb8fe2
wip
maschad Sep 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions packages/interface-compliance-tests/src/mocks/transport-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
import type { TransportManager } from "@libp2p/interface-internal/transport-manager"
import type { EventEmitter } from "@libp2p/interface/events"
import type { Libp2pEvents } from "@libp2p/interface"
import type { Startable } from "@libp2p/interface/startable"
import { FaultTolerance, type Listener, type Transport, type Upgrader } from "@libp2p/interface/transport"
import type { Connection } from "@libp2p/interface/src/connection"
import type { Multiaddr } from "@multiformats/multiaddr"
import { CodeError } from "@libp2p/interface/errors"

export interface MockTransportManagerComponents {
events: EventEmitter<Libp2pEvents>
upgrader: Upgrader
}

class MockTransportManager implements TransportManager, Startable {
private readonly components: MockTransportManagerComponents
private readonly transports: Map<string, Transport>
private readonly listeners: Map<string, Listener[]>
private readonly faultTolerance: FaultTolerance
private started: boolean

constructor(components: MockTransportManagerComponents) {
this.components = components
this.started = false
this.transports = new Map<string, Transport>()
this.listeners = new Map<string, Listener[]>()
this.faultTolerance = FaultTolerance.FATAL_ALL
}

isStarted(): boolean {
return this.started
}

async start(): Promise<void> {
this.started = true
}

async stop(): Promise<void> {
const tasks = []
for (const [_, listeners] of this.listeners) {
while (listeners.length > 0) {
const listener = listeners.pop()

if (listener == null) {
continue
}

tasks.push(listener.close())
}
}

await Promise.all(tasks)
for (const key of this.listeners.keys()) {
this.listeners.set(key, [])
}

this.started = false
}

getAddrs(): Multiaddr[] {
let addrs: Multiaddr[] = []
for (const listeners of this.listeners.values()) {
for (const listener of listeners) {
addrs = [...addrs, ...listener.getAddrs()]
}
}
return addrs
}

/**
* Returns all the transports instances
*/
getTransports(): Transport[] {
return Array.of(...this.transports.values())
}

/**
* Returns all the listener instances
*/
getListeners(): Listener[] {
return Array.of(...this.listeners.values()).flat()
}

add(transport: Transport): void {
const tag = transport[Symbol.toStringTag]

if (tag == null) {
throw new CodeError("Transport must have a valid tag", "INVALID_TAG")
}

if (this.transports.has(tag)) {
throw new CodeError(`There is already a transport with the tag ${tag}`, "DUPLICATE")
}

this.transports.set(tag, transport)

if (!this.listeners.has(tag)) {
this.listeners.set(tag, [])
}
}

transportForMultiaddr(ma: Multiaddr): Transport | undefined {
for (const transport of this.transports.values()) {
const addrs = transport.filter([ma])

if (addrs.length > 0) {
return transport
}
}
}

async dial(ma: Multiaddr, options?: any): Promise<Connection> {
const transport = this.transportForMultiaddr(ma)

if (transport == null) {
throw new CodeError(`No transport available for address ${String(ma)}`, "TRANSPORT_UNAVAILABLE")
}

try {
return await transport.dial(ma, {
...options,
upgrader: this.components.upgrader,
})
} catch (err: any) {
if (err.code == null) {
err.code = "TRANSPORT_DIAL_FAILED"
}

throw err
}
}

/**
* Starts listeners for each listen Multiaddr
*/
async listen(addrs: Multiaddr[]): Promise<void> {
if (!this.isStarted()) {
throw new CodeError("Not started", "ERR_NODE_NOT_STARTED")
}

if (addrs == null || addrs.length === 0) {
return
}

const couldNotListen = []

for (const [key, transport] of this.transports.entries()) {
const supportedAddrs = transport.filter(addrs)
const tasks = []

// For each supported multiaddr, create a listener
for (const addr of supportedAddrs) {
const listener = transport.createListener({
upgrader: this.components.upgrader,
})

let listeners: Listener[] = this.listeners.get(key) ?? []

if (listeners == null) {
listeners = []
this.listeners.set(key, listeners)
}

listeners.push(listener)

// Track listen/close events
listener.addEventListener("listening", () => {
this.components.events.safeDispatchEvent("transport:listening", {
detail: listener,
})
})
listener.addEventListener("close", () => {
const index = listeners.findIndex((l) => l === listener)

// remove the listener
listeners.splice(index, 1)

this.components.events.safeDispatchEvent("transport:close", {
detail: listener,
})
})

// We need to attempt to listen on everything
tasks.push(listener.listen(addr))
}

// Keep track of transports we had no addresses for
if (tasks.length === 0) {
couldNotListen.push(key)
continue
}

const results = await Promise.allSettled(tasks)

const isListening = results.find((r) => r.status === "fulfilled")
if (isListening == null && this.faultTolerance !== FaultTolerance.NO_FATAL) {
throw new CodeError(`Transport (${key}) could not listen on any available address`, "ERR_NO_VALID_ADDRESSES")
}
}

if (couldNotListen.length === this.transports.size) {
const message = `no valid addresses were provided for transports [${couldNotListen.join(", ")}]`
if (this.faultTolerance === FaultTolerance.FATAL_ALL) {
throw new CodeError(message, "ERR_NO_VALID_ADDRESSES")
}
}
}

async remove(key: string): Promise<void> {
// Close any running listeners
for (const listener of this.listeners.get(key) ?? []) {
await listener.close()
}

this.transports.delete(key)
this.listeners.delete(key)
}

async removeAll(): Promise<void> {
const tasks = []
for (const key of this.transports.keys()) {
tasks.push(this.remove(key))
}

await Promise.all(tasks)
}
}

export function mockTransportManager(components: MockTransportManagerComponents): TransportManager {
return new MockTransportManager(components)
}
23 changes: 14 additions & 9 deletions packages/interface-compliance-tests/src/transport/dial-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
let upgrader: Upgrader
let registrar: Registrar
let addrs: Multiaddr[]
let listeningAddrs: Multiaddr[]
let transport: Transport
let connector: Connector
let listener: Listener
let hasListener: boolean

before(async () => {
registrar = mockRegistrar()
Expand All @@ -30,24 +32,27 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
events: new EventEmitter()
});

({ addrs, transport, connector } = await common.setup())
({ addrs, transport, connector, listeningAddrs =[], hasListener = true } = await common.setup())
})

after(async () => {
await common.teardown()
})

beforeEach(async () => {
listener = transport.createListener({
upgrader
})
await listener.listen(addrs[0])
if (hasListener) {
listener = transport.createListener({
upgrader
})
listeningAddrs.length > 0 ? await listener.listen(listeningAddrs[0]) : await listener.listen(addrs[0])
}
})

afterEach(async () => {
sinon.restore()
connector.restore()
await listener.close()
if (hasListener)
await listener.close()
})

it('simple', async () => {
Expand All @@ -56,13 +61,13 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
void pipe([
uint8ArrayFromString('hey')
],
data.stream,
drain
data.stream,
drain
)
})

const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
const conn = await transport.dial(addrs[0], {
const conn = await transport.dial(listeningAddrs[0], {
upgrader
})

Expand Down
2 changes: 2 additions & 0 deletions packages/interface-compliance-tests/src/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ export interface Connector {

export interface TransportTestFixtures {
addrs: Multiaddr[]
listeningAddrs?: Multiaddr[]
transport: Transport
connector: Connector
hasListener?: boolean
}

export default (common: TestSetup<TransportTestFixtures>): void => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
describe('listen', () => {
let upgrader: Upgrader
let addrs: Multiaddr[]
let listeningAddrs: Multiaddr[]
let transport: Transport
let registrar: Registrar

Expand All @@ -30,7 +31,7 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
events: new EventEmitter()
});

({ transport, addrs } = await common.setup())
({ transport, addrs, listeningAddrs = [] } = await common.setup())
})

after(async () => {
Expand All @@ -45,7 +46,7 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
const listener = transport.createListener({
upgrader
})
await listener.listen(addrs[0])
listeningAddrs.length > 0 ? await listener.listen(listeningAddrs[0]) : await listener.listen(addrs[0])
await listener.close()
})

Expand All @@ -66,7 +67,7 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
})

// Listen
await listener.listen(addrs[0])
listeningAddrs.length > 0 ? await listener.listen(listeningAddrs[0]) : await listener.listen(addrs[0])

// Create two connections to the listener
const [conn1] = await Promise.all([
Expand Down Expand Up @@ -113,7 +114,7 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
})

// Listen
await listener.listen(addrs[0])
listeningAddrs.length > 0 ? await listener.listen(listeningAddrs[0]) : await listener.listen(addrs[0])

// Create a connection to the listener
const conn = await transport.dial(addrs[0], {
Expand All @@ -139,7 +140,7 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
})

void (async () => {
await listener.listen(addrs[0])
listeningAddrs.length > 0 ? await listener.listen(listeningAddrs[0]) : await listener.listen(addrs[0])
await transport.dial(addrs[0], {
upgrader
})
Expand All @@ -159,7 +160,8 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
listener.addEventListener('listening', () => {
listener.close().then(done, done)
})
void listener.listen(addrs[0])
const addrToListenOn = listeningAddrs.length > 0 ? listeningAddrs[0] : addrs[0]
void listener.listen(addrToListenOn)
})

it('error', (done) => {
Expand All @@ -182,7 +184,7 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
listener.addEventListener('close', () => { done() })

void (async () => {
await listener.listen(addrs[0])
listeningAddrs.length > 0 ? await listener.listen(listeningAddrs[0]) : await listener.listen(addrs[0])
await listener.close()
})()
})
Expand Down
4 changes: 4 additions & 0 deletions packages/interface/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,15 @@
"it-stream-types": "^2.0.1",
"multiformats": "^12.0.1",
"p-defer": "^4.0.0",
"race-signal": "^1.0.0",
"uint8arraylist": "^2.4.3"
},
"devDependencies": {
"@types/sinon": "^10.0.15",
"aegir": "^40.0.8",
"delay": "^6.0.0",
"it-all": "^3.0.3",
"it-drain": "^3.0.3",
"sinon": "^16.0.0",
"sinon-ts": "^1.0.0"
}
Expand Down
Loading
Loading