Skip to content

Commit

Permalink
[core] fix isStreamComplete issue (#31270)
Browse files Browse the repository at this point in the history
Extract @MaryGao's fix from PR
#31027

Fix an issue in `isStreamComplete` where the method never resolves when
the stream is not readable.

---------

Co-authored-by: Mary Gao <yanmeigao1210@gmail.com>
  • Loading branch information
jeremymeng and MaryGao authored Oct 8, 2024
1 parent c7378ee commit 5abc72c
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 2 deletions.
2 changes: 2 additions & 0 deletions sdk/core/core-rest-pipeline/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fix an issue in `isStreamComplete` where the method never resolves if the stream is not readable.

### Other Changes

## 1.17.0 (2024-09-12)
Expand Down
5 changes: 4 additions & 1 deletion sdk/core/core-rest-pipeline/src/nodeHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ function isReadableStream(body: any): body is NodeJS.ReadableStream {
}

function isStreamComplete(stream: NodeJS.ReadableStream): Promise<void> {
if (stream.readable === false) {
return Promise.resolve();
}

return new Promise((resolve) => {
const handler = (): void => {
resolve();
Expand Down Expand Up @@ -184,7 +188,6 @@ class NodeHttpClient implements HttpClient {
if (isReadableStream(responseStream)) {
downloadStreamDone = isStreamComplete(responseStream);
}

Promise.all([uploadStreamDone, downloadStreamDone])
.then(() => {
// eslint-disable-next-line promise/always-return
Expand Down
47 changes: 47 additions & 0 deletions sdk/core/core-rest-pipeline/test/node/nodeHttpClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import { assert, describe, it, vi, beforeEach, afterEach } from "vitest";
import { PassThrough, Writable } from "stream";
import type { ClientRequest, IncomingHttpHeaders, IncomingMessage } from "http";
import { AbortSignalLike } from "@azure/abort-controller";
import { delay } from "@azure/core-util";
import { createDefaultHttpClient, createPipelineRequest } from "../../src/index.js";

vi.mock("https", async () => {
Expand Down Expand Up @@ -453,4 +455,49 @@ describe("NodeHttpClient", function () {
assert.strictEqual(e.name, "AbortError");
}
});

it("should release abort listener when stream body ends already", async function () {
vi.useRealTimers();
const client = createDefaultHttpClient();
const writable = new Writable({
write: (_chunk, _, next) => {
next();
},
}) as unknown as ClientRequest;
vi.mocked(https.request).mockReturnValueOnce(writable);

const controller = new AbortController();
let listenerRemoved = false;
const abortSignal: AbortSignalLike = {
aborted: false,
addEventListener: function (
_type: "abort",
listener: (this: AbortSignalLike, ev: any) => any,
options?: any,
): void {
controller.signal.addEventListener("abort", listener, options);
},
removeEventListener: function (
_type: "abort",
listener: (this: AbortSignalLike, ev: any) => any,
options?: any,
): void {
listenerRemoved = true;
controller.signal.removeEventListener("abort", listener, options);
},
};

const stream = new PassThrough();
stream.end();
const body = stream;
const request = createPipelineRequest({
url: "https://example.com",
body,
abortSignal,
});
const promise = client.sendRequest(request);
yieldHttpsResponse(createResponse(200));
await Promise.all([promise, delay(10)]);
assert.equal(listenerRemoved, true);
});
});
4 changes: 4 additions & 0 deletions sdk/core/ts-http-runtime/src/nodeHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ function isReadableStream(body: any): body is NodeJS.ReadableStream {
}

function isStreamComplete(stream: NodeJS.ReadableStream): Promise<void> {
if (stream.readable === false) {
return Promise.resolve();
}

return new Promise((resolve) => {
stream.on("close", resolve);
stream.on("end", resolve);
Expand Down
48 changes: 47 additions & 1 deletion sdk/core/ts-http-runtime/test/node/nodeHttpClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import { describe, it, assert, vi, beforeEach, afterEach } from "vitest";
import { PassThrough, Writable } from "node:stream";
import { ClientRequest, IncomingHttpHeaders, IncomingMessage } from "http";
import { createDefaultHttpClient, createPipelineRequest } from "../../src/index.js";
import { AbortSignalLike } from "../../src/abort-controller/AbortSignalLike.js";
import { createDefaultHttpClient, createPipelineRequest, delay } from "../../src/index.js";

vi.mock("https", async () => {
const actual = await vi.importActual("https");
Expand Down Expand Up @@ -453,4 +454,49 @@ describe("NodeHttpClient", function () {
assert.strictEqual(e.name, "AbortError");
}
});

it("should release abort listener when stream body ends already", async function () {
vi.useRealTimers();
const client = createDefaultHttpClient();
const writable = new Writable({
write: (_chunk, _, next) => {
next();
},
}) as unknown as ClientRequest;
vi.mocked(https.request).mockReturnValueOnce(writable);

const controller = new AbortController();
let listenerRemoved = false;
const abortSignal: AbortSignalLike = {
aborted: false,
addEventListener: function (
_type: "abort",
listener: (this: AbortSignalLike, ev: any) => any,
options?: any,
): void {
controller.signal.addEventListener("abort", listener, options);
},
removeEventListener: function (
_type: "abort",
listener: (this: AbortSignalLike, ev: any) => any,
options?: any,
): void {
listenerRemoved = true;
controller.signal.removeEventListener("abort", listener, options);
},
};

const stream = new PassThrough();
stream.end();
const body = stream;
const request = createPipelineRequest({
url: "https://example.com",
body,
abortSignal,
});
const promise = client.sendRequest(request);
yieldHttpsResponse(createResponse(200));
await Promise.all([promise, delay(10)]);
assert.equal(listenerRemoved, true);
});
});

0 comments on commit 5abc72c

Please sign in to comment.