Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial implementation #2

Merged
merged 35 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
5764e5d
minimal implementation
wemeetagain Jun 7, 2022
22da7f7
Add keepalive
wemeetagain Jun 8, 2022
768fb43
Implement maxIncomingStreams functionality
wemeetagain Jun 8, 2022
7d5199a
maxMessageSize, initialWindowSize functionality
wemeetagain Jun 8, 2022
c4330e7
Add a comment
wemeetagain Jun 8, 2022
1cce27d
Clean up config object
wemeetagain Jun 8, 2022
af28760
Clean up close
wemeetagain Jun 8, 2022
5280899
More tweaks
wemeetagain Jun 8, 2022
233c7fa
Add readme
wemeetagain Jun 9, 2022
d2f54cd
remove todos
wemeetagain Jun 9, 2022
ddeb046
Add some tests
wemeetagain Jun 16, 2022
5d38188
Add missing dev dependency
wemeetagain Jun 16, 2022
6e25085
Fix window overflow test
wemeetagain Jun 16, 2022
708c5ef
Update interfaces
wemeetagain Jun 16, 2022
c18e25c
Fix some tests
wemeetagain Jun 16, 2022
d5b3259
Tweak keepalive test
wemeetagain Jun 16, 2022
0a067e6
No keepalive in compliance tests
wemeetagain Jun 17, 2022
305fea9
Remove unneeded parts of the package.json
wemeetagain Jun 17, 2022
bb0e4b5
Add maxIncomingStreams config check
wemeetagain Jun 17, 2022
764fefa
Add maxOutgoingStreams
wemeetagain Jun 17, 2022
43e2256
Update config property names
wemeetagain Jun 17, 2022
85824c5
Add more comments
wemeetagain Jun 17, 2022
cc3e29f
Add to gitignore
wemeetagain Jun 17, 2022
1324aa1
Add sanity check to decoder
wemeetagain Jun 18, 2022
4444b24
Add decode unit tests
wemeetagain Jun 18, 2022
ca68cc9
More comments
wemeetagain Jun 18, 2022
1f043b8
Tweak test
wemeetagain Jun 18, 2022
60d41e8
Tweak muxer factory
wemeetagain Jun 19, 2022
9766631
Update muxer.close
wemeetagain Jun 20, 2022
ccdbd95
Add `direction` to YamuxMuxerInit
wemeetagain Jun 20, 2022
23ef45a
Review tweaks
wemeetagain Jul 4, 2022
db592b9
Update libp2p dependencies
wemeetagain Jul 4, 2022
b351a09
Add some benchmarks
wemeetagain Jul 4, 2022
e0b6579
tweak benchmark
wemeetagain Jul 6, 2022
a3c2d37
chore: update libp2p dependencies
wemeetagain Sep 1, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"clean": "aegir clean",
"lint": "aegir lint",
"dep-check": "aegir dep-check",
"benchmark": "benchmark dist/test/bench/*.bench.js --timeout 400000",
"build": "aegir build",
"test": "aegir test",
"test:chrome": "aegir test -t browser",
Expand All @@ -80,11 +81,11 @@
"test:electron-main": "aegir test -t electron-main"
},
"dependencies": {
"@libp2p/components": "^2.0.0",
"@libp2p/interface-connection": "^2.0.0",
"@libp2p/interface-stream-muxer": "^1.0.2",
"@libp2p/components": "^2.0.1",
"@libp2p/interface-connection": "^2.1.1",
"@libp2p/interface-stream-muxer": "^2.0.1",
"@libp2p/logger": "^2.0.0",
"@libp2p/tracked-map": "^1.0.8",
"@libp2p/tracked-map": "^2.0.1",
"abortable-iterator": "^4.0.2",
"any-signal": "^3.0.1",
"err-code": "^3.0.1",
Expand All @@ -96,7 +97,9 @@
"uint8arrays": "^3.0.0"
},
"devDependencies": {
"@libp2p/interface-stream-muxer-compliance-tests": "^2.0.0",
"@dapplion/benchmark": "^0.2.2",
"@libp2p/interface-stream-muxer-compliance-tests": "^3.0.1",
"@libp2p/mplex": "^4.0.0",
"aegir": "^37.3.0",
"it-drain": "^1.0.5",
"it-pair": "2.0.2",
Expand Down
4 changes: 2 additions & 2 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ export const ERR_DECODE_INVALID_VERSION = 'ERR_DECODE_INVALID_VERSION'
export const ERR_BOTH_CLIENTS = 'ERR_BOTH_CLIENTS'
export const ERR_RECV_WINDOW_EXCEEDED = 'ERR_RECV_WINDOW_EXCEEDED'

export const PROTOCOL_ERRORS = [
export const PROTOCOL_ERRORS = new Set([
ERR_INVALID_FRAME,
ERR_UNREQUESTED_PING,
ERR_NOT_MATCHING_PING,
ERR_STREAM_ALREADY_EXISTS,
ERR_DECODE_INVALID_VERSION,
ERR_BOTH_CLIENTS,
ERR_RECV_WINDOW_EXCEEDED
]
])

// local errors

Expand Down
22 changes: 13 additions & 9 deletions src/decode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,24 @@ import { FrameHeader, FrameType, HEADER_LENGTH, YAMUX_VERSION } from './frame.js
import { ERR_DECODE_INVALID_VERSION, ERR_DECODE_IN_PROGRESS } from './constants.js'
import type { Source } from 'it-stream-types'

wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
// used to bitshift in decoding
// native bitshift can overflow into a negative number, so we bitshift by multiplying by a power of 2
const twoPow24 = 2 ** 24

/**
* Decode a header from the front of a buffer
*
* @param buffer - Assumed to have enough bytes for a header
* @param data - Assumed to have enough bytes for a header
*/
export function decodeHeader (buffer: Uint8ArrayList): FrameHeader {
if (buffer.get(0) !== YAMUX_VERSION) {
export function decodeHeader (data: Uint8Array): FrameHeader {
if (data[0] !== YAMUX_VERSION) {
throw errcode(new Error('Invalid frame version'), ERR_DECODE_INVALID_VERSION)
}
return {
type: buffer.getUint8(1),
flag: buffer.getUint16(2, false),
streamID: buffer.getUint32(4, false),
length: buffer.getUint32(8, false)
type: data[1],
flag: (data[2] << 8) + data[3],
streamID: (data[4] * twoPow24) + (data[5] << 16) + (data[6] << 8) + data[7],
length: (data[8] * twoPow24) + (data[9] << 16) + (data[10] << 8) + data[11]
}
}

Expand Down Expand Up @@ -70,7 +74,7 @@ export class Decoder {
this.frameInProgress = true
yield {
header,
readData: async () => await this.readBytes(length)
readData: this.readBytes.bind(this, length)
}
} else {
yield { header }
Expand All @@ -91,7 +95,7 @@ export class Decoder {
return
}

const header = decodeHeader(this.buffer)
const header = decodeHeader(this.buffer.slice(0, HEADER_LENGTH))
this.buffer.consume(HEADER_LENGTH)
return header
}
Expand Down
20 changes: 14 additions & 6 deletions src/encode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,23 @@ export function encodeFrame (header: FrameHeader, data?: Uint8Array): Uint8Array
frame = new Uint8Array(HEADER_LENGTH)
}

const frameView = new DataView(frame.buffer)

// always assume version 0
// frameView.setUint8(0, header.version)

frameView.setUint8(1, header.type)
frameView.setUint16(2, header.flag, false)
frameView.setUint32(4, header.streamID, false)
frameView.setUint32(8, header.length, false)
frame[1] = header.type

frame[2] = header.flag >>> 8 & 255
frame[3] = header.flag & 255

frame[4] = header.streamID >>> 24 & 255
frame[5] = header.streamID >>> 16 & 255
frame[6] = header.streamID >>> 8 & 255
frame[7] = header.streamID & 255

frame[8] = header.length >>> 24 & 255
frame[9] = header.length >>> 16 & 255
frame[10] = header.length >>> 8 & 255
frame[11] = header.length & 255

return frame
}
11 changes: 4 additions & 7 deletions src/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import type { Logger } from '@libp2p/logger'
const YAMUX_PROTOCOL_ID = '/yamux/1.0.0'

export interface YamuxMuxerInit extends StreamMuxerInit, Partial<Config> {
/** True if client, false if server */
direction?: 'inbound' | 'outbound'
}

export class Yamux implements StreamMuxerFactory, Initializable {
Expand All @@ -39,7 +37,6 @@ export class Yamux implements StreamMuxerFactory, Initializable {

createStreamMuxer (init?: YamuxMuxerInit): YamuxMuxer {
return new YamuxMuxer(this.components, {
direction: 'inbound',
...this._init,
...init
})
Expand Down Expand Up @@ -130,7 +127,7 @@ export class YamuxMuxer implements StreamMuxer {
} catch (err: unknown) {
// either a protocol or internal error
const errCode = (err as {code: string}).code
if (PROTOCOL_ERRORS.includes(errCode)) {
if (PROTOCOL_ERRORS.has(errCode)) {
this.log?.error('protocol error in sink', err)
reason = GoAwayCode.ProtocolError
} else {
Expand Down Expand Up @@ -158,7 +155,7 @@ export class YamuxMuxer implements StreamMuxer {
this.log?.('muxer created')

if (this.config.enableKeepAlive) {
void this.keepAliveLoop().catch(e => this.log?.error('keepalive error: %s', e))
this.keepAliveLoop().catch(e => this.log?.error('keepalive error: %s', e))
}
}

Expand Down Expand Up @@ -328,7 +325,7 @@ export class YamuxMuxer implements StreamMuxer {
},
log: this.log,
config: this.config,
getRTT: () => this.rtt
getRTT: this.getRTT.bind(this)
})

return stream
Expand Down Expand Up @@ -359,7 +356,7 @@ export class YamuxMuxer implements StreamMuxer {
timeoutId = setTimeout(resolve, this.config.keepAliveInterval)
})
])
void this.ping().catch(e => this.log?.error('ping error: %s', e))
this.ping().catch(e => this.log?.error('ping error: %s', e))
} catch (e) {
// closed
clearInterval(timeoutId)
Expand Down
46 changes: 46 additions & 0 deletions test/bench/codec.bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { itBench } from '@dapplion/benchmark'
import { Flag, FrameHeader, FrameType } from '../../src/frame.js'
import { encodeFrame } from '../../src/encode.js'
import { decodeHeader } from '../../src/decode.js'
import { decodeHeaderNaive, encodeFrameNaive } from '../codec.util.js'

describe('codec benchmark', () => {
for (const { encode, name } of [
{ encode: encodeFrame, name: 'encodeFrame' },
{ encode: encodeFrameNaive, name: 'encodeFrameNaive' }
]) {
itBench<FrameHeader, undefined>({
id: `encode a frame header - ${name}`,
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
beforeEach: () => {
return {
type: FrameType.WindowUpdate,
flag: Flag.ACK,
streamID: Math.round(Math.random() * 2 ** 32),
length: Math.round(Math.random() * 2 ** 32)
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
}
},
fn: (header) => {
encode(header)
}
})
}

for (const { decode, name } of [
{ decode: decodeHeader, name: 'decodeHeader' },
{ decode: decodeHeaderNaive, name: 'decodeHeaderNaive' }
]) {
itBench<Uint8Array, undefined>({
id: `decode a frame header - ${name}`,
beforeEach: () => {
const header = new Uint8Array(12)
for (let i = 1; i < 12; i++) {
header[i] = Math.round(Math.random() * 255)
}
return header
},
fn: (header) => {
decode(header)
}
})
}
})
36 changes: 36 additions & 0 deletions test/bench/comparison.bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { itBench } from '@dapplion/benchmark'
import drain from 'it-drain'
import { pipe } from 'it-pipe'
import { testClientServer as testYamuxClientServer } from '../util.js'
import { testClientServer as testMplexClientServer } from '../mplex.util.js'

describe('comparison benchmark', () => {
for (const { impl, name } of [
{ impl: testYamuxClientServer, name: 'yamux' },
{ impl: testMplexClientServer, name: 'mplex' }
]) {
for (const { numMessages, msgSize } of [
{ numMessages: 1, msgSize: 2 ** 6 },
{ numMessages: 1, msgSize: 2 ** 10 },
{ numMessages: 1, msgSize: 2 ** 16 },
{ numMessages: 1, msgSize: 2 ** 20 },
{ numMessages: 1000, msgSize: 2 ** 6 },
{ numMessages: 1000, msgSize: 2 ** 10 },
{ numMessages: 1000, msgSize: 2 ** 16 },
{ numMessages: 1000, msgSize: 2 ** 20 }
]) {
itBench<ReturnType<typeof impl>, undefined>({
id: `${name} send and receive ${numMessages} ${msgSize / 1024}KB chunks`,
beforeEach: () => impl({
onIncomingStream: (stream) => {
void pipe(stream, drain).then(() => stream.close())
}
}),
fn: async ({ client, server }) => {
const stream = client.newStream()
await pipe(Array.from({ length: numMessages }, () => new Uint8Array(msgSize)), stream, drain)
}
})
}
}
})
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
31 changes: 31 additions & 0 deletions test/codec.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { expect } from 'aegir/chai'

import { decodeHeader } from '../src/decode.js'
import { encodeFrame } from '../src/encode.js'
import { Flag, FrameHeader, FrameType, GoAwayCode, stringifyHeader } from '../src/frame.js'
import { decodeHeaderNaive, encodeFrameNaive } from './codec.util.js'

const frames: Array<{header: FrameHeader, data?: Uint8Array}> = [
{ header: { type: FrameType.Ping, flag: Flag.SYN, streamID: 0, length: 1 } },
{ header: { type: FrameType.WindowUpdate, flag: Flag.SYN, streamID: 1, length: 1 } },
{ header: { type: FrameType.GoAway, flag: 0, streamID: 0, length: GoAwayCode.NormalTermination } },
{ header: { type: FrameType.Ping, flag: Flag.ACK, streamID: 0, length: 100 } },
{ header: { type: FrameType.WindowUpdate, flag: 0, streamID: 99, length: 1000 } },
{ header: { type: FrameType.WindowUpdate, flag: 0, streamID: 0xffffffff, length: 0xffffffff } },
{ header: { type: FrameType.GoAway, flag: 0, streamID: 0, length: GoAwayCode.ProtocolError } }
]

describe('codec', () => {
for (const { header } of frames) {
it(`should round trip encode/decode header ${stringifyHeader(header)}`, () => {
expect(decodeHeader(encodeFrame(header))).to.deep.equal(header)
})
}

for (const { header } of frames) {
it(`should match naive implementations of encode/decode for header ${stringifyHeader(header)}`, () => {
expect(encodeFrame(header)).to.deep.equal(encodeFrameNaive(header))
expect(decodeHeader(encodeFrame(header))).to.deep.equal(decodeHeaderNaive(encodeFrameNaive(header)))
})
}
})
44 changes: 44 additions & 0 deletions test/codec.util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import errcode from 'err-code'
import { ERR_DECODE_INVALID_VERSION, ERR_INVALID_FRAME } from '../src/constants.js'
import { FrameHeader, FrameType, HEADER_LENGTH, YAMUX_VERSION } from '../src/frame.js'

// Slower encode / decode functions that use dataview

export function decodeHeaderNaive (data: Uint8Array): FrameHeader {
const view = new DataView(data.buffer, data.byteOffset, data.byteLength)

if (view.getUint8(0) !== YAMUX_VERSION) {
throw errcode(new Error('Invalid frame version'), ERR_DECODE_INVALID_VERSION)
}
return {
type: view.getUint8(1),
flag: view.getUint16(2, false),
streamID: view.getUint32(4, false),
length: view.getUint32(8, false)
}
}

export function encodeFrameNaive (header: FrameHeader, data?: Uint8Array): Uint8Array {
let frame
if (header.type === FrameType.Data) {
if (data == null) {
throw errcode(new Error('Invalid frame'), ERR_INVALID_FRAME, { header, data })
}
frame = new Uint8Array(HEADER_LENGTH + header.length)
frame.set(data, HEADER_LENGTH)
} else {
frame = new Uint8Array(HEADER_LENGTH)
}

const frameView = new DataView(frame.buffer, frame.byteOffset, frame.byteLength)

// always assume version 0
// frameView.setUint8(0, header.version)

frameView.setUint8(1, header.type)
frameView.setUint16(2, header.flag, false)
frameView.setUint32(4, header.streamID, false)
frameView.setUint32(8, header.length, false)

return frame
}
Loading