Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

deps(dev): Upgrade aegir to 38.1.7 #257

Merged
merged 3 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@
"devDependencies": {
"@libp2p/interface-stream-muxer-compliance-tests": "^6.0.0",
"@types/varint": "^6.0.0",
"aegir": "^37.2.0",
"aegir": "^38.1.7",
"cborg": "^1.8.1",
"delay": "^5.0.0",
"eslint-plugin-etc": "^2.0.2",
"iso-random-stream": "^2.0.2",
"it-all": "^2.0.0",
"it-drain": "^2.0.0",
Expand All @@ -176,8 +177,7 @@
"it-pipe": "^2.0.3",
"it-to-buffer": "^3.0.0",
"p-defer": "^4.0.0",
"random-int": "^3.0.0",
"typescript": "^5.0.2"
"random-int": "^3.0.0"
},
"browser": {
"./dist/src/alloc-unsafe.js": "./dist/src/alloc-unsafe-browser.js"
Expand Down
2 changes: 1 addition & 1 deletion src/alloc-unsafe-browser.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export function allocUnsafe (size: number) {
export function allocUnsafe (size: number): Uint8Array {
return new Uint8Array(size)
}
2 changes: 1 addition & 1 deletion src/alloc-unsafe.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export function allocUnsafe (size: number) {
export function allocUnsafe (size: number): Buffer {
return Buffer.allocUnsafe(size)
}
9 changes: 7 additions & 2 deletions src/decode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class Decoder {
this._maxUnprocessedMessageQueueSize = maxUnprocessedMessageQueueSize
}

write (chunk: Uint8Array) {
write (chunk: Uint8Array): Message[] {
if (chunk == null || chunk.length === 0) {
return []
}
Expand Down Expand Up @@ -109,7 +109,12 @@ export class Decoder {
const MSB = 0x80
const REST = 0x7F

function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
export interface ReadVarIntResult {
value: number
offset: number
}

function readVarInt (buf: Uint8ArrayList, offset: number = 0): ReadVarIntResult {
let res = 0
let shift = 0
let counter = offset
Expand Down
2 changes: 1 addition & 1 deletion src/encode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const encoder = new Encoder()
/**
* Encode and yield one or more messages
*/
export async function * encode (source: Source<Message[]>, minSendBytes: number = 0) {
export async function * encode (source: Source<Message[]>, minSendBytes: number = 0): AsyncGenerator<Uint8Array, void, undefined> {
if (minSendBytes == null || minSendBytes === 0) {
// just send the messages
for await (const messages of source) {
Expand Down
24 changes: 12 additions & 12 deletions src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const MAX_STREAMS_OUTBOUND_STREAMS_PER_CONNECTION = 1024
const MAX_STREAM_BUFFER_SIZE = 1024 * 1024 * 4 // 4MB
const DISCONNECT_THRESHOLD = 5

function printMessage (msg: Message) {
function printMessage (msg: Message): any {
const output: any = {
...msg,
type: `${MessageTypeNames[msg.type]} (${msg.type})`
Expand Down Expand Up @@ -101,7 +101,7 @@ export class MplexStreamMuxer implements StreamMuxer {
/**
* Returns a Map of streams and their ids
*/
get streams () {
get streams (): Stream[] {
// Inbound and Outbound streams may have the same ids, so we need to make those unique
const streams: Stream[] = []
for (const stream of this._streams.initiators.values()) {
Expand Down Expand Up @@ -135,23 +135,23 @@ export class MplexStreamMuxer implements StreamMuxer {
if (this.closeController.signal.aborted) return

if (err != null) {
this.streams.forEach(s => s.abort(err))
this.streams.forEach(s => { s.abort(err) })
} else {
this.streams.forEach(s => s.close())
this.streams.forEach(s => { s.close() })
}
this.closeController.abort()
}

/**
* Called whenever an inbound stream is created
*/
_newReceiverStream (options: { id: number, name: string }) {
_newReceiverStream (options: { id: number, name: string }): MplexStream {
const { id, name } = options
const registry = this._streams.receivers
return this._newStream({ id, name, type: 'receiver', registry })
}

_newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) {
_newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }): MplexStream {
const { id, name, type, registry } = options

log('new %s stream %s', type, id)
Expand All @@ -164,15 +164,15 @@ export class MplexStreamMuxer implements StreamMuxer {
throw new Error(`${type} stream ${id} already exists!`)
}

const send = (msg: Message) => {
const send = (msg: Message): void => {
if (log.enabled) {
log.trace('%s stream %s send', type, id, printMessage(msg))
}

this._source.push(msg)
}

const onEnd = () => {
const onEnd = (): void => {
log('%s stream with id %s and protocol %s ended', type, id, stream.stat.protocol)
registry.delete(id)

Expand All @@ -190,7 +190,7 @@ export class MplexStreamMuxer implements StreamMuxer {
* Creates a sink with an abortable source. Incoming messages will
* also have their size restricted. All messages will be varint decoded.
*/
_createSink () {
_createSink (): Sink<Uint8Array> {
const sink: Sink<Uint8Array> = async source => {
// see: https://github.com/jacobheun/any-signal/pull/18
const abortSignals = [this.closeController.signal]
Expand Down Expand Up @@ -222,8 +222,8 @@ export class MplexStreamMuxer implements StreamMuxer {
* Creates a source that restricts outgoing message sizes
* and varint encodes them
*/
_createSource () {
const onEnd = (err?: Error) => {
_createSource (): any {
const onEnd = (err?: Error): void => {
this.close(err)
}
const source = pushableV<Message>({
Expand All @@ -238,7 +238,7 @@ export class MplexStreamMuxer implements StreamMuxer {
})
}

async _handleIncoming (message: Message) {
async _handleIncoming (message: Message): Promise<void> {
const { id, type } = message

if (log.enabled) {
Expand Down
4 changes: 2 additions & 2 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export function createStream (options: Options): MplexStream {
open: Date.now()
}

const onSourceEnd = (err?: Error) => {
const onSourceEnd = (err?: Error): void => {
if (sourceEnded) {
return
}
Expand All @@ -68,7 +68,7 @@ export function createStream (options: Options): MplexStream {
}
}

const onSinkEnd = (err?: Error) => {
const onSinkEnd = (err?: Error): void => {
if (sinkEnded) {
return
}
Expand Down
10 changes: 8 additions & 2 deletions test/fixtures/utils.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { Message, MessageTypes } from '../../src/message-types.js'

export function messageWithBytes (msg: Message) {
export type MessageWithBytes = {
[k in keyof Message]: Message[k]
} & {
data: Uint8Array
}

export function messageWithBytes (msg: Message): Message | MessageWithBytes {
if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) {
return {
...msg,
data: msg.data.slice() // convert Uint8ArrayList to Buffer
data: msg.data.slice() // convert Uint8ArrayList to Uint8Array
}
}

Expand Down
6 changes: 3 additions & 3 deletions test/restrict-size.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ describe('restrict size', () => {
(source) => each(source, chunk => {
output.push(chunk)
}),
async (source) => await drain(source)
async (source) => { await drain(source) }
)
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_TOO_BIG')
Expand Down Expand Up @@ -90,7 +90,7 @@ describe('restrict size', () => {
(source) => each(source, chunk => {
output.push(chunk)
}),
async (source) => await drain(source)
async (source) => { await drain(source) }
)
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG')
Expand All @@ -113,7 +113,7 @@ describe('restrict size', () => {
(source) => each(source, chunk => {
output.push(chunk)
}),
async (source) => await drain(source)
async (source) => { await drain(source) }
)
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG')
Expand Down
Loading