Skip to content

Commit

Permalink
fix: abort slow sending streams (#2395)
Browse files Browse the repository at this point in the history
Race the passed abort signal along with gracefully closing read/write
ends of a stream, so that the signal will abort a stream that is
very slow to send data.
  • Loading branch information
achingbrain authored Feb 7, 2024
1 parent d3ec80f commit 2370d1c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
11 changes: 5 additions & 6 deletions packages/utils/src/abstract-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,12 @@ export abstract class AbstractStream implements Stream {

this.status = 'closing'

await Promise.all([
this.closeRead(options),
this.closeWrite(options)
])

// wait for read and write ends to close
await raceSignal(this.closed.promise, options?.signal)
await raceSignal(Promise.all([
this.closeWrite(options),
this.closeRead(options),
this.closed.promise
]), options?.signal)

this.status = 'closed'

Expand Down
51 changes: 51 additions & 0 deletions packages/utils/test/abstract-stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { expect } from 'aegir/chai'
import delay from 'delay'
import all from 'it-all'
import drain from 'it-drain'
import pDefer from 'p-defer'
import Sinon from 'sinon'
import { Uint8ArrayList } from 'uint8arraylist'
import { AbstractStream } from '../src/abstract-stream.js'
Expand Down Expand Up @@ -196,4 +197,54 @@ describe('abstract stream', () => {
expect(sendCloseReadSpy.called).to.be.false()
expect(sendCloseWriteSpy.called).to.be.false()
})

it('should wait for sending data to finish when closing gracefully', async () => {
const sendStarted = pDefer()
let timeFinished: number = 0

// stub send method to simulate slow sending
stream.sendData = async () => {
sendStarted.resolve()
await delay(1000)
timeFinished = Date.now()
}
const data = [
Uint8Array.from([0, 1, 2, 3, 4])
]

void stream.sink(data)

// wait for send to start
await sendStarted.promise

// close stream
await stream.close()

// should have waited for send to complete
expect(Date.now()).to.be.greaterThanOrEqual(timeFinished)
})

it('should abort close due to timeout with slow sender', async () => {
const sendStarted = pDefer()

// stub send method to simulate slow sending
stream.sendData = async () => {
sendStarted.resolve()
await delay(1000)
}
const data = [
Uint8Array.from([0, 1, 2, 3, 4])
]

void stream.sink(data)

// wait for send to start
await sendStarted.promise

// close stream, should be aborted
await expect(stream.close({
signal: AbortSignal.timeout(1)
})).to.eventually.be.rejected
.with.property('code', 'ABORT_ERR')
})
})

0 comments on commit 2370d1c

Please sign in to comment.