Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

fix: port listener to ES6 class syntax #214

Merged
merged 4 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import * as mafmt from '@multiformats/mafmt'
import errCode from 'err-code'
import { logger } from '@libp2p/logger'
import { toMultiaddrConnection } from './socket-to-conn.js'
import { createListener } from './listener.js'
import { TCPListener } from './listener.js'
import { multiaddrToNetConfig } from './utils.js'
import { AbortError } from '@libp2p/interfaces/errors'
import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js'
import { CreateListenerOptions, DialOptions, symbol, Transport } from '@libp2p/interface-transport'
import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from '@libp2p/interface-transport'
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import type { Connection } from '@libp2p/interface-connection'
Expand Down Expand Up @@ -155,8 +155,8 @@ export class TCP implements Transport {
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
*/
createListener (options: TCPCreateListenerOptions) {
return createListener({
createListener (options: TCPCreateListenerOptions): Listener {
return new TCPListener({
...options,
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
Expand All @@ -166,7 +166,7 @@ export class TCP implements Transport {
/**
* Takes a list of `Multiaddr`s and returns only valid TCP addresses
*/
filter (multiaddrs: Multiaddr[]) {
filter (multiaddrs: Multiaddr[]): Multiaddr[] {
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]

return multiaddrs.filter(ma => {
Expand Down
176 changes: 84 additions & 92 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,12 @@ import {
} from './utils.js'
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events'
import type { MultiaddrConnection, Connection } from '@libp2p/interface-connection'
import type { Upgrader, Listener } from '@libp2p/interface-transport'
import type { Server } from 'net'
import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { TCPCreateListenerOptions } from './index.js'

const log = logger('libp2p:tcp:listener')

interface ServerWithMultiaddrConnections extends Server {
__connections: MultiaddrConnection[]
}

/**
* Attempts to close the given maConn. If a failure occurs, it will be logged
*/
Expand All @@ -37,20 +32,29 @@ interface Context extends TCPCreateListenerOptions {
socketCloseTimeout?: number
}

/**
* Create listener
*/
export function createListener (context: Context) {
const {
handler, upgrader, socketInactivityTimeout, socketCloseTimeout
} = context
type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null }

export class TCPListener extends EventEmitter<ListenerEvents> implements Listener {
private readonly server: net.Server
/** Keep track of open connections to destroy in case of timeout */
private readonly connections = new Set<MultiaddrConnection>()

context.keepAlive = context.keepAlive ?? true
private status: Status = { started: false }

let peerId: string | null
let listeningAddr: Multiaddr
constructor (private readonly context: Context) {
super()

context.keepAlive = context.keepAlive ?? true

this.server = net.createServer(context, this.onSocket.bind(this))

this.server
.on('listening', () => this.dispatchEvent(new CustomEvent('listening')))
.on('error', err => this.dispatchEvent(new CustomEvent<Error>('error', { detail: err })))
.on('close', () => this.dispatchEvent(new CustomEvent('close')))
}

const server: ServerWithMultiaddrConnections = Object.assign(net.createServer(context, socket => {
private onSocket (socket: net.Socket) {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
Expand All @@ -59,9 +63,9 @@ export function createListener (context: Context) {
let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, {
listeningAddr,
socketInactivityTimeout,
socketCloseTimeout
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
socketInactivityTimeout: this.context.socketInactivityTimeout,
socketCloseTimeout: this.context.socketCloseTimeout
})
} catch (err) {
log.error('inbound connection failed', err)
Expand All @@ -70,16 +74,20 @@ export function createListener (context: Context) {

log('new inbound connection %s', maConn.remoteAddr)
try {
upgrader.upgradeInbound(maConn)
this.context.upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)
trackConn(server, maConn, socket)
this.connections.add(maConn)

if (handler != null) {
handler(conn)
socket.once('close', () => {
this.connections.delete(maConn)
})

if (this.context.handler != null) {
this.context.handler(conn)
}

listener.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
})
.catch(async err => {
log.error('inbound connection failed', err)
Expand All @@ -97,85 +105,69 @@ export function createListener (context: Context) {
log.error('closing inbound connection failed', err)
})
}
}),
// Keep track of open connections to destroy in case of timeout
{ __connections: [] })
}

const listener: Listener = Object.assign(new EventEmitter(), {
getAddrs: () => {
let addrs: Multiaddr[] = []
const address = server.address()
getAddrs () {
if (!this.status.started) {
return []
}

if (address == null) {
return []
}
let addrs: Multiaddr[] = []
const address = this.server.address()
const { listeningAddr, peerId } = this.status

if (typeof address === 'string') {
addrs = [listeningAddr]
} else {
try {
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}
} catch (err) {
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
if (address == null) {
return []
}

if (typeof address === 'string') {
addrs = [listeningAddr]
} else {
try {
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}
} catch (err) {
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
}
}

return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma)
},
listen: async (ma: Multiaddr) => {
listeningAddr = ma
peerId = ma.getPeerId()

if (peerId == null) {
listeningAddr = ma.decapsulateCode(CODE_P2P)
}
return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma)
}

return await new Promise<void>((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
server.listen(options, (err?: any) => {
if (err != null) {
return reject(err)
}
log('Listening on %s', server.address())
resolve()
})
})
},
close: async () => {
if (!server.listening) {
return
}
async listen (ma: Multiaddr) {
const peerId = ma.getPeerId()
const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma

await Promise.all(
server.__connections.map(async maConn => await attemptClose(maConn))
)
this.status = { started: true, listeningAddr, peerId }

await new Promise<void>((resolve, reject) => {
server.close(err => (err != null) ? reject(err) : resolve())
return await new Promise<void>((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
this.server.listen(options, (err?: any) => {
if (err != null) {
return reject(err)
}
log('Listening on %s', this.server.address())
resolve()
})
}
})

server
.on('listening', () => listener.dispatchEvent(new CustomEvent('listening')))
.on('error', err => listener.dispatchEvent(new CustomEvent<Error>('error', { detail: err })))
.on('close', () => listener.dispatchEvent(new CustomEvent('close')))
})
}

return listener
}
async close () {
if (!this.server.listening) {
return
}

function trackConn (server: ServerWithMultiaddrConnections, maConn: MultiaddrConnection, socket: net.Socket) {
server.__connections.push(maConn)
await Promise.all(
Array.from(this.connections.values()).map(async maConn => await attemptClose(maConn))
)

const untrackConn = () => {
server.__connections = server.__connections.filter(c => c !== maConn)
await new Promise<void>((resolve, reject) => {
this.server.close(err => (err != null) ? reject(err) : resolve())
})
}

socket.once('close', untrackConn)
}