diff --git a/lib/storage/src/data-chunk/readable-helper.ts b/lib/storage/src/data-chunk/readable-helper.ts index d1af8c772d09..a8266d254bd3 100644 --- a/lib/storage/src/data-chunk/readable-helper.ts +++ b/lib/storage/src/data-chunk/readable-helper.ts @@ -4,27 +4,25 @@ import { Readable } from "stream"; import { DEFAULT } from "../upload/defaults"; import { DataPart } from "./yield-chunk"; -interface StreamChunk { - Body: Buffer; - ended: boolean; -} - export async function* chunkFromReadable(reader: Readable, chunkSize: number): AsyncGenerator { let partNumber = DEFAULT.MIN_PART_NUMBER; let oldBuffer = Buffer.from(""); while (partNumber < DEFAULT.MAX_PART_NUMBER) { - reader.resume(); - const result = await _chunkFromStream(reader, chunkSize, oldBuffer); - reader.pause(); + let currentBuffer = oldBuffer; + if (reader.readable) { + reader.resume(); + currentBuffer = await _chunkFromStream(reader, chunkSize, oldBuffer); + reader.pause(); + } yield { - Body: result.Body.slice(0, chunkSize), + Body: currentBuffer.slice(0, chunkSize), PartNumber: partNumber, }; - oldBuffer = result.Body.slice(chunkSize) as Buffer; + oldBuffer = currentBuffer.slice(chunkSize) as Buffer; partNumber += 1; - if (result.ended && oldBuffer.length == 0) { + if (!reader.readable && oldBuffer.length == 0) { return; } } @@ -33,7 +31,11 @@ export async function* chunkFromReadable(reader: Readable, chunkSize: number): A } } -function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer): Promise { +function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer): Promise { + if (!stream.readable) { + return Promise.resolve(oldBuffer); + } + let currentChunk = oldBuffer; return new Promise((resolve, reject) => { const cleanupListeners = () => { @@ -44,12 +46,9 @@ function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer stream.on("data", (chunk) => { currentChunk = Buffer.concat([currentChunk, Buffer.from(chunk)]); - if (currentChunk.length >= chunkSize) { + if (currentChunk.length >= chunkSize || !stream.readable) { cleanupListeners(); - resolve({ - Body: currentChunk, - ended: false, - }); + resolve(currentChunk); } }); stream.on("error", (err) => { @@ -58,10 +57,7 @@ function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer }); stream.on("end", () => { cleanupListeners(); - resolve({ - Body: currentChunk, - ended: true, - }); + resolve(currentChunk); }); }); }