Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: update interfaces (#146)
Browse files Browse the repository at this point in the history
Updates to latest code from libp2p/js-libp2p-interfaces#180
  • Loading branch information
achingbrain authored Mar 16, 2022
1 parent f433ef9 commit 26ef08b
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 108 deletions.
12 changes: 7 additions & 5 deletions .aegir.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@ module.exports = {

const protocol = '/echo/1.0.0'
const registrar = mockRegistrar()
registrar.handle(protocol, (evt) => {
registrar.handle(protocol, ({ stream }) => {
void pipe(
evt.detail.stream,
evt.detail.stream
stream,
stream
)
})
const upgrader = mockUpgrader({
registrar
})

const ws = new WebSockets({ upgrader })
const ws = new WebSockets()
const ma = new Multiaddr('/ip4/127.0.0.1/tcp/9095/ws')
const listener = ws.createListener()
const listener = ws.createListener({
upgrader
})
await listener.listen(ma)
listener.addEventListener('error', (evt) => {
console.error(evt.detail)
Expand Down
17 changes: 8 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@
"release": "semantic-release"
},
"dependencies": {
"@libp2p/logger": "^1.0.2",
"@libp2p/utils": "^1.0.0",
"@multiformats/mafmt": "^11.0.1",
"@multiformats/multiaddr": "^10.0.0",
"@libp2p/logger": "^1.1.2",
"@libp2p/utils": "^1.0.9",
"@multiformats/mafmt": "^11.0.2",
"@multiformats/multiaddr": "^10.1.5",
"@multiformats/multiaddr-to-uri": "^9.0.0",
"abortable-iterator": "^4.0.2",
"err-code": "^3.0.1",
Expand All @@ -165,19 +165,18 @@
"wherearewe": "^1.0.0"
},
"devDependencies": {
"@libp2p/interface-compliance-tests": "^1.1.2",
"@libp2p/interfaces": "^1.3.2",
"@libp2p/interface-compliance-tests": "^1.1.17",
"@libp2p/interfaces": "^1.3.14",
"@types/ws": "^8.2.2",
"aegir": "^36.1.3",
"is-loopback-addr": "^2.0.1",
"it-all": "^1.0.6",
"it-drain": "^1.0.5",
"it-goodbye": "^4.0.1",
"it-pipe": "^2.0.2",
"it-pipe": "^2.0.3",
"it-take": "^1.0.2",
"p-wait-for": "^4.1.0",
"uint8arrays": "^3.0.0",
"util": "^0.12.3"
"uint8arrays": "^3.0.0"
},
"browser": {
"./dist/src/listener.js": "./dist/src/listener.browser.js"
Expand Down
52 changes: 27 additions & 25 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,50 @@ import env from 'wherearewe'
import { createListener } from './listener.js'
import { socketToMaConn } from './socket-to-conn.js'
import * as filters from './filters.js'
import type { Transport, Upgrader, MultiaddrFilter } from '@libp2p/interfaces/transport'
import { Transport, MultiaddrFilter, symbol, CreateListenerOptions, DialOptions } from '@libp2p/interfaces/transport'
import type { AbortOptions } from '@libp2p/interfaces'
import type { WebSocketListenerOptions } from './listener.js'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { DuplexWebSocket } from 'it-ws/dist/src/duplex'
import type { DuplexWebSocket } from 'it-ws/duplex'
import type { ClientOptions } from 'ws'
import type { Server } from 'http'

const log = logger('libp2p:websockets')

/**
* @class WebSockets
*/
export class WebSockets implements Transport<AbortOptions & WebSocketOptions, WebSocketListenerOptions> {
private readonly upgrader: Upgrader
private readonly _filter?: MultiaddrFilter
export interface WebSocketsInit extends AbortOptions, WebSocketOptions {
filter?: MultiaddrFilter
websocket?: ClientOptions
server?: Server
}

constructor (opts: { upgrader: Upgrader, filter?: MultiaddrFilter }) {
const { upgrader, filter } = opts
export class WebSockets implements Transport {
private readonly init?: WebSocketsInit

if (upgrader == null) {
throw new Error('An upgrader must be provided. See https://github.com/libp2p/js-libp2p-interfaces/tree/master/packages/libp2p-interfaces/src/transport#upgrader')
}
constructor (init?: WebSocketsInit) {
this.init = init
}

this.upgrader = upgrader
this._filter = filter
get [Symbol.toStringTag] () {
return this.constructor.name
}

[Symbol.toStringTag] = 'WebSockets'
get [symbol] (): true {
return true
}

async dial (ma: Multiaddr, options?: AbortOptions & WebSocketOptions) {
async dial (ma: Multiaddr, options: DialOptions) {
log('dialing %s', ma)
options = options ?? {}

const socket = await this._connect(ma, options)
const maConn = socketToMaConn(socket, ma)
log('new outbound connection %s', maConn.remoteAddr)

const conn = await this.upgrader.upgradeOutbound(maConn)
const conn = await options.upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
return conn
}

async _connect (ma: Multiaddr, options: AbortOptions & WebSocketOptions): Promise<DuplexWebSocket> {
async _connect (ma: Multiaddr, options: AbortOptions): Promise<DuplexWebSocket> {
if (options?.signal?.aborted === true) {
throw new AbortError()
}
Expand All @@ -62,7 +64,7 @@ export class WebSockets implements Transport<AbortOptions & WebSocketOptions, We
errorPromise.reject(err)
}

const rawSocket = connect(toUri(ma), options)
const rawSocket = connect(toUri(ma), this.init)

if (rawSocket.socket.on != null) {
rawSocket.socket.on('error', errfn)
Expand Down Expand Up @@ -115,8 +117,8 @@ export class WebSockets implements Transport<AbortOptions & WebSocketOptions, We
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`
*/
createListener (options?: WebSocketListenerOptions) {
return createListener(this.upgrader, options)
createListener (options: CreateListenerOptions) {
return createListener({ ...this.init, ...options })
}

/**
Expand All @@ -127,8 +129,8 @@ export class WebSockets implements Transport<AbortOptions & WebSocketOptions, We
filter (multiaddrs: Multiaddr[]) {
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]

if (this._filter != null) {
return this._filter(multiaddrs)
if (this.init?.filter != null) {
return this.init?.filter(multiaddrs)
}

// Browser
Expand Down
23 changes: 11 additions & 12 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { createServer } from 'it-ws/server'
import { logger } from '@libp2p/logger'
import { socketToMaConn } from './socket-to-conn.js'
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
import type { ListenerOptions, Upgrader, Listener, ListenerEvents } from '@libp2p/interfaces/transport'
import type { Listener, ListenerEvents, CreateListenerOptions } from '@libp2p/interfaces/transport'
import type { Server } from 'http'
import type { WebSocketServer } from 'it-ws/server'
import type { DuplexWebSocket } from 'it-ws/duplex'
import { EventEmitter, CustomEvent } from '@libp2p/interfaces'
import type { Connection } from '@libp2p/interfaces/connection'

const log = logger('libp2p:websockets:listener')

Expand All @@ -17,7 +18,7 @@ class WebSocketListener extends EventEmitter<ListenerEvents> implements Listener
private listeningMultiaddr?: Multiaddr
private readonly server: WebSocketServer

constructor (upgrader: Upgrader, options: WebSocketListenerOptions) {
constructor (init: WebSocketListenerInit) {
super()

// Keep track of open connections to destroy when the listener is closed
Expand All @@ -26,7 +27,7 @@ class WebSocketListener extends EventEmitter<ListenerEvents> implements Listener
const self = this // eslint-disable-line @typescript-eslint/no-this-alias

this.server = createServer({
...options,
...init,
onConnection: (stream: DuplexWebSocket) => {
const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0))
log('new inbound connection %s', maConn.remoteAddr)
Expand All @@ -38,15 +39,15 @@ class WebSocketListener extends EventEmitter<ListenerEvents> implements Listener
})

try {
void upgrader.upgradeInbound(maConn)
void init.upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)

if (options?.handler != null) {
options?.handler(conn)
if (init?.handler != null) {
init?.handler(conn)
}

self.dispatchEvent(new CustomEvent('connection', {
self.dispatchEvent(new CustomEvent<Connection>('connection', {
detail: conn
}))
})
Expand Down Expand Up @@ -149,12 +150,10 @@ class WebSocketListener extends EventEmitter<ListenerEvents> implements Listener
}
}

export interface WebSocketListenerOptions extends ListenerOptions {
export interface WebSocketListenerInit extends CreateListenerOptions {
server?: Server
}

export function createListener (upgrader: Upgrader, options?: WebSocketListenerOptions): Listener {
options = options ?? {}

return new WebSocketListener(upgrader, options)
export function createListener (init: WebSocketListenerInit): Listener {
return new WebSocketListener(init)
}
6 changes: 3 additions & 3 deletions test/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ describe('libp2p-websockets', () => {
let conn: Connection

beforeEach(async () => {
ws = new WebSockets({ upgrader: mockUpgrader() })
conn = await ws.dial(ma)
ws = new WebSockets()
conn = await ws.dial(ma, { upgrader: mockUpgrader() })
})

afterEach(async () => {
Expand Down Expand Up @@ -86,6 +86,6 @@ describe('libp2p-websockets', () => {
})

it('.createServer throws in browser', () => {
expect(new WebSockets({ upgrader: mockUpgrader() }).createListener).to.throw()
expect(new WebSockets().createListener).to.throw()
})
})
15 changes: 4 additions & 11 deletions test/compliance.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@ import { Multiaddr } from '@multiformats/multiaddr'
import http from 'http'
import { WebSockets } from '../src/index.js'
import * as filters from '../src/filters.js'
import type { WebSocketListenerOptions } from '../src/listener.js'
import type { WebSocketListenerInit } from '../src/listener.js'

describe('interface-transport compliance', () => {
tests({
async setup (args) {
if (args == null) {
throw new Error('No args')
}

const { upgrader } = args
const ws = new WebSockets({ upgrader, filter: filters.all })
async setup () {
const ws = new WebSockets({ filter: filters.all })
const addrs = [
new Multiaddr('/ip4/127.0.0.1/tcp/9091/ws'),
new Multiaddr('/ip4/127.0.0.1/tcp/9092/ws'),
Expand All @@ -24,9 +19,7 @@ describe('interface-transport compliance', () => {
]

let delayMs = 0
const delayedCreateListener = (options?: WebSocketListenerOptions) => {
options = options ?? {}

const delayedCreateListener = (options: WebSocketListenerInit) => {
// A server that will delay the upgrade event by delayMs
options.server = new Proxy(http.createServer(), {
get (server, prop) {
Expand Down
Loading

0 comments on commit 26ef08b

Please sign in to comment.