Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

feat: muxed stream close read and write #90

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ class Connection {

this.stat.status = CLOSING

await Promise.all(this.streams.map(s => s.close && s.close()))

// Close raw connection
this._closing = await this._close()

Expand Down
4 changes: 1 addition & 3 deletions src/connection/tests/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@

'use strict'

const chai = require('chai')
const expect = chai.expect
chai.use(require('dirty-chai'))
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const Status = require('../status')

Expand Down
117 changes: 101 additions & 16 deletions src/stream-muxer/tests/close-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
/* eslint max-nested-callbacks: ["error", 8] */
'use strict'

const { expect } = require('aegir/utils/chai')
const pair = require('it-pair/duplex')
const { pipe } = require('it-pipe')
const pDefer = require('p-defer')
const { consume } = require('streaming-iterables')
const Tcp = require('libp2p-tcp')
const { Multiaddr } = require('multiaddr')
const { source: abortable } = require('abortable-iterator')
const AbortController = require('abort-controller').default
const uint8arrayFromString = require('uint8arrays/from-string')

const mh = new Multiaddr('/ip4/127.0.0.1/tcp/0')

function pause (ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}
Expand All @@ -39,33 +37,31 @@ module.exports = (common) => {
Muxer = await common.setup()
})

it('closing underlying socket closes streams (tcp)', async () => {
it('closing underlying socket closes streams', async () => {
const mockConn = muxer => ({
newStream: (...args) => muxer.newStream(...args)
})

const mockUpgrade = () => maConn => {
const mockUpgrade = maConn => {
const muxer = new Muxer(stream => pipe(stream, stream))
pipe(maConn, muxer, maConn)
return mockConn(muxer)
}

const mockUpgrader = () => ({
upgradeInbound: mockUpgrade(),
upgradeOutbound: mockUpgrade()
const [local, remote] = pair()
const controller = new AbortController()
const abortableRemote = abortable.duplex(remote, controller.signal, {
returnOnAbort: true
})

const tcp = new Tcp({ upgrader: mockUpgrader() })
const tcpListener = tcp.createListener()

await tcpListener.listen(mh)
const dialerConn = await tcp.dial(tcpListener.getAddrs()[0])
mockUpgrade(abortableRemote)
const dialerConn = mockUpgrade(local)

const s1 = await dialerConn.newStream()
const s2 = await dialerConn.newStream()

// close the listener in a bit
setTimeout(() => tcpListener.close(), 50)
// close the remote in a bit
setTimeout(() => controller.abort(), 50)

const s1Result = pipe(infiniteRandom, s1, consume)
const s2Result = pipe(infiniteRandom, s2, consume)
Expand Down Expand Up @@ -116,5 +112,94 @@ module.exports = (common) => {
// These should now all resolve without error
await Promise.all(streamResults)
})

it('can close a stream for writing', (done) => {
const p = pair()
const dialer = new Muxer()
const data = [randomBuffer(), randomBuffer()]

const listener = new Muxer(async stream => {
// Immediate close for write
await stream.closeWrite()

const results = await pipe(stream, async (source) => {
const data = []
for await (const chunk of source) {
data.push(chunk.slice())
}
return data
})
expect(results).to.eql(data)

try {
await stream.sink([randomBuffer()])
} catch (err) {
expect(err).to.exist()
return done()
}
expect.fail('should not support writing to closed writer')
})

pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])

const stream = dialer.newStream()
stream.sink(data)
})

it('can close a stream for reading', (done) => {
const p = pair()
const dialer = new Muxer()
const data = [randomBuffer(), randomBuffer()]

const listener = new Muxer(async stream => {
const results = await pipe(stream, async (source) => {
const data = []
for await (const chunk of source) {
data.push(chunk.slice())
}
return data
})
expect(results).to.eql(data)
done()
})

pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])

const stream = dialer.newStream()
stream.closeRead()

// Source should be done
; (async () => {
expect(await stream.source.next()).to.eql({ done: true })
stream.sink(data)
})()
})

it('calls onStreamEnd for closed streams not previously written', async () => {
const deferred = pDefer()

const onStreamEnd = () => deferred.resolve()
const dialer = new Muxer({ onStreamEnd })

const stream = await dialer.newStream()

stream.close()
await deferred.promise
})

it('calls onStreamEnd for read and write closed streams not previously written', async () => {
const deferred = pDefer()

const onStreamEnd = () => deferred.resolve()
const dialer = new Muxer({ onStreamEnd })

const stream = await dialer.newStream()

stream.closeWrite()
stream.closeRead()
await deferred.promise
})
})
}
4 changes: 3 additions & 1 deletion src/stream-muxer/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ export type MuxedTimeline = {
}

export interface MuxedStream extends AsyncIterable<Uint8Array | BufferList> {
close: () => void;
close: () => Promise<void>;
closeRead: () => Promise<void>;
closeWrite: () => Promise<void>;
abort: () => void;
reset: () => void;
sink: Sink;
Expand Down