From 506ae511467e75525016c7d23ff4dce155380fb5 Mon Sep 17 00:00:00 2001 From: menduz Date: Wed, 25 May 2022 20:03:11 -0300 Subject: [PATCH 1/4] strange case not closing stream --- .vscode/launch.json | 2 +- src/codegen.ts | 7 +- src/transports/Memory.ts | 12 +++- test/codegen-server.spec.ts | 124 +++++++++++++++++++++++++++++++++++- test/codegen/client.proto | 1 + test/helpers.ts | 22 +++++-- 6 files changed, 151 insertions(+), 17 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 22874e6..8284158 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -35,7 +35,7 @@ "${workspaceRoot}/node_modules/.bin/jest", "--runInBand", "--coverage", - // "test/ws.spec.ts" + "test/codegen-server.spec.ts" ], "console": "integratedTerminal", "internalConsoleOptions": "neverOpen", diff --git a/src/codegen.ts b/src/codegen.ts index 42be134..772d106 100644 --- a/src/codegen.ts +++ b/src/codegen.ts @@ -38,12 +38,9 @@ export function clientProcedureStream( const remoteModule: Record Promise> = (await port) as any if (!(method.name in remoteModule)) throw new Error("Method " + method.name + " not implemented in server port") - const result = await remoteModule[method.name](method.requestType.encode(arg).finish()) - if (result) { - for await (const bytes of await result) { - yield method.responseType.decode(bytes ?? EMPTY_U8ARRAY) - } + for await (const bytes of await remoteModule[method.name](method.requestType.encode(arg).finish())) { + yield method.responseType.decode(bytes ?? EMPTY_U8ARRAY) } } diff --git a/src/transports/Memory.ts b/src/transports/Memory.ts index 03d3193..b7388b7 100644 --- a/src/transports/Memory.ts +++ b/src/transports/Memory.ts @@ -1,16 +1,24 @@ import { Transport, TransportEvents } from "../types" import mitt, { Emitter } from "mitt" -export function MemoryTransport() { +export type MemoryTransportOptions = { + decouplingFunction?: (cb: () => void) => void +} + +export function MemoryTransport(options?: MemoryTransportOptions) { const clientEd = mitt() const serverEd = mitt() + const decouple = options?.decouplingFunction ?? ((cb) => cb()) + function configureMemoryTransport(receiver: Emitter, sender: Emitter): Transport { let isClosed = false return { ...sender, sendMessage(message) { - receiver.emit("message", message) + decouple(() => { + receiver.emit("message", message) + }) }, close() { if (!isClosed) { diff --git a/test/codegen-server.spec.ts b/test/codegen-server.spec.ts index 472b18b..9ff1b82 100644 --- a/test/codegen-server.spec.ts +++ b/test/codegen-server.spec.ts @@ -1,11 +1,14 @@ import { RpcServerPort } from "../src" import { AlmostEmpty, Book, BookServiceDefinition, Empty, GetBookRequest, QueryBooksRequest } from "./codegen/client" -import { createSimpleTestEnvironment, takeAsync } from "./helpers" +import { createSimpleTestEnvironment, delay, takeAsync } from "./helpers" import * as codegen from "../src/codegen" +import { from, lastValueFrom, take } from "rxjs" const FAIL_WITH_EXCEPTION_ISBN = 1 describe("codegen client & server", () => { + let infiniteGeneratorClosed = 0 + let infiniteGeneratorEmited = 0 const testEnv = createSimpleTestEnvironment(async function (port) { codegen.registerService(port, BookServiceDefinition, async () => ({ async getBook(req: GetBookRequest) { @@ -69,13 +72,28 @@ describe("codegen client & server", () => { yield { int: 1 } yield { int: 0 } }, + async *infiniteGenerator() { + try { + while (true) { + infiniteGeneratorEmited++ + if (infiniteGeneratorEmited == 15) await Promise.race([]) + yield { int: infiniteGeneratorEmited } + } + } finally { + infiniteGeneratorClosed++ + } + }, })) }) let service: codegen.RpcClientModule - it("basic service wraper creation", async () => { - const { rpcClient } = await testEnv.start() + beforeAll(async () => { + const { rpcClient } = await testEnv.start(null, { + decouplingFunction: (cb) => { + setTimeout(cb, Math.random() * 10) + }, + }) const clientPort = await rpcClient.createPort("test1") service = codegen.loadService(clientPort, BookServiceDefinition) @@ -152,4 +170,104 @@ describe("codegen client & server", () => { "RemoteError: fail_before_end" ) }) + + it("infinite stream take rxjs 10", async () => { + infiniteGeneratorClosed = 0 + infiniteGeneratorEmited = 0 + + const gen = from(service.infiniteGenerator({})).pipe(take(10)) + + await lastValueFrom(gen) + await delay(100) + + expect(infiniteGeneratorEmited).toEqual(10) + expect(infiniteGeneratorClosed).toEqual(1) + }) + + it("infinite stream take rxjs 15", async () => { + infiniteGeneratorClosed = 0 + infiniteGeneratorEmited = 0 + + const gen = from(service.infiniteGenerator({})).pipe(take(15)) + const values: any[] = [] + + const sub = gen.subscribe((value) => { + values.push(value) + }) + + await delay(1000) + sub.unsubscribe() + await delay(100) + + expect(values).toEqual([ + { int: 1 }, + { int: 2 }, + { int: 3 }, + { int: 4 }, + { int: 5 }, + { int: 6 }, + { int: 7 }, + { int: 8 }, + { int: 9 }, + { int: 10 }, + { int: 11 }, + { int: 12 }, + { int: 13 }, + { int: 14 }, + ]) + expect(infiniteGeneratorEmited).toEqual(15) + expect(infiniteGeneratorClosed).toEqual(1) + }) + + it("infinite stream take rxjs 0", async () => { + infiniteGeneratorClosed = 0 + infiniteGeneratorEmited = 0 + + const gen = from(service.infiniteGenerator({})).pipe(take(0)) + const values: any[] = [] + + const sub = gen.subscribe((value) => { + values.push(value) + }) + + await delay(1000) + sub.unsubscribe() + await delay(100) + + expect(values).toEqual([]) + + expect(infiniteGeneratorEmited).toEqual(0) + expect(infiniteGeneratorClosed).toEqual(1) + }) + + it("take async iterator", async () => { + infiniteGeneratorClosed = 0 + infiniteGeneratorEmited = 0 + + const values: any[] = [] + + const stream = service.infiniteGenerator({}) + + for await (const value of stream) { + values.push(value) + if (values.length == 10) break + } + + await delay(100) + + expect(infiniteGeneratorEmited).toEqual(10) + expect(infiniteGeneratorClosed).toEqual(1) + expect(values).toEqual([ + { int: 1 }, + { int: 2 }, + { int: 3 }, + { int: 4 }, + { int: 5 }, + { int: 6 }, + { int: 7 }, + { int: 8 }, + { int: 9 }, + { int: 10 }, + ]) + }) }) diff --git a/test/codegen/client.proto b/test/codegen/client.proto index 4e41bac..9012a4d 100644 --- a/test/codegen/client.proto +++ b/test/codegen/client.proto @@ -27,4 +27,5 @@ service BookService { rpc EmptyResponse(Book) returns (Empty) {} rpc EmptyResponseStream(Book) returns (stream Empty) {} rpc AlmostEmptyResponseStream(Book) returns (stream AlmostEmpty) {} + rpc InfiniteGenerator(Empty) returns (stream AlmostEmpty) {} } diff --git a/test/helpers.ts b/test/helpers.ts index 33e9113..c5e509c 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -1,7 +1,14 @@ -import { createRpcClient, createRpcServer, CreateRpcServerOptions, RpcClient, RpcServerHandler, Transport } from "../src" +import { + createRpcClient, + createRpcServer, + CreateRpcServerOptions, + RpcClient, + RpcServerHandler, + Transport, +} from "../src" import { log } from "./logger" import { inspect } from "util" -import { MemoryTransport } from "../src/transports/Memory" +import { MemoryTransport, MemoryTransportOptions } from "../src/transports/Memory" import { parseProtocolMessage } from "../src/protocol/helpers" import { Reader } from "protobufjs/minimal" @@ -53,9 +60,12 @@ export function instrumentMemoryTransports(memoryTransport: ReturnType(handler: RpcServerHandler, options: CreateRpcServerOptions = {}) { - async function start(context: Context) { - const memoryTransport = MemoryTransport() +export function createSimpleTestEnvironment( + handler: RpcServerHandler, + options: CreateRpcServerOptions = {} +) { + async function start(context: Context, transportOptions?: MemoryTransportOptions) { + const memoryTransport = MemoryTransport(transportOptions) instrumentMemoryTransports(memoryTransport) const rpcServer = createRpcServer(options) @@ -80,7 +90,7 @@ export function createSimpleTestEnvironment(handler: RpcServerHa } return { - start + start, } } From 8e8d6c748ddb5d768151b6e742e02e82fcfc1afd Mon Sep 17 00:00:00 2001 From: menduz Date: Wed, 1 Jun 2022 18:19:54 -0300 Subject: [PATCH 2/4] wip --- .gitignore | 2 +- .vscode/launch.json | 3 +- Makefile | 5 +- package-lock.json | 13 ++ package.json | 1 + perf.sh | 2 +- src/client.ts | 4 +- src/codegen.ts | 77 ++++++++++-- src/push-channel.ts | 113 +++++++++++++----- src/server.ts | 48 ++++---- test/benchmarks/bench.ts | 6 + test/benchmarks/tsconfig.json | 2 +- test/close-transport.spec.ts | 10 +- test/codegen-server.spec.ts | 176 ++++++++++++++++++++------- test/codegen/client.proto | 1 + test/helpers.ts | 11 +- test/push-channel.spec.ts | 216 +++++++++++++++++++++++++++++----- test/stream.spec.ts | 53 +++------ 18 files changed, 553 insertions(+), 190 deletions(-) diff --git a/.gitignore b/.gitignore index 1354cf3..b203d41 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,4 @@ test/codegen/*_pb.* src/protocol/index.ts test/codegen/client.ts -test/benchmarks/compilated \ No newline at end of file +test/benchmarks/compiled \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 8284158..daedc09 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -35,7 +35,8 @@ "${workspaceRoot}/node_modules/.bin/jest", "--runInBand", "--coverage", - "test/codegen-server.spec.ts" + "-t", + "manualHackWithPushableChannel" ], "console": "integratedTerminal", "internalConsoleOptions": "neverOpen", diff --git a/Makefile b/Makefile index 51131a6..739d8ff 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,8 @@ test: --ts_proto_out="$(PWD)/test/codegen" \ -I="$(PWD)/test/codegen" \ "$(PWD)/test/codegen/client.proto" - node_modules/.bin/jest --detectOpenHandles --colors --runInBand $(TESTARGS) --coverage $(TEST_FILE) + SIMMULATE_JITTER=false node_modules/.bin/jest --detectOpenHandles --colors --runInBand $(TESTARGS) $(TEST_FILE) + SIMMULATE_JITTER=true node_modules/.bin/jest --detectOpenHandles --colors --runInBand $(TESTARGS) --coverage $(TEST_FILE) $(MAKE) integration-example test-watch: @@ -66,7 +67,7 @@ cheap-perf: inspect: node_modules/.bin/tsc -p test/benchmarks/tsconfig.json - node --inspect-brk test/benchmarks/compilated/test/benchmarks/bench.js + node --inspect-brk test/benchmarks/compiled/test/benchmarks/bench.js integration-example: @cd example; ./build.sh diff --git a/package-lock.json b/package-lock.json index f3d0c99..0bf97cf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,6 +18,7 @@ "@types/jest": "^27.0.1", "@types/ws": "^8.5.3", "benchmark": "^2.1.4", + "fp-future": "^1.0.1", "jest": "^27.0.6", "rxjs": "^7.5.5", "ts-jest": "^27.0.5", @@ -2167,6 +2168,12 @@ "node": ">= 6" } }, + "node_modules/fp-future": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/fp-future/-/fp-future-1.0.1.tgz", + "integrity": "sha512-2McmZH/KsZqlqHju9+Ox0FC7q7Knve4t6ZeKubbhAz1xpnD7hkCrP8TP5g5QbbD5bA5jBANbXf/ew4x1FjSUrw==", + "dev": true + }, "node_modules/fs-extra": { "version": "7.0.1", "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-7.0.1.tgz", @@ -6481,6 +6488,12 @@ "mime-types": "^2.1.12" } }, + "fp-future": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/fp-future/-/fp-future-1.0.1.tgz", + "integrity": "sha512-2McmZH/KsZqlqHju9+Ox0FC7q7Knve4t6ZeKubbhAz1xpnD7hkCrP8TP5g5QbbD5bA5jBANbXf/ew4x1FjSUrw==", + "dev": true + }, "fs-extra": { "version": "7.0.1", "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-7.0.1.tgz", diff --git a/package.json b/package.json index 7665640..d9d26aa 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "@types/jest": "^27.0.1", "@types/ws": "^8.5.3", "benchmark": "^2.1.4", + "fp-future": "^1.0.1", "jest": "^27.0.6", "rxjs": "^7.5.5", "ts-jest": "^27.0.5", diff --git a/perf.sh b/perf.sh index af59cc1..b3eb926 100755 --- a/perf.sh +++ b/perf.sh @@ -2,7 +2,7 @@ node_modules/.bin/tsc -p test/benchmarks/tsconfig.json rm ./*.log || true -time node --prof test/benchmarks/compilated/test/benchmarks/bench.js +time node --prof --trace-deopt test/benchmarks/compiled/test/benchmarks/bench.js EXIT_CODE=$? for f in *.log; do node --prof-process "$f"; done exit $EXIT_CODE \ No newline at end of file diff --git a/src/client.ts b/src/client.ts index d9fa55b..ddb13db 100644 --- a/src/client.ts +++ b/src/client.ts @@ -133,9 +133,9 @@ export function streamFromDispatcher( if (message.closed) { isRemoteClosed = true - channel.close() + channel.iterable.return(null).then(sendAck) } else { - channel.push(message.payload).then(sendAck).catch(channel.failAndClose) + channel.push(message.payload, sendAck) } } diff --git a/src/codegen.ts b/src/codegen.ts index 772d106..aaf2db3 100644 --- a/src/codegen.ts +++ b/src/codegen.ts @@ -34,14 +34,43 @@ export function clientProcedureStream( port: unknown | Promise, method: TsProtoMethodDefinition ): ServerStreamingClientMethod { - const fn = async function* (arg: Request): AsyncGenerator { - const remoteModule: Record Promise> = (await port) as any + const fn = function (arg: Request): AsyncGenerator { + let _generator: Promise> | undefined = undefined - if (!(method.name in remoteModule)) throw new Error("Method " + method.name + " not implemented in server port") + async function lazyGenerator() { + const remoteModule: Record Promise> = (await port) as any + if (!(method.name in remoteModule)) throw new Error("Method " + method.name + " not implemented in server port") + return (await remoteModule[method.name](method.requestType.encode(arg).finish()))[Symbol.asyncIterator]() + } + + function getGenerator() { + if (!_generator) { + _generator = lazyGenerator() + } + return _generator! + } + + function generateResult(): AsyncGenerator { + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => generateResult(), + async next() { + const iter = await (await getGenerator()).next() + return { value: method.responseType.decode(iter.value ?? EMPTY_U8ARRAY), done: iter.done } + }, + async return(value) { + const iter = await (await getGenerator()).return(value) + return { value: method.responseType.decode(iter.value ?? EMPTY_U8ARRAY), done: iter.done } + }, + async throw(value) { + const iter = await (await getGenerator()).throw(value) + return { value: method.responseType.decode(iter.value ?? EMPTY_U8ARRAY), done: iter.done } + } + } - for await (const bytes of await remoteModule[method.name](method.requestType.encode(arg).finish())) { - yield method.responseType.decode(bytes ?? EMPTY_U8ARRAY) + return ret } + + return generateResult() } return fn @@ -64,15 +93,43 @@ export function serverProcedureStream( fn: (arg: Request, context: Context) => Promise> | AsyncGenerator, method: TsProtoMethodDefinition ): (arg: Uint8Array, context: Context) => AsyncGenerator { - return async function* (argBinary, ctx) { + return function (argBinary, context): AsyncGenerator { + let _generator: Promise> | undefined = undefined + const arg = method.requestType.decode(argBinary) - const result = await fn(arg, ctx) - if (!result) throw new Error("Empty or null responses are not allowed. Procedure: " + method.name) + async function lazyGenerator() { + const result = (await fn(arg, context)) + + if (!result) throw new Error("Empty or null responses are not allowed. Procedure: " + method.name) + + return result[Symbol.asyncIterator]() + } + + function getGenerator() { + if (!_generator) { + _generator = lazyGenerator() + } + return _generator! + } - for await (const elem of result) { - yield method.responseType.encode(elem).finish() + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => ret, + async next() { + const iter = await (await getGenerator()).next() + return { value: iter.value ? method.responseType.encode(iter.value).finish() : iter.value, done: iter.done } + }, + async return(value) { + const iter = await (await getGenerator()).return(value) + return { value: iter.value ? method.responseType.encode(iter.value).finish() : iter.value, done: iter.done } + }, + async throw(value) { + const iter = await (await getGenerator()).throw(value) + return { value: iter.value ? method.responseType.encode(iter.value).finish() : iter.value, done: iter.done } + } } + + return ret } } diff --git a/src/push-channel.ts b/src/push-channel.ts index 247159d..a074b29 100644 --- a/src/push-channel.ts +++ b/src/push-channel.ts @@ -1,12 +1,71 @@ -export function pushableChannel(onIteratorClose: () => void) { - type LastResolver = () => void - type ListItem = { value: T; resolve: LastResolver; prev?: ListItem } +type Node = { value: T; resolve?: LastResolver; prev?: Node, next?: Node } +type LastResolver = (err?: any) => void + +export function linkedList() { + let head: Node | undefined = undefined + let tail: Node | undefined = undefined + + function push(value: T, resolve?: LastResolver) { + const node: Node = { + value, + resolve, + } + node.prev = tail; + if (tail) { + tail.next = node; + } + if (!head) { + head = node; + } + tail = node; + } + + function remove(node: Node): void { + if (!node.next) { + tail = node.prev; + } else { + const nextNode = node.next; + nextNode.prev = node.prev; + } + if (!node.prev) { + head = node.next; + } else { + const prevNode = node.prev; + prevNode.next = node.next; + } + } + + // removes the head node and updates the head + function unshift(): Node | undefined { + const ret = head + if (ret) remove(ret) + return ret + } + + // signals if the list is empty + function isEmpty(): boolean { + return !head + } + return { push, unshift, isEmpty } +} + + +export function pushableChannel(onIteratorClose: () => void) { let returnLock: (() => void) | null = null - const queue: ListItem[] = [] + const queue = linkedList() let closed = false let error: Error | null = null - let lastResolver: LastResolver | null = null + + function closeAllPending() { + if (!queue.isEmpty()) { + const err = error || new Error("Channel was closed before deliverying the message") + while (!queue.isEmpty()) { + const { resolve } = queue.unshift()! + if (resolve) resolve(err); + } + } + } function releaseLockIfNeeded() { // signal that we have a value @@ -17,24 +76,18 @@ export function pushableChannel(onIteratorClose: () => void) { } } - async function push(value: T) { - if (closed) throw new Error("Channel is closed") + function push(value: T, callback: (err?: any) => void) { + if (closed) { + callback(new Error("Channel is closed")) + return + } if (error) { - throw error + callback(error) + return } // push the value to the queue - return new Promise((resolve) => { - queue.push({ value, resolve }) - releaseLockIfNeeded() - }) - } - - // resolves the promise returned by push(T) - function markConsumed() { - if (lastResolver) { - lastResolver() - lastResolver = null - } + queue.push(value, callback) + releaseLockIfNeeded() } function failAndClose(errorToThrow: Error) { @@ -43,29 +96,24 @@ export function pushableChannel(onIteratorClose: () => void) { } function yieldNextResult(): IteratorResult | void { - if (error && queue.length == 0) { + if (error && queue.isEmpty()) { throw error } - if (closed && queue.length == 0) { + if (closed && queue.isEmpty()) { return { done: true, value: undefined } } - if (queue.length) { - if (lastResolver) { - throw new Error("logic error, this should never happen") - } - - const { value, resolve } = queue.shift()! - lastResolver = resolve + if (!queue.isEmpty()) { + const node = queue.unshift()! + if (node.resolve) node.resolve() return { done: false, - value, + value: node.value, } } } function close() { if (!closed) { - markConsumed() closed = true releaseLockIfNeeded() onIteratorClose() @@ -76,7 +124,6 @@ export function pushableChannel(onIteratorClose: () => void) { async next() { while (true) { try { - markConsumed() const result = yieldNextResult() if (result) { return result @@ -92,6 +139,7 @@ export function pushableChannel(onIteratorClose: () => void) { }, async return(value) { close() + closeAllPending() return { done: true, value: undefined } }, async throw(e) { @@ -99,6 +147,7 @@ export function pushableChannel(onIteratorClose: () => void) { throw error } close() + closeAllPending() return { done: true, value: undefined } }, [Symbol.asyncIterator]() { diff --git a/src/server.ts b/src/server.ts index ef10a53..b4f1ac7 100644 --- a/src/server.ts +++ b/src/server.ts @@ -287,31 +287,33 @@ export async function handleRequest( payload: EMPTY_U8A, portId: request.portId, }) - - for await (const elem of iter) { - sequenceNumber++ - reusedStreamMessage.closed = false - reusedStreamMessage.ack = false - reusedStreamMessage.sequenceId = sequenceNumber - reusedStreamMessage.messageIdentifier = calculateMessageIdentifier( - RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE, - messageNumber - ) - reusedStreamMessage.payload = elem - reusedStreamMessage.portId = request.portId - - // sendWithAck may fail if the transport is closed, effectively - // ending this iterator. - const ret = await ackDispatcher.sendWithAck(reusedStreamMessage) - - if (ret.ack) { - continue - } else if (ret.closed) { - // if it was closed remotely, then we end the stream right away - return + try { + for await (const elem of iter) { + sequenceNumber++ + reusedStreamMessage.closed = false + reusedStreamMessage.ack = false + reusedStreamMessage.sequenceId = sequenceNumber + reusedStreamMessage.messageIdentifier = calculateMessageIdentifier( + RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE, + messageNumber + ) + reusedStreamMessage.payload = elem + reusedStreamMessage.portId = request.portId + + // sendWithAck may fail if the transport is closed, effectively + // ending this iterator. + const ret = await ackDispatcher.sendWithAck(reusedStreamMessage) + + if (ret.ack) { + continue + } else if (ret.closed) { + // if it was closed remotely, then we end the stream right away + return + } } + } finally { + transport.sendMessage(closeStreamMessage(messageNumber, sequenceNumber, request.portId)) } - transport.sendMessage(closeStreamMessage(messageNumber, sequenceNumber, request.portId)) } else { unsafeSyncWriter.reset() Response.encode(response, unsafeSyncWriter) diff --git a/test/benchmarks/bench.ts b/test/benchmarks/bench.ts index d4526ef..97fa4ab 100644 --- a/test/benchmarks/bench.ts +++ b/test/benchmarks/bench.ts @@ -32,6 +32,12 @@ async function test() { yield* books } }, + async *almostEmptyResponseStream() { throw new Error('not implemented') }, + async emptyQuery() { throw new Error('not implemented') }, + async emptyResponse() { throw new Error('not implemented') }, + async *emptyResponseStream() { throw new Error('not implemented') }, + async *infiniteGenerator() { throw new Error('not implemented') }, + async *failFirstGenerator() { throw new Error('not implemented') } })) }) diff --git a/test/benchmarks/tsconfig.json b/test/benchmarks/tsconfig.json index 11da6fc..0453797 100644 --- a/test/benchmarks/tsconfig.json +++ b/test/benchmarks/tsconfig.json @@ -9,7 +9,7 @@ "node", "jest" ], - "outDir": "./compilated", + "outDir": "./compiled", "esModuleInterop": true, /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */ "skipLibCheck": true, /* Skip type checking of declaration files. */ "forceConsistentCasingInFileNames": true, /* Disallow inconsistently-cased references to the same file. */ diff --git a/test/close-transport.spec.ts b/test/close-transport.spec.ts index f58a589..81191b0 100644 --- a/test/close-transport.spec.ts +++ b/test/close-transport.spec.ts @@ -1,7 +1,7 @@ import { RpcClient } from "../src" import { calculateMessageIdentifier } from "../src/protocol/helpers" import { RpcMessageHeader, RpcMessageTypes } from "../src/protocol" -import { createSimpleTestEnvironment } from "./helpers" +import { createSimpleTestEnvironment, delay } from "./helpers" async function testPort(rpcClient: RpcClient, portName: string) { const port = await rpcClient.createPort(portName) @@ -27,7 +27,7 @@ describe("Close transport closes streams (server side)", () => { })) }) - it("creates the server", async () => { + it("runs the test", async () => { const { rpcClient, transportServer } = await testEnv.start() const { infinite } = await testPort(rpcClient, "port1") @@ -142,7 +142,7 @@ describe("Error in transport closes the transport", () => { })) }) - it("creates the server", async () => { + it("runs the test", async () => { const { rpcClient, transportServer } = await testEnv.start() const { infinite } = await testPort(rpcClient, "port1") @@ -158,6 +158,10 @@ describe("Error in transport closes the transport", () => { } }).rejects.toThrow("RPC Transport closed") + // give it a second to finish, the memory transport adds some jitter to simulate + // async network conditions + await delay(10) + // the server AsyncGenerators must be closed after the transport is closed to avoid leaks expect(infiniteStreamClosed).toEqual(true) }) diff --git a/test/codegen-server.spec.ts b/test/codegen-server.spec.ts index 9ff1b82..741d42d 100644 --- a/test/codegen-server.spec.ts +++ b/test/codegen-server.spec.ts @@ -1,4 +1,4 @@ -import { RpcServerPort } from "../src" +import { future } from "fp-future" import { AlmostEmpty, Book, BookServiceDefinition, Empty, GetBookRequest, QueryBooksRequest } from "./codegen/client" import { createSimpleTestEnvironment, delay, takeAsync } from "./helpers" import * as codegen from "../src/codegen" @@ -9,6 +9,7 @@ const FAIL_WITH_EXCEPTION_ISBN = 1 describe("codegen client & server", () => { let infiniteGeneratorClosed = 0 let infiniteGeneratorEmited = 0 + let closeFuture = future() const testEnv = createSimpleTestEnvironment(async function (port) { codegen.registerService(port, BookServiceDefinition, async () => ({ async getBook(req: GetBookRequest) { @@ -72,16 +73,41 @@ describe("codegen client & server", () => { yield { int: 1 } yield { int: 0 } }, - async *infiniteGenerator() { - try { - while (true) { + infiniteGenerator() { + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => ret, + async next() { infiniteGeneratorEmited++ - if (infiniteGeneratorEmited == 15) await Promise.race([]) - yield { int: infiniteGeneratorEmited } - } - } finally { - infiniteGeneratorClosed++ + // hang in 4th iteration + if (infiniteGeneratorEmited == 4) await closeFuture + return { value: { int: infiniteGeneratorEmited } } + }, + async return() { + infiniteGeneratorClosed++ + return { done: true, value: null } + }, + async throw() { + throw new Error("throw should never be called in this scenario") + }, } + + return ret + }, + failFirstGenerator() { + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => ret, + next() { + throw new Error('Fails on first yield without returning a promise') + }, + async return() { + throw new Error("Fails on return") + }, + async throw() { + throw new Error("throw should never be called in this scenario") + }, + } + + return ret }, })) }) @@ -90,13 +116,23 @@ describe("codegen client & server", () => { beforeAll(async () => { const { rpcClient } = await testEnv.start(null, { - decouplingFunction: (cb) => { - setTimeout(cb, Math.random() * 10) - }, }) const clientPort = await rpcClient.createPort("test1") service = codegen.loadService(clientPort, BookServiceDefinition) + + }) + + beforeEach(async () => { + process.stderr.write('Cleaning up...\n') + + closeFuture.resolve() + closeFuture = future() + await delay(100) + + infiniteGeneratorClosed = 0 + infiniteGeneratorEmited = 0 + process.stderr.write('\n') }) it("calls an unary method", async () => { @@ -171,24 +207,65 @@ describe("codegen client & server", () => { ) }) - it("infinite stream take rxjs 10", async () => { + it("when streams are materialized, the first element should be sended to start the ACK chains", async () => { infiniteGeneratorClosed = 0 infiniteGeneratorEmited = 0 + closeFuture = future() - const gen = from(service.infiniteGenerator({})).pipe(take(10)) + const gen = service.infiniteGenerator({}) + + await gen.return(null) + // give it time to end and send async messages + await delay(100) + + expect(infiniteGeneratorEmited).toEqual(1) + expect(infiniteGeneratorClosed).toEqual(1) + }) + + it("infinite stream take vanilla 1", async () => { + infiniteGeneratorClosed = 0 + infiniteGeneratorEmited = 0 + closeFuture = future() + + const gen = service.infiniteGenerator({}) + + const values = [ + await (await gen.next()).value + ] + + await gen.return(null) + // give it time to end and send async messages + await delay(100) + + expect(values).toEqual([ + { int: 1 } + ]) + + expect(infiniteGeneratorEmited).toEqual(1) + expect(infiniteGeneratorClosed).toEqual(1) + }) + + + it("infinite stream take rxjs 2", async () => { + infiniteGeneratorClosed = 0 + infiniteGeneratorEmited = 0 + closeFuture = future() + + const gen = from(service.infiniteGenerator({})).pipe(take(2)) await lastValueFrom(gen) await delay(100) - expect(infiniteGeneratorEmited).toEqual(10) + expect(infiniteGeneratorEmited).toEqual(2) expect(infiniteGeneratorClosed).toEqual(1) }) - it("infinite stream take rxjs 15", async () => { + it("infinite stream take rxjs 3", async () => { infiniteGeneratorClosed = 0 infiniteGeneratorEmited = 0 + closeFuture = future() - const gen = from(service.infiniteGenerator({})).pipe(take(15)) + const gen = from(service.infiniteGenerator({})).pipe(take(3)) const values: any[] = [] const sub = gen.subscribe((value) => { @@ -203,71 +280,80 @@ describe("codegen client & server", () => { { int: 1 }, { int: 2 }, { int: 3 }, - { int: 4 }, - { int: 5 }, - { int: 6 }, - { int: 7 }, - { int: 8 }, - { int: 9 }, - { int: 10 }, - { int: 11 }, - { int: 12 }, - { int: 13 }, - { int: 14 }, ]) - expect(infiniteGeneratorEmited).toEqual(15) + expect(infiniteGeneratorEmited).toEqual(3) expect(infiniteGeneratorClosed).toEqual(1) }) - it("infinite stream take rxjs 0", async () => { + it("infinite stream take rxjs 3 (using lastvalue)", async () => { infiniteGeneratorClosed = 0 infiniteGeneratorEmited = 0 + closeFuture = future() - const gen = from(service.infiniteGenerator({})).pipe(take(0)) + const gen = from(service.infiniteGenerator({})).pipe(take(3)) + + const lastValue = await lastValueFrom(gen) + await delay(100) + + expect(lastValue).toEqual({ int: 3 }) + expect(infiniteGeneratorEmited).toEqual(3) + expect(infiniteGeneratorClosed).toEqual(1) + }) + + it("infinite stream take rxjs 4", async () => { + infiniteGeneratorClosed = 0 + infiniteGeneratorEmited = 0 + closeFuture = future() + + const gen = from(service.infiniteGenerator({})).pipe(take(4)) const values: any[] = [] const sub = gen.subscribe((value) => { values.push(value) }) + // allow the stream to be "fully" consumed await delay(1000) sub.unsubscribe() + + // wait for the close message to arrive to the server await delay(100) - expect(values).toEqual([]) + // resolve the "hanging promise" + closeFuture.resolve() + + // give it time to end and send async messages + await delay(100) - expect(infiniteGeneratorEmited).toEqual(0) + expect(values).toEqual([ + { int: 1 }, + { int: 2 }, + { int: 3 }, + ]) + expect(infiniteGeneratorEmited).toEqual(4) expect(infiniteGeneratorClosed).toEqual(1) }) it("take async iterator", async () => { infiniteGeneratorClosed = 0 infiniteGeneratorEmited = 0 + closeFuture = future() const values: any[] = [] - const stream = service.infiniteGenerator({}) - - for await (const value of stream) { + for await (const value of service.infiniteGenerator({})) { values.push(value) - if (values.length == 10) break + if (values.length == 3) break } await delay(100) - expect(infiniteGeneratorEmited).toEqual(10) + expect(infiniteGeneratorEmited).toEqual(3) expect(infiniteGeneratorClosed).toEqual(1) expect(values).toEqual([ { int: 1 }, { int: 2 }, { int: 3 }, - { int: 4 }, - { int: 5 }, - { int: 6 }, - { int: 7 }, - { int: 8 }, - { int: 9 }, - { int: 10 }, ]) }) }) diff --git a/test/codegen/client.proto b/test/codegen/client.proto index 9012a4d..5bf66cf 100644 --- a/test/codegen/client.proto +++ b/test/codegen/client.proto @@ -28,4 +28,5 @@ service BookService { rpc EmptyResponseStream(Book) returns (stream Empty) {} rpc AlmostEmptyResponseStream(Book) returns (stream AlmostEmpty) {} rpc InfiniteGenerator(Empty) returns (stream AlmostEmpty) {} + rpc FailFirstGenerator(Empty) returns (stream AlmostEmpty) {} } diff --git a/test/helpers.ts b/test/helpers.ts index c5e509c..14daf85 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -65,7 +65,16 @@ export function createSimpleTestEnvironment( options: CreateRpcServerOptions = {} ) { async function start(context: Context, transportOptions?: MemoryTransportOptions) { - const memoryTransport = MemoryTransport(transportOptions) + const memoryTransport = MemoryTransport({ + decouplingFunction: (cb) => { + if (process.env.SIMMULATE_JITTER === 'true') { + setTimeout(cb, Math.random() * 10) + } else { + cb() + } + }, + ...transportOptions + }) instrumentMemoryTransports(memoryTransport) const rpcServer = createRpcServer(options) diff --git a/test/push-channel.spec.ts b/test/push-channel.spec.ts index 9594e4d..0385030 100644 --- a/test/push-channel.spec.ts +++ b/test/push-channel.spec.ts @@ -1,13 +1,47 @@ import mitt from "mitt" -import { pushableChannel } from "../src/push-channel" +import { lastValueFrom, from, map } from "rxjs" +import { linkedList, pushableChannel } from "../src/push-channel" import { takeAsync } from "./helpers" -describe.only("push channel", () => { +function promisify(fn) { + return (...args) => { + return new Promise((resolve, reject) => fn(...args, x => x ? reject(x) : resolve())) + } +} + +describe('linked list', () => { + it('adds one, removes one', () => { + const l = linkedList() + expect(l.isEmpty()).toBeTruthy() + expect(l.unshift()).toBe(undefined) + expect(l.isEmpty()).toBeTruthy() + l.push(1) + expect(l.unshift()).toMatchObject({ value: 1 }) + expect(l.isEmpty()).toBeTruthy() + expect(l.unshift()).toBe(undefined) + expect(l.isEmpty()).toBeTruthy() + l.push(1) + expect(l.isEmpty()).toBeFalsy() + l.push(2) + l.push(3) + expect(l.isEmpty()).toBeFalsy() + expect(l.unshift()).toMatchObject({ value: 1 }) + expect(l.unshift()).toMatchObject({ value: 2 }) + expect(l.isEmpty()).toBeFalsy() + expect(l.unshift()).toMatchObject({ value: 3 }) + expect(l.isEmpty()).toBeTruthy() + expect(l.unshift()).toBe(undefined) + }) +}) + +describe("push channel", () => { it("enqueues several elements and iterates through them", async () => { const chan = pushableChannel(() => void 0) expect(chan.isClosed()).toEqual(false) - const pushes = [chan.push(1), chan.push(2), chan.push(3)] + const push = promisify(chan.push) + + const pushes = [push(1), push(2), push(3)] const values: number[] = [] @@ -15,7 +49,7 @@ describe.only("push channel", () => { values.push(val) expect(chan.isClosed()).toEqual(false) if (val == 3) { - setTimeout(() => pushes.push(chan.push(4)), 100) + setTimeout(() => pushes.push(push(4)), 100) } else if (val == 4) { chan.close() expect(chan.isClosed()).toEqual(true) @@ -34,9 +68,10 @@ describe.only("push channel", () => { const chan = pushableChannel(() => { closedCalled = true }) + const push = promisify(chan.push) expect(chan.isClosed()).toEqual(false) - void chan.push(0) + push(0) expect(chan.isClosed()).toEqual(false) for await (const val of chan) { @@ -51,15 +86,15 @@ describe.only("push channel", () => { it("breaking the channel as generator should finish execution", async () => { let closedCalled = false - let didCompleteTestFunction = false const events = mitt() async function* test() { const chan = pushableChannel(() => { closedCalled = true - events.off("*", chan.push) + events.off("*", push) }) - events.on("*", chan.push) + const push = promisify(chan.push) + events.on("*", push) for await (const num of chan) { yield num } @@ -81,14 +116,18 @@ describe.only("push channel", () => { it("it works as a job queue", async () => { const chan = pushableChannel(() => void 0) + const push = promisify(chan.push) - void chan.push(0) - void chan.push(1) - void chan.push(2) - void chan.push(3) - void chan.push(4) + const jobs = Promise.all([ + push(0), + push(1), + push(2), + push(3), + push(4) + ]) const takeAll = takeAsync(chan.iterable) + await jobs chan.close() @@ -97,22 +136,30 @@ describe.only("push channel", () => { it("it works as a job queue, iterator still works after close", async () => { const chan = pushableChannel(() => void 0) + const push = promisify(chan.push) + + const jobs = Promise.all([ + push(0), + push(1), + push(2), + push(3), + push(4) + ]) - void chan.push(0) - void chan.push(1) - void chan.push(2) - void chan.push(3) - void chan.push(4) chan.close() - expect(await takeAsync(chan.iterable)).toEqual([0, 1, 2, 3, 4]) + const takeAll = takeAsync(chan.iterable) + + await jobs + + expect(await takeAll).toEqual([0, 1, 2, 3, 4]) }) it("throw in the iterator closes the channel", async () => { const chan = pushableChannel(() => void 0) - + const push = promisify(chan.push) expect(chan.isClosed()).toEqual(false) - void chan.push(0) + void push(0) expect(chan.isClosed()).toEqual(false) await expect(async () => { @@ -146,12 +193,12 @@ describe.only("push channel", () => { it("close the channel without pending ops inside iterator breaks iterator", async () => { const chan = pushableChannel(() => void 0) - + const push = promisify(chan.push) expect(chan.isClosed()).toEqual(false) let values: number[] = [] - void chan.push(1) + void push(1) expect(chan.isClosed()).toEqual(false) @@ -161,23 +208,133 @@ describe.only("push channel", () => { // this should behave exactly as "break" chan.close() expect(chan.isClosed()).toEqual(true) - await expect(async () => await chan.push(2)).rejects.toThrow("Channel is closed") + await expect(async () => await push(2)).rejects.toThrow("Channel is closed") } expect(chan.isClosed()).toEqual(true) expect(values).toEqual([1]) }) - it("close the channel with failAndClose should make the iterator fail", async () => { + it("rxjs consumer reads a value after start the stream", async () => { + const chan = pushableChannel(() => void 0) + const push = promisify(chan.push) + expect(chan.isClosed()).toEqual(false) + + const result = lastValueFrom(from(chan)) + + expect(chan.isClosed()).toEqual(false) + + await push(1) + chan.close() + + expect(chan.isClosed()).toEqual(true) + + expect(await result).toEqual(1) + }) + + it("emit one, read one, then pause. read callback is triggered", async () => { + const chan = pushableChannel(() => void 0) + const push = promisify(chan.push) + expect(chan.isClosed()).toEqual(false) + + expect(chan.isClosed()).toEqual(false) + + const [pushed, next] = await Promise.all([ + push(1), + chan.iterable.next() + ]) + + expect(chan.isClosed()).toEqual(false) + expect(await next.value).toEqual(1) + + chan.close() + + expect(chan.isClosed()).toEqual(true) + }) + + it("emit three, read one, then close stream. pending pushes must fail", async () => { + const chan = pushableChannel(() => void 0) + const push = promisify(chan.push) + expect(chan.isClosed()).toEqual(false) + + expect(chan.isClosed()).toEqual(false) + + const pushes = Promise.allSettled([ + push(1), + push(2), + push(3), + ]) + + expect((await chan.iterable.next()).value).toEqual(1) + + + expect(chan.isClosed()).toEqual(false) + await chan.iterable.return(null) + expect(chan.isClosed()).toEqual(true) + + expect((await pushes).map($ => $.status)).toEqual(["fulfilled", "rejected", "rejected"]) + }) + + + it("rxjs consumer reads all preexistent values", async () => { + const chan = pushableChannel(() => void 0) + expect(chan.isClosed()).toEqual(false) + const push = promisify(chan.push) + + const promises = [push(1), push(2), push(3)] + + const result = lastValueFrom(from(chan)) + + expect(chan.isClosed()).toEqual(false) + chan.close() + expect(chan.isClosed()).toEqual(true) + + await Promise.all(promises) + + expect(await result).toEqual(3) + }) + + it("async consumer reads all preexistent values", async () => { const chan = pushableChannel(() => void 0) + expect(chan.isClosed()).toEqual(false) + const push = promisify(chan.push) + + const promises = [push(1), push(2), push(3)] + + expect(await (await chan.iterable.next()).value).toEqual(1) + expect(await (await chan.iterable.next()).value).toEqual(2) + expect(await (await chan.iterable.next()).value).toEqual(3) + await Promise.all(promises) + + expect(chan.isClosed()).toEqual(false) + chan.close() + expect(chan.isClosed()).toEqual(true) + }) + + it("async consumer reads zero values if the channel is closed", async () => { + const chan = pushableChannel(() => void 0) + expect(chan.isClosed()).toEqual(false) + + const result = lastValueFrom(from(chan)) + + expect(chan.isClosed()).toEqual(false) + chan.close() + expect(chan.isClosed()).toEqual(true) + + await expect(() => result).rejects.toMatchObject({ message: 'no elements in sequence' }) + }) + + it("close the channel with failAndClose should make the iterator fail", async () => { + const chan = pushableChannel(() => void 0) + const push = promisify(chan.push) expect(chan.isClosed()).toEqual(false) let values: number[] = [] expect(chan.isClosed()).toEqual(false) - setTimeout(() => chan.push(1).then(() => chan.failAndClose(new Error("safe"))), 10) + setImmediate(() => { push(1).then(() => chan.failAndClose(new Error("safe"))) }) await expect(async () => { for await (const val of chan) { @@ -191,6 +348,7 @@ describe.only("push channel", () => { it("generator yield basic case", async () => { let chan = pushableChannel(() => void 0) + const push = promisify(chan.push) let values: number[] = [] async function* generator() { @@ -202,9 +360,9 @@ describe.only("push channel", () => { } setTimeout(async () => { - await chan.push(1) - await chan.push(2) - await chan.push(3) + await push(1) + await push(2) + await push(3) }, 10) expect(chan.isClosed()).toEqual(false) diff --git a/test/stream.spec.ts b/test/stream.spec.ts index 939897a..557d552 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -33,7 +33,6 @@ async function testPort(rpcClient: RpcClient, portName: string) { describe("Helpers simple req/res", () => { let remoteCallCounter = 0 - const events = mitt<{ a: Uint8Array }>() let channel: ReturnType const testEnv = createSimpleTestEnvironment(async function (port) { log(`! Initializing port ${port.portId} ${port.portName}`) @@ -60,21 +59,9 @@ describe("Helpers simple req/res", () => { yield new Uint8Array([counter % 0xff]) } }, - async *manualHackWithPushableChannel() { - channel = pushableChannel(() => deferCloseChannel) - // subscribe to room message - events.on("a", channel.push) - // forward all messages - for await (const message of channel) { - yield message as Uint8Array - } - - // then close the channel - channel.close() - - function deferCloseChannel() { - events.off("a", channel.push) - } + manualHackWithPushableChannel() { + channel = pushableChannel(() => void 0) + return channel.iterable as AsyncGenerator }, async *parameterCounter(data) { let total = data[0] @@ -142,19 +129,16 @@ describe("Helpers simple req/res", () => { const values: Uint8Array[] = [] const FINAL_RESULT = new Uint8Array([1, 2, 3]) - let localCallCounter = 0 + const generator = (await module.infiniteCounter())[Symbol.asyncIterator]() + remoteCallCounter = 0 - await expect(async () => { - for await (const u8a of await module.infiniteCounter()) { - values.push(u8a) - localCallCounter++ - if (localCallCounter == FINAL_RESULT.length) throw new Error("closed locally") - } - }).rejects.toThrow("closed locally") - expect(new Uint8Array(Buffer.concat(values))).toEqual(FINAL_RESULT) + values.push(await (await generator.next()).value) + values.push(await (await generator.next()).value) + values.push(await (await generator.next()).value) + await generator.throw(new Error("closed locally")) - expect(remoteCallCounter).toEqual(localCallCounter) + expect(new Uint8Array(Buffer.concat(values))).toEqual(FINAL_RESULT) }) it("a remote infiniteCounter is gracefully stopped from client side on third iteration", async () => { @@ -205,28 +189,19 @@ describe("Helpers simple req/res", () => { expect(remoteCallCounter).toEqual(localCallCounter) }) - it("a remote manualHackWithPushableChannel is gracefully stopped from client side on third iteration", async () => { + it("a remote manualHackWithPushableChannel is gracefully stopped from client side", async () => { const { rpcClient } = await testEnv.start() const port = await rpcClient.createPort("test1") const module = (await port.loadModule("echo")) as { manualHackWithPushableChannel(): Promise> } - async function test() { - for await (const u8a of await module.manualHackWithPushableChannel()) { - expect(channel.isClosed()).toEqual(false) - return u8a - } - } - - const ret = test() + const gen = (await module.manualHackWithPushableChannel())[Symbol.asyncIterator]() - await delay(100) + expect(channel.isClosed()).toEqual(false) - events.emit("a", new Uint8Array([1])) - expect(await ret).toEqual(new Uint8Array([1])) + await gen.return(null) - await delay(100) expect(channel.isClosed()).toEqual(true) }) }) From 5923fd58b50779ad9de6d2f2067a5e6ddd17caa1 Mon Sep 17 00:00:00 2001 From: menduz Date: Thu, 2 Jun 2022 22:07:26 -0300 Subject: [PATCH 3/4] It had to be done --- .vscode/launch.json | 3 +- Makefile | 6 +- README.md | 33 +-- src/ack-helper.ts | 12 +- src/client.ts | 82 ++++-- src/codegen.ts | 32 +-- src/push-channel.ts | 152 ++++++++--- src/server.ts | 91 +++--- src/transports/WebSocket.ts | 1 - src/transports/WebWorker.ts | 1 - test/close-transport.spec.ts | 328 +++++++++++++++++++++- test/codegen-server.spec.ts | 25 +- test/helpers.ts | 7 +- test/push-channel.spec.ts | 410 +++++++++++++++++++++++++++- test/sanity.spec.ts | 11 +- test/stream-from-dispatcher.spec.ts | 169 ++++++------ test/stream.spec.ts | 21 +- 17 files changed, 1079 insertions(+), 305 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index daedc09..9f547dd 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -35,8 +35,7 @@ "${workspaceRoot}/node_modules/.bin/jest", "--runInBand", "--coverage", - "-t", - "manualHackWithPushableChannel" + "test/push-channel.spec.ts" ], "console": "integratedTerminal", "internalConsoleOptions": "neverOpen", diff --git a/Makefile b/Makefile index 739d8ff..146edb1 100644 --- a/Makefile +++ b/Makefile @@ -39,12 +39,12 @@ test: --ts_proto_out="$(PWD)/test/codegen" \ -I="$(PWD)/test/codegen" \ "$(PWD)/test/codegen/client.proto" - SIMMULATE_JITTER=false node_modules/.bin/jest --detectOpenHandles --colors --runInBand $(TESTARGS) $(TEST_FILE) - SIMMULATE_JITTER=true node_modules/.bin/jest --detectOpenHandles --colors --runInBand $(TESTARGS) --coverage $(TEST_FILE) + SIMMULATE_JITTER=false node_modules/.bin/jest --detectOpenHandles --colors --runInBand $(TESTARGS) --coverage $(TEST_FILE) + SIMMULATE_JITTER=true node_modules/.bin/jest --detectOpenHandles --colors --runInBand $(TESTARGS) $(TEST_FILE) $(MAKE) integration-example test-watch: - node_modules/.bin/jest --detectOpenHandles --colors --runInBand --watch $(TESTARGS) --coverage + INSTRUMENT_TRANSPORT=true node_modules/.bin/jest --detectOpenHandles --colors --runInBand --watch $(TESTARGS) --coverage build: node_modules/.bin/ts-node scripts/generate-proto-file.ts diff --git a/README.md b/README.md index af6e258..ec0d272 100644 --- a/README.md +++ b/README.md @@ -63,35 +63,32 @@ C->S: Request {procedure_id, payload} S->C: Response {message_id, payload} ``` -#### Getting an async stream +#### Getting an async stream (closed by client) ```sequence participant Scene (client) as C participant Kernel (server) as S C->S: Request {message_id} -S->C: Response {message_id,streaming=true} +S->C: Response {message_id,streaming=true,seqId=0} C->C: Generate async iterator for {message_id} -S-->C: StreamMessage {message_id,payload} -S-->C: -S-->C: +C->S: StreamMessage {ack=true,message_id,seqId=0} +note over C: Ask for a new item to be generated using ack=true +S-->C: StreamMessage {message_id,payload,seqId=1} +C->S: StreamMessage {ack=true,message_id,seqId=1} +note over C: Close the message by responding\nthe last ACK with ack=true,closed=true +S-->C: StreamMessage {message_id,payload,seqId=2} +C->S: StreamMessage {ack=true,message_id,seqId=2,closed=true} +S->S: Close async Generator +C->C: Close async Iterator S-->C: StreamMessage {message_id,closed=true} C->C: Close async iterator ``` -#### Closing an async stream from the Scene -```sequence -participant Scene (client) as C -participant Kernel (server) as S -C->S: Request {message_id} -S->C: Response {message_id,streaming=true} -C->C: Generate async iterator for {message_id} -S-->C: StreamMessage {message_id,payload} -S-->C: -S-->C: -C->C: Close async iterator -C-->S: StreamMessage {message_id,closed=true} -``` +#### Getting an async stream (closed by server) + +The server will send a special StreamMessage with a new SeqId to tell the client that a stream (generator) +was closed # Implementation of the interfaces diff --git a/src/ack-helper.ts b/src/ack-helper.ts index 11e14b5..82e76bc 100644 --- a/src/ack-helper.ts +++ b/src/ack-helper.ts @@ -13,9 +13,15 @@ export function createAckHelper(transport: Transport): AckDispatcher { const bb = new Writer() - transport.on("close", () => { + function closeAll() { const err = new Error("Transport closed while waiting the ACK") - oneTimeCallbacks.forEach(([_resolve, reject]) => reject(err)) + oneTimeCallbacks.forEach(([, reject]) => reject(err)) + oneTimeCallbacks.clear() + } + + transport.on("close", closeAll) + transport.on("error", err => { + oneTimeCallbacks.forEach(([, reject]) => reject(err)) oneTimeCallbacks.clear() }) @@ -26,6 +32,8 @@ export function createAckHelper(transport: Transport): AckDispatcher { if (fut) { oneTimeCallbacks.delete(key) fut[0](data) + } else { + throw new Error('Received a message for an inexistent handler ' + key) } }, async sendWithAck(data: StreamMessage): Promise { diff --git a/src/client.ts b/src/client.ts index ddb13db..cc27273 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,6 +1,7 @@ import { CallableProcedureClient, ClientModuleDefinition, RpcClient, RpcClientPort, RpcPortEvents } from "." import { Transport } from "./types" import mitt from "mitt" +import future, { IFuture } from "fp-future" import { Writer } from "protobufjs/minimal" import { CreatePort, @@ -15,7 +16,7 @@ import { StreamMessage, } from "./protocol" import { MessageDispatcher, messageNumberHandler } from "./message-number-handler" -import { pushableChannel } from "./push-channel" +import { AsyncQueue, linkedList, pushableChannel } from "./push-channel" import { calculateMessageIdentifier, closeStreamMessage, @@ -91,51 +92,74 @@ function throwIfRemoteError(parsedMessage: RemoteError) { throw new Error("RemoteError: " + parsedMessage.errorMessage) } -// @internal + +/** + * If a StreamMessage is received, then it means we have the POSSIBILITY to + * consume a remote generator. The client must answer every ACK with the next + * inteded action, could be: next(), close(). Both actions are serialized in the + * StreamMessage. The server MUST NOT generate any new element of the generator + * if the client doesn't ask for it. + * + * The whole protocol is designed to be SLOW AND SECURE, that means, ACKs (slow) + * will block the generation and consumption of iterators (secure). + * + * That exist to save the memory of the servers and to generate the much needed + * backpressure. + * + * If throughput is what you are looking for, you may better use bigger messages + * containing serialized lists. Effectively reducing the number of messages + * and increasing their size. + * + * @internal + */ export function streamFromDispatcher( dispatcher: MessageDispatcher, streamMessage: StreamMessage, messageNumber: number ): AsyncGenerator { - const channel = pushableChannel(localIteratorClosed) - let lastReceivedSequenceId = 0 let isRemoteClosed = false + const channel = new AsyncQueue(sendServerSignals) + dispatcher.transport.on("close", () => { - if (!channel.isClosed()) { - channel.failAndClose(new Error("RPC Transport closed")) - } + channel.close(new Error("RPC Transport closed")) }) dispatcher.transport.on("error", () => { - if (!channel.isClosed()) { - channel.failAndClose(new Error("RPC Transport failed")) - } + channel.close(new Error("RPC Transport failed")) }) - function localIteratorClosed() { - if (!isRemoteClosed) { - dispatcher.transport.sendMessage(closeStreamMessage(messageNumber, lastReceivedSequenceId, streamMessage.portId)) + // This function is called at two moments + // 1. When the channel is closed or fails -> an ACK closing the stream is sent to the server + // 2. When the channel.next() is called -> an ACK requesting the next elem is sent to the server + function sendServerSignals(_channel: AsyncQueue, action: "close" | "next") { + if (action == "close") { + dispatcher.removeListener(messageNumber) } - dispatcher.removeListener(messageNumber) - } - - function sendAck() { - const closed = channel.isClosed() - if (!closed && !isRemoteClosed) { - dispatcher.transport.sendMessage(streamAckMessage(messageNumber, lastReceivedSequenceId, streamMessage.portId)) + if (!isRemoteClosed) { + if (action == "close") { + dispatcher.transport.sendMessage(closeStreamMessage(messageNumber, lastReceivedSequenceId, streamMessage.portId)) + } else if (action == "next") { + dispatcher.transport.sendMessage(streamAckMessage(messageNumber, lastReceivedSequenceId, streamMessage.portId)) + } } } + // receive a message from the server and send it to the iterable channel function processMessage(message: StreamMessage) { lastReceivedSequenceId = message.sequenceId if (message.closed) { + // when the server CLOSES the stream, then we raise the flag isRemoteClosed + // to prevent sending an extra closeStreamMessage to the server after closing + // our channel. + // IMPORTANT: If the server closes the connection, then we DONT send the ACK + // back to the server because it is redundant information. isRemoteClosed = true - channel.iterable.return(null).then(sendAck) + channel.close() } else { - channel.push(message.payload, sendAck) + channel.enqueue(message.payload) } } @@ -148,20 +172,18 @@ export function streamFromDispatcher( processMessage(message) } else if (messageType == RpcMessageTypes.RpcMessageTypes_REMOTE_ERROR_RESPONSE) { isRemoteClosed = true - channel.failAndClose( + channel.close( new Error("RemoteError: " + ((message as RemoteError).errorMessage || "Unknown remote error")) ) } else { - channel.failAndClose(new Error("RemoteError: Protocol error")) + channel.close(new Error("RemoteError: Protocol error")) } } else { - channel.failAndClose(new Error("RemoteError: Protocol error")) + channel.close(new Error("RemoteError: Protocol error")) } }) - processMessage(streamMessage) - - return channel.iterable + return channel } // @internal @@ -199,10 +221,12 @@ function createProcedure(portId: number, procedureId: number, dispatcher: Messag return undefined } } else if (messageType == RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE) { + // If a StreamMessage is received, then it means we have the POSSIBILITY + // to consume a remote generator. Look into the streamFromDispatcher functions + // for more information. return streamFromDispatcher(dispatcher, message, messageNumber) } else if (messageType == RpcMessageTypes.RpcMessageTypes_REMOTE_ERROR_RESPONSE) { throwIfRemoteError(message) - debugger } } } diff --git a/src/codegen.ts b/src/codegen.ts index aaf2db3..9603241 100644 --- a/src/codegen.ts +++ b/src/codegen.ts @@ -50,27 +50,23 @@ export function clientProcedureStream( return _generator! } - function generateResult(): AsyncGenerator { - const ret: AsyncGenerator = { - [Symbol.asyncIterator]: () => generateResult(), - async next() { - const iter = await (await getGenerator()).next() - return { value: method.responseType.decode(iter.value ?? EMPTY_U8ARRAY), done: iter.done } - }, - async return(value) { - const iter = await (await getGenerator()).return(value) - return { value: method.responseType.decode(iter.value ?? EMPTY_U8ARRAY), done: iter.done } - }, - async throw(value) { - const iter = await (await getGenerator()).throw(value) - return { value: method.responseType.decode(iter.value ?? EMPTY_U8ARRAY), done: iter.done } - } + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => ret, + async next() { + const iter = await (await getGenerator()).next() + return { value: method.responseType.decode(iter.value ?? EMPTY_U8ARRAY), done: iter.done } + }, + async return(value) { + const iter = await (await getGenerator()).return(value) + return { value: iter.value ? method.responseType.decode(iter.value) : iter.value, done: iter.done } + }, + async throw(value) { + const iter = await (await getGenerator()).throw(value) + return { value: iter.value ? method.responseType.decode(iter.value) : iter.value, done: iter.done } } - - return ret } - return generateResult() + return ret } return fn diff --git a/src/push-channel.ts b/src/push-channel.ts index a074b29..2a3bbcc 100644 --- a/src/push-channel.ts +++ b/src/push-channel.ts @@ -1,45 +1,47 @@ -type Node = { value: T; resolve?: LastResolver; prev?: Node, next?: Node } +type Node = { value: T; prev?: Node; next?: Node } type LastResolver = (err?: any) => void export function linkedList() { let head: Node | undefined = undefined let tail: Node | undefined = undefined - function push(value: T, resolve?: LastResolver) { + function enqueue(value: T) { const node: Node = { value, - resolve, } - node.prev = tail; + node.prev = tail if (tail) { - tail.next = node; + tail.next = node } if (!head) { - head = node; + head = node } - tail = node; + tail = node } function remove(node: Node): void { if (!node.next) { - tail = node.prev; + tail = node.prev } else { - const nextNode = node.next; - nextNode.prev = node.prev; + const nextNode = node.next + nextNode.prev = node.prev } if (!node.prev) { - head = node.next; + head = node.next } else { - const prevNode = node.prev; - prevNode.next = node.next; + const prevNode = node.prev + prevNode.next = node.next } } // removes the head node and updates the head - function unshift(): Node | undefined { + function dequeue(): T | undefined { const ret = head - if (ret) remove(ret) - return ret + if (ret) { + remove(ret) + return ret.value + } + return undefined } // signals if the list is empty @@ -47,13 +49,12 @@ export function linkedList() { return !head } - return { push, unshift, isEmpty } + return { enqueue, dequeue, isEmpty } } - export function pushableChannel(onIteratorClose: () => void) { let returnLock: (() => void) | null = null - const queue = linkedList() + const queue = linkedList<{ value: T, resolve: LastResolver }>() let closed = false let error: Error | null = null @@ -61,8 +62,8 @@ export function pushableChannel(onIteratorClose: () => void) { if (!queue.isEmpty()) { const err = error || new Error("Channel was closed before deliverying the message") while (!queue.isEmpty()) { - const { resolve } = queue.unshift()! - if (resolve) resolve(err); + const { resolve } = queue.dequeue()! + if (resolve) resolve(err) } } } @@ -76,23 +77,24 @@ export function pushableChannel(onIteratorClose: () => void) { } } - function push(value: T, callback: (err?: any) => void) { + function push(value: T, resolve: (err?: any) => void) { if (closed) { - callback(new Error("Channel is closed")) + resolve(new Error("Channel is closed")) return } if (error) { - callback(error) + resolve(error) return } // push the value to the queue - queue.push(value, callback) + queue.enqueue({ value, resolve }) releaseLockIfNeeded() } function failAndClose(errorToThrow: Error) { error = errorToThrow close() + closeAllPending() } function yieldNextResult(): IteratorResult | void { @@ -103,8 +105,9 @@ export function pushableChannel(onIteratorClose: () => void) { return { done: true, value: undefined } } if (!queue.isEmpty()) { - const node = queue.unshift()! - if (node.resolve) node.resolve() + const node = queue.dequeue()! + if (node.resolve) + node.resolve(error || undefined) return { done: false, value: node.value, @@ -131,8 +134,7 @@ export function pushableChannel(onIteratorClose: () => void) { await new Promise((res) => (returnLock = res)) } } catch (err: any) { - error = err - close() + failAndClose(err) throw err } } @@ -140,14 +142,13 @@ export function pushableChannel(onIteratorClose: () => void) { async return(value) { close() closeAllPending() - return { done: true, value: undefined } + return { done: true, value } }, async throw(e) { if (error) { throw error } - close() - closeAllPending() + failAndClose(e) return { done: true, value: undefined } }, [Symbol.asyncIterator]() { @@ -161,3 +162,90 @@ export function pushableChannel(onIteratorClose: () => void) { return { iterable, push, close, failAndClose, isClosed, [Symbol.asyncIterator]: () => iterable } } + +export class AsyncQueue implements AsyncGenerator { + // enqueues > dequeues + values = linkedList>() + // dequeues > enqueues + settlers = linkedList<{ + resolve(x: IteratorResult): void + reject(error: Error): void + }>() + closed = false + error: Error | undefined = undefined + + constructor(private requestingNext: (queue: AsyncQueue, action: "next" | "close") => void) { } + + [Symbol.asyncIterator](): AsyncGenerator { + return this + } + + enqueue(value: T) { + if (this.closed) { + throw new Error("Channel is closed") + } + if (!this.settlers.isEmpty()) { + if (!this.values.isEmpty()) { + throw new Error("Illegal internal state") + } + const settler = this.settlers.dequeue()! + if (value instanceof Error) { + settler.reject(value) + } else { + settler.resolve({ value }) + } + } else { + this.values.enqueue({ value }) + } + } + /** + * @returns a Promise for an IteratorResult + */ + async next(): Promise> { + if (!this.values.isEmpty()) { + const value = this.values.dequeue()! + return value + } + if (this.error) { + throw this.error + } + if (this.closed) { + if (!this.settlers.isEmpty()) { + throw new Error("Illegal internal state") + } + return { done: true, value: undefined } + } + // Wait for new values to be enqueued + return new Promise>((resolve, reject) => { + this.requestingNext(this, "next") + this.settlers.enqueue({ resolve, reject }) + }) + } + + async return(value: any): Promise> { + this.close(value) + return { done: true, value } + } + + async throw(error: Error): Promise> { + this.close(error) + return { done: true, value: undefined } + } + + close(error?: Error) { + if (error) + while (!this.settlers.isEmpty()) { + this.settlers.dequeue()!.reject(error) + } + else + while (!this.settlers.isEmpty()) { + this.settlers.dequeue()!.resolve({ done: true, value: undefined }) + } + if (error) + this.error = error + if (!this.closed) { + this.closed = true + this.requestingNext(this, "close") + } + } +} diff --git a/src/server.ts b/src/server.ts index b4f1ac7..7f27d1b 100644 --- a/src/server.ts +++ b/src/server.ts @@ -236,6 +236,56 @@ export async function handleDestroyPort( } } +// @internal +export async function sendStream(ackDispatcher: AckDispatcher, transport: Transport, stream: AsyncGenerator, portId: number, messageNumber: number) { + let sequenceNumber = 0 + + const reusedStreamMessage: StreamMessage = StreamMessage.fromJSON({ + closed: false, + ack: false, + sequenceId: sequenceNumber, + messageIdentifier: calculateMessageIdentifier( + RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE, + messageNumber + ), + payload: EMPTY_U8A, + portId: portId, + }) + + // First, tell the client that we are opening a stream. Once the client sends + // an ACK, we will know if they are ready to consume the first element. + // If the response is instead close=true, then this function returns and + // no stream.next() is called + // The following lines are called "stream offer" in the tests. + const ret = await ackDispatcher.sendWithAck(reusedStreamMessage) + if (ret.closed) return + if (!ret.ack) throw new Error('Error in logic, ACK must be true') + + // If this point is reached, then the client WANTS to consume an element of the + // generator + for await (const elem of stream) { + sequenceNumber++ + reusedStreamMessage.sequenceId = sequenceNumber + reusedStreamMessage.payload = elem + + // sendWithAck may fail if the transport is closed, effectively ending this + // iterator and the underlying generator. (by exiting this for-await-of) + // Aditionally, the ack message is used to know WHETHER the client wants to + // generate another element or cancel the iterator by setting closed=true + const ret = await ackDispatcher.sendWithAck(reusedStreamMessage) + + // we first check for ACK because it is the hot-code-path + if (ret.ack) { + continue + } else if (ret.closed) { + // if it was closed remotely, then we end the stream right away + return + } + } + + transport.sendMessage(closeStreamMessage(messageNumber, sequenceNumber, portId)) +} + // @internal export async function handleRequest( ackDispatcher: AckDispatcher, @@ -276,44 +326,7 @@ export async function handleRequest( Response.encode(response, unsafeSyncWriter) transport.sendMessage(unsafeSyncWriter.finish()) } else if (result && Symbol.asyncIterator in result) { - const iter: AsyncGenerator = await (result as any)[Symbol.asyncIterator]() - let sequenceNumber = -1 - - const reusedStreamMessage: StreamMessage = StreamMessage.fromJSON({ - closed: false, - ack: false, - sequenceId: 0, - messageIdentifier: 0, - payload: EMPTY_U8A, - portId: request.portId, - }) - try { - for await (const elem of iter) { - sequenceNumber++ - reusedStreamMessage.closed = false - reusedStreamMessage.ack = false - reusedStreamMessage.sequenceId = sequenceNumber - reusedStreamMessage.messageIdentifier = calculateMessageIdentifier( - RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE, - messageNumber - ) - reusedStreamMessage.payload = elem - reusedStreamMessage.portId = request.portId - - // sendWithAck may fail if the transport is closed, effectively - // ending this iterator. - const ret = await ackDispatcher.sendWithAck(reusedStreamMessage) - - if (ret.ack) { - continue - } else if (ret.closed) { - // if it was closed remotely, then we end the stream right away - return - } - } - } finally { - transport.sendMessage(closeStreamMessage(messageNumber, sequenceNumber, request.portId)) - } + await sendStream(ackDispatcher, transport, result, port.portId, messageNumber) } else { unsafeSyncWriter.reset() Response.encode(response, unsafeSyncWriter) @@ -379,7 +392,7 @@ export function createRpcServer(options: CreateRpcServerOptions { + const messageQueue = new AsyncQueue>(log) + + const ackDispatcher: AckDispatcher = { + receiveAck() { }, + async sendWithAck(data) { + return await (await messageQueue.next()).value + } + } + + const transport = MemoryTransport() + const sendMessageSpy = jest.spyOn(transport.client, 'sendMessage') + const sendWithAckSpy = jest.spyOn(ackDispatcher, 'sendWithAck') + + function generator() { + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => ret, + async next() { throw new Error('not implemented') }, + async return() { throw new Error('not implemented') }, + async throw() { throw new Error('not implemented') } + } + return ret + } + + await Promise.all([ + sendStream(ackDispatcher, transport.client, generator(), 0, 0).catch(log), + // this message responds to the "stream offer" by closing it. + // IN SOME CASES, the client may not need to consume the stream. Since we are + // creating a "safe" API to handle resources and possibly signatures in the + // servers, not generating extra elements in generators must be ensured + messageQueue.enqueue({ closed: true }) + ]) + + expect(sendWithAckSpy).toBeCalledTimes(1) + expect(sendMessageSpy).toBeCalledTimes(0) +}) + + +/** + * This test ensures that the server iterators are closed if an ACK rejects + */ +test("Unit: server sendStream finalizes iterator upon failed ACK", async () => { + const messageQueue = new AsyncQueue>(log) + + const ackDispatcher: AckDispatcher = { + receiveAck() { }, + async sendWithAck(data) { + return await (await messageQueue.next()).value + } + } + + const transport = MemoryTransport() + const sendMessageSpy = jest.spyOn(transport.client, 'sendMessage') + const sendWithAckSpy = jest.spyOn(ackDispatcher, 'sendWithAck') + + let finalized = false + + function generator() { + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => ret, + async next() { + return { value: new Uint8Array(), done: false } + }, + async return() { + finalized = true + return { done: true, value: undefined } + }, + async throw() { throw new Error('not implemented') } + } + return ret + } + + await Promise.all([ + sendStream(ackDispatcher, transport.client, generator(), 0, 0).catch(log), + // this message responds to the "stream offer" + messageQueue.enqueue({ ack: true, closed: false }), + // this message asks for an element of the stream to be consumed + messageQueue.enqueue({ ack: true, closed: false }), + // then we FAIL! on the ACK. This should finalize the server generator + messageQueue.close(new Error('Timed out!')), + ]) + + expect(finalized).toEqual(true) + + expect(sendWithAckSpy).toBeCalledTimes(3) + expect(sendMessageSpy).toBeCalledTimes(0) +}) + + +/** + * This test ensures that the server sends a close message after the iterator returns a value + */ +test("Unit: server sendStream sends a close message after iterator finalizes", async () => { + const ackDispatcher: AckDispatcher = { + receiveAck() { }, + async sendWithAck(data) { + if (data.sequenceId != 0) throw new Error('never called') + return Promise.resolve({ closed: false, ack: true } as any) + } + } + + const transport = MemoryTransport() + + const sendMessageSpy = jest.spyOn(transport.client, 'sendMessage') + + function generator() { + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => ret, + async next() { + return { value: undefined, done: true } + }, + async return() { + return { done: true, value: undefined } + }, + async throw() { throw new Error('not implemented') } + } + return ret + } + + await sendStream(ackDispatcher, transport.client, generator(), 0, 0) + + expect(sendMessageSpy).toBeCalledTimes(1) +}) + + +/** + * This test ensures tha the AckDispatcher rejects all pending operations + * if an error is triggered in the transport + */ +test("Unit: AckDispatcher rejects all pending operations on transport error", async () => { + const transport = MemoryTransport() + const ackDispatcher: AckDispatcher = createAckHelper(transport.server) + + const promises = Promise.allSettled([ + ackDispatcher.sendWithAck({ messageIdentifier: calculateMessageIdentifier(0, 1), sequenceId: 1, payload: '' } as any), + ackDispatcher.sendWithAck({ messageIdentifier: calculateMessageIdentifier(0, 2), sequenceId: 2, payload: '' } as any), + ackDispatcher.sendWithAck({ messageIdentifier: calculateMessageIdentifier(0, 3), sequenceId: 3, payload: '' } as any), + ]) + + const error = new Error('Error123') + + transport.server.emit('error', error) + + expect(await promises).toEqual([ + { status: 'rejected', reason: error }, + { status: 'rejected', reason: error }, + { status: 'rejected', reason: error } + ]) +}) + +/** + * This test ensures tha the AckDispatcher rejects all pending operations + * if the transport is closed + */ +test("Unit: AckDispatcher rejects all pending operations on transport close", async () => { + const transport = MemoryTransport() + const ackDispatcher: AckDispatcher = createAckHelper(transport.server) + + const promises = Promise.allSettled([ + ackDispatcher.sendWithAck({ messageIdentifier: calculateMessageIdentifier(0, 1), sequenceId: 1, payload: '' } as any), + ackDispatcher.sendWithAck({ messageIdentifier: calculateMessageIdentifier(0, 2), sequenceId: 2, payload: '' } as any), + ackDispatcher.sendWithAck({ messageIdentifier: calculateMessageIdentifier(0, 3), sequenceId: 3, payload: '' } as any), + ]) + + transport.server.close() + + expect(await promises).toMatchObject([ + { status: 'rejected', reason: { message: 'Transport closed while waiting the ACK' } }, + { status: 'rejected', reason: { message: 'Transport closed while waiting the ACK' } }, + { status: 'rejected', reason: { message: 'Transport closed while waiting the ACK' } } + ]) +}) + +describe("Throw in client iterator closes remote iterator", () => { + const didClose = future() + const testEnv = createSimpleTestEnvironment(async function (port) { + let i = 0 + port.registerModule("echo", async (port) => ({ + infinite() { + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => ret, + async next() { + if (!didClose.isPending) throw new Error('error in logic, .next should not be called after return') + return { value: Uint8Array.from([i++]), done: false } + }, + async return(value) { + didClose.resolve(value) + return { done: true, value: undefined } + }, + async throw() { throw new Error('not implemented') } + } + return ret + }, + })) + }) + + it("runs the test", async () => { + const { rpcClient } = await testEnv.start() + + const generator = await (await testPort(rpcClient, "port1")).infinite() + + expect(await (await generator.next()).value).toEqual(Uint8Array.from([0])) + expect(await (await generator.next()).value).toEqual(Uint8Array.from([1])) + + await generator.throw(new Error('client error')) + + // the server AsyncGenerators must be closed after the transport is closed to avoid leaks + expect(await didClose).toEqual(undefined) + }) +}) + + +describe("Throw in client iterator closes remote iterator after receiving a message", () => { + const didClose = future() + const testEnv = createSimpleTestEnvironment(async function (port) { + let i = 0 + port.registerModule("echo", async (port) => ({ + infinite() { + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => ret, + async next() { + if (!didClose.isPending) throw new Error('error in logic, .next should not be called after return') + return { value: Uint8Array.from([i++]), done: false } + }, + async return(value) { + didClose.resolve(value) + return { done: true, value: undefined } + }, + async throw() { throw new Error('not implemented') } + } + return ret + }, + })) + }) + + it("runs the test", async () => { + const { rpcClient } = await testEnv.start() + + const generator = await (await testPort(rpcClient, "port1")).infinite() + + await (await generator.next()).value + await generator.throw(new Error('client error')) + + // the server AsyncGenerators must be closed after the transport is closed to avoid leaks + expect(await didClose).toEqual(undefined) + }) +}) + +describe("Close transport closes streams (server side) 1", () => { + const didClose = future() + const testEnv = createSimpleTestEnvironment(async function (port) { + let i = 0 + port.registerModule("echo", async (port) => ({ + infinite() { + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => ret, + async next() { + if (!didClose.isPending) throw new Error('error in logic, .next should not be called after return') + return { value: Uint8Array.from([i++]), done: false } + }, + async return(value) { + didClose.resolve(value) + return { done: true, value: undefined } + }, + async throw() { throw new Error('not implemented') } + } + return ret + }, + })) + }) + + it("runs the test", async () => { + const { rpcClient, transportServer } = await testEnv.start() + + const generator = await (await testPort(rpcClient, "port1")).infinite() + + await (await generator.next()).value + transportServer.close() + await expect(generator.next()).rejects.toThrow("RPC Transport closed") + + await delay(100) + + // the server AsyncGenerators must be closed after the transport is closed to avoid leaks + expect(await didClose).toEqual(undefined) + }) +}) + + describe("Close transport closes streams (server side)", () => { let infiniteStreamClosed = false const testEnv = createSimpleTestEnvironment(async function (port) { @@ -125,19 +423,27 @@ describe("Close transport closes streams (client side)", () => { }) }) -describe("Error in transport closes the transport", () => { +describe("Error in server transport closes the iterators", () => { let infiniteStreamClosed = false const testEnv = createSimpleTestEnvironment(async function (port) { port.registerModule("echo", async (port) => ({ - async *infinite() { - try { - infiniteStreamClosed = false - while (true) { - yield Uint8Array.from([1]) - } - } finally { - infiniteStreamClosed = true + infinite() { + infiniteStreamClosed = false + const ret: AsyncGenerator = { + [Symbol.asyncIterator]: () => ret, + async next() { + return { value: Uint8Array.from([1]), done: false } + }, + async return() { + infiniteStreamClosed = true + return { value: null, done: true } + }, + async throw() { + throw new Error("throw should never be called in this scenario") + }, } + + return ret }, })) }) diff --git a/test/codegen-server.spec.ts b/test/codegen-server.spec.ts index 741d42d..93db5ff 100644 --- a/test/codegen-server.spec.ts +++ b/test/codegen-server.spec.ts @@ -77,9 +77,9 @@ describe("codegen client & server", () => { const ret: AsyncGenerator = { [Symbol.asyncIterator]: () => ret, async next() { - infiniteGeneratorEmited++ // hang in 4th iteration if (infiniteGeneratorEmited == 4) await closeFuture + infiniteGeneratorEmited++ return { value: { int: infiniteGeneratorEmited } } }, async return() { @@ -115,8 +115,7 @@ describe("codegen client & server", () => { let service: codegen.RpcClientModule beforeAll(async () => { - const { rpcClient } = await testEnv.start(null, { - }) + const { rpcClient } = await testEnv.start() const clientPort = await rpcClient.createPort("test1") service = codegen.loadService(clientPort, BookServiceDefinition) @@ -207,21 +206,6 @@ describe("codegen client & server", () => { ) }) - it("when streams are materialized, the first element should be sended to start the ACK chains", async () => { - infiniteGeneratorClosed = 0 - infiniteGeneratorEmited = 0 - closeFuture = future() - - const gen = service.infiniteGenerator({}) - - await gen.return(null) - // give it time to end and send async messages - await delay(100) - - expect(infiniteGeneratorEmited).toEqual(1) - expect(infiniteGeneratorClosed).toEqual(1) - }) - it("infinite stream take vanilla 1", async () => { infiniteGeneratorClosed = 0 infiniteGeneratorEmited = 0 @@ -329,6 +313,7 @@ describe("codegen client & server", () => { { int: 1 }, { int: 2 }, { int: 3 }, + { int: 4 }, ]) expect(infiniteGeneratorEmited).toEqual(4) expect(infiniteGeneratorClosed).toEqual(1) @@ -348,12 +333,12 @@ describe("codegen client & server", () => { await delay(100) - expect(infiniteGeneratorEmited).toEqual(3) - expect(infiniteGeneratorClosed).toEqual(1) expect(values).toEqual([ { int: 1 }, { int: 2 }, { int: 3 }, ]) + expect(infiniteGeneratorEmited).toEqual(3) + expect(infiniteGeneratorClosed).toEqual(1) }) }) diff --git a/test/helpers.ts b/test/helpers.ts index 14daf85..f9f08dc 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -2,7 +2,6 @@ import { createRpcClient, createRpcServer, CreateRpcServerOptions, - RpcClient, RpcServerHandler, Transport, } from "../src" @@ -19,7 +18,7 @@ export async function takeAsync(iter: AsyncGenerator, max?: number) { for await (const $ of iter) { r.push($) counter++ - if (typeof max == "number" && counter == max) break + if (counter === max) break } return r } @@ -31,7 +30,7 @@ function serialize(data: Uint8Array) { } export function instrumentTransport(transport: Transport, name: string) { - if (typeof it == "function") { + if (typeof it == "function" && process.env.INSTRUMENT_TRANSPORT) { transport.on("close", (data) => { log(` (${name}): closed`) }) @@ -99,7 +98,7 @@ export function createSimpleTestEnvironment( } return { - start, + start } } diff --git a/test/push-channel.spec.ts b/test/push-channel.spec.ts index 0385030..e44e2ae 100644 --- a/test/push-channel.spec.ts +++ b/test/push-channel.spec.ts @@ -1,6 +1,6 @@ import mitt from "mitt" import { lastValueFrom, from, map } from "rxjs" -import { linkedList, pushableChannel } from "../src/push-channel" +import { AsyncQueue, linkedList, pushableChannel } from "../src/push-channel" import { takeAsync } from "./helpers" function promisify(fn) { @@ -11,26 +11,26 @@ function promisify(fn) { describe('linked list', () => { it('adds one, removes one', () => { - const l = linkedList() + const l = linkedList() expect(l.isEmpty()).toBeTruthy() - expect(l.unshift()).toBe(undefined) + expect(l.dequeue()).toBe(undefined) expect(l.isEmpty()).toBeTruthy() - l.push(1) - expect(l.unshift()).toMatchObject({ value: 1 }) + l.enqueue(1) + expect(l.dequeue()).toEqual(1) expect(l.isEmpty()).toBeTruthy() - expect(l.unshift()).toBe(undefined) + expect(l.dequeue()).toBe(undefined) expect(l.isEmpty()).toBeTruthy() - l.push(1) + l.enqueue(1) expect(l.isEmpty()).toBeFalsy() - l.push(2) - l.push(3) + l.enqueue(2) + l.enqueue(3) expect(l.isEmpty()).toBeFalsy() - expect(l.unshift()).toMatchObject({ value: 1 }) - expect(l.unshift()).toMatchObject({ value: 2 }) + expect(l.dequeue()).toEqual(1) + expect(l.dequeue()).toEqual(2) expect(l.isEmpty()).toBeFalsy() - expect(l.unshift()).toMatchObject({ value: 3 }) + expect(l.dequeue()).toEqual(3) expect(l.isEmpty()).toBeTruthy() - expect(l.unshift()).toBe(undefined) + expect(l.dequeue()).toBe(undefined) }) }) @@ -377,3 +377,387 @@ describe("push channel", () => { expect(values).toEqual([1, 2, 3]) }) }) + +describe("async queue", () => { + it("enqueues several elements and iterates through them", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + const pushes = [chan.enqueue(1), chan.enqueue(2), chan.enqueue(3)] + + const values: number[] = [] + + for await (const val of chan) { + values.push(val) + expect(chan.closed).toEqual(false) + if (val == 3) { + setTimeout(() => pushes.push(chan.enqueue(4)), 100) + } else if (val == 4) { + chan.close() + expect(chan.closed).toEqual(true) + } + } + + expect(chan.closed).toEqual(true) + expect(values).toEqual([1, 2, 3, 4]) + // all promises must have ended after consumption + await Promise.all(pushes) + expect(values).toEqual([1, 2, 3, 4]) + }) + + it("break in the iterator closes the channel", async () => { + let closedCalled = false + const chan = new AsyncQueue((_, reason) => { + if (reason == 'close') closedCalled = true + }) + + expect(chan.closed).toEqual(false) + chan.enqueue(0) + expect(chan.closed).toEqual(false) + + for await (const val of chan) { + expect(val).toEqual(0) + expect(chan.closed).toEqual(false) + break + } + + expect(chan.closed).toEqual(true) + expect(closedCalled).toEqual(true) + }) + + it("breaking the channel as generator should finish execution", async () => { + let closedCalled = false + const events = mitt() + + async function* test() { + const chan = pushableChannel(() => { + closedCalled = true + events.off("*", push) + }) + const push = promisify(chan.push) + events.on("*", push) + for await (const num of chan) { + yield num + } + } + + const ret = takeAsync(test(), 3) + + await new Promise((ret) => setTimeout(ret, 100)) + + void events.emit("a", 0) + void events.emit("a", 0) + void events.emit("a", 0) + + await new Promise((ret) => setTimeout(ret, 100)) + + expect(await ret).toEqual(["a", "a", "a"]) + expect(closedCalled).toEqual(true) + }) + + it("it works as a job queue", async () => { + const chan = new AsyncQueue(() => void 0) + + const jobs = Promise.all([ + chan.enqueue(0), + chan.enqueue(1), + chan.enqueue(2), + chan.enqueue(3), + chan.enqueue(4) + ]) + + const takeAll = takeAsync(chan) + await jobs + + chan.close() + + expect(await takeAll).toEqual([0, 1, 2, 3, 4]) + }) + + it("it works as a job queue, iterator still works after close", async () => { + const chan = new AsyncQueue(() => void 0) + + const jobs = Promise.all([ + chan.enqueue(0), + chan.enqueue(1), + chan.enqueue(2), + chan.enqueue(3), + chan.enqueue(4) + ]) + + chan.close() + + const takeAll = takeAsync(chan) + + await jobs + + expect(await takeAll).toEqual([0, 1, 2, 3, 4]) + }) + + it("throw in the iterator closes the channel", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + chan.enqueue(0) + expect(chan.closed).toEqual(false) + + await expect(async () => { + for await (const _ of chan) { + throw new Error("safe") + } + }).rejects.toThrow("safe") + + expect(chan.closed).toEqual(true) + }) + + it("close the channel asynchronously before yielding first time works", async () => { + const chan = new AsyncQueue(() => void 0) + + expect(chan.closed).toEqual(false) + + const values: number[] = [] + + // asynchronously close the channel + setTimeout(() => chan.close(), 100) + + expect(chan.closed).toEqual(false) + + for await (const val of chan) { + values.push(val) + } + + expect(chan.closed).toEqual(true) + expect(values).toEqual([]) + }) + + it("close the channel without pending ops inside iterator breaks iterator", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + let values: number[] = [] + + chan.enqueue(1) + + expect(chan.closed).toEqual(false) + + for await (const val of chan) { + if (chan.closed) throw new Error("did continue with iterator") + values.push(val) + // this should behave exactly as "break" + chan.close() + expect(chan.closed).toEqual(true) + await expect(async () => chan.enqueue(2)).rejects.toThrow("Channel is closed") + } + + expect(chan.closed).toEqual(true) + expect(values).toEqual([1]) + }) + + it("rxjs consumer reads a value after start the stream", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + const result = lastValueFrom(from(chan)) + + expect(chan.closed).toEqual(false) + + await chan.enqueue(1) + chan.close() + + expect(chan.closed).toEqual(true) + + expect(await result).toEqual(1) + }) + + it("emit one, read one, then pause. read callback is triggered", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + const [pushed, next] = await Promise.all([ + chan.enqueue(1), + chan.next() + ]) + + expect(chan.closed).toEqual(false) + expect(await next.value).toEqual(1) + + chan.close() + + expect(chan.closed).toEqual(true) + }) + + it("emit one, read three, then close stream. pending pushes must fail", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + chan.enqueue(1) + + const pushes = Promise.all([ + chan.next(), + chan.next(), + chan.next(), + ]) + + expect(chan.closed).toEqual(false) + await chan.return(null) + expect(chan.closed).toEqual(true) + + expect((await pushes)).toEqual([{ value: 1 }, { done: true, value: undefined }, { done: true, value: undefined }]) + }) + + + it("emit one, read three, then throw stream. pending pushes must fail", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + chan.enqueue(1) + + const pushes = Promise.allSettled([ + chan.next(), + chan.next(), + chan.next(), + ]) + + expect(chan.closed).toEqual(false) + await chan.throw(new Error('Synthetic error')) + expect(chan.closed).toEqual(true) + + expect((await pushes).map($ => $.status)).toEqual(["fulfilled", "rejected", "rejected"]) + }) + + it("read three, emit one, then close stream. pending pushes must fail", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + const pushes = Promise.all([ + chan.next(), + chan.next(), + chan.next(), + ]) + + chan.enqueue(1) + + expect(chan.closed).toEqual(false) + await chan.return(null) + expect(chan.closed).toEqual(true) + + expect((await pushes)).toEqual([{ value: 1 }, { done: true, value: undefined }, { done: true, value: undefined }]) + }) + + + it("read three, emit one, then close stream. pending pushes must fail", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + const pushes = Promise.allSettled([ + chan.next(), + chan.next(), + chan.next(), + ]) + + chan.enqueue(1) + + expect(chan.closed).toEqual(false) + await chan.throw(new Error('Synthetic error')) + expect(chan.closed).toEqual(true) + + expect((await pushes).map($ => $.status)).toEqual(["fulfilled", "rejected", "rejected"]) + }) + + + it("rxjs consumer reads all preexistent values", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + const promises = [chan.enqueue(1), chan.enqueue(2), chan.enqueue(3)] + + const result = lastValueFrom(from(chan)) + + expect(chan.closed).toEqual(false) + chan.close() + expect(chan.closed).toEqual(true) + + await Promise.all(promises) + + expect(await result).toEqual(3) + }) + + it("async consumer reads all preexistent values", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + const promises = [chan.enqueue(1), chan.enqueue(2), chan.enqueue(3)] + + expect(await (await chan.next()).value).toEqual(1) + expect(await (await chan.next()).value).toEqual(2) + expect(await (await chan.next()).value).toEqual(3) + + await Promise.all(promises) + + expect(chan.closed).toEqual(false) + chan.close() + expect(chan.closed).toEqual(true) + }) + + it("async consumer reads zero values if the channel is closed", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + const result = lastValueFrom(from(chan)) + + expect(chan.closed).toEqual(false) + chan.close() + expect(chan.closed).toEqual(true) + + await expect(() => result).rejects.toMatchObject({ message: 'no elements in sequence' }) + }) + + it("close the channel with failAndClose should make the iterator fail", async () => { + const chan = new AsyncQueue(() => void 0) + expect(chan.closed).toEqual(false) + + let values: number[] = [] + setImmediate(() => { + chan.enqueue(1); + chan.close(new Error("safe")) + }) + + await expect(async () => { + for await (const val of chan) { + values.push(val) + } + }).rejects.toThrow("safe") + + expect(chan.closed).toEqual(true) + expect(values).toEqual([1]) + }) + + it("generator yield basic case", async () => { + const chan = new AsyncQueue(() => void 0) + let values: number[] = [] + + async function* generator() { + let counter = 0 + for await (const val of chan) { + yield val + if (++counter == 3) throw new Error("reached 3") + } + } + + setTimeout(async () => { + await chan.enqueue(1) + await chan.enqueue(2) + await chan.enqueue(3) + }, 10) + + expect(chan.closed).toEqual(false) + + await expect(async () => { + for await (const val of generator()) { + values.push(val) + } + }).rejects.toThrow("reached 3") + + expect(chan.closed).toEqual(true) + expect(values).toEqual([1, 2, 3]) + }) +}) \ No newline at end of file diff --git a/test/sanity.spec.ts b/test/sanity.spec.ts index 2f4e51c..128ef7d 100644 --- a/test/sanity.spec.ts +++ b/test/sanity.spec.ts @@ -14,10 +14,13 @@ export async function configureTestPortServer( assert?: (t: Uint8Array, context: Context) => Promise ) { log(`! Initializing port ${port.portId} ${port.portName}`) + port.registerModule("fails", async (port): Promise => { + throw new Error('Failed while creating ModuLe') + }) port.registerModule( "echo", async (port): Promise => ({ - async returnEmpty() {}, + async returnEmpty() { }, async basic() { return Uint8Array.from([0, 1, 2]) }, @@ -61,12 +64,16 @@ describe("Helpers simple req/res", () => { await configureTestPortServer(port) }) - it("creates the server", async () => { + it("creates the server and gracefully fails if a module creation fails", async () => { const { rpcClient } = await testEnv.start() const port1 = await testPort(rpcClient, "port1") + await expect(() => port1.loadModule("fails")).rejects.toThrowError('Failed while creating ModuLe') + await expect(() => port1.loadModule("unknown-module")).rejects.toThrowError('Module unknown-module is not available for port port1') const port2 = await testPort(rpcClient, "port2") expect(port1).not.toBe(port2) }) + + }) diff --git a/test/stream-from-dispatcher.spec.ts b/test/stream-from-dispatcher.spec.ts index 8e16e30..60a4013 100644 --- a/test/stream-from-dispatcher.spec.ts +++ b/test/stream-from-dispatcher.spec.ts @@ -6,120 +6,96 @@ import { instrumentMemoryTransports, takeAsync } from "./helpers" import { StreamMessage } from "../src/protocol" describe("streamFromDispatcher", () => { - it("a CloseMessage from the server closes the iterator in the client", async () => { + it("a CloseMessage from the server closes the iterator in the client.", async () => { let seq = 0 const MESSAGE_NUMBER = 1 + const PORT_ID = 0 const transport = instrumentMemoryTransports(MemoryTransport()) const dispatcher = messageNumberHandler(transport.client) const removeListenerSpy = jest.spyOn(dispatcher, "removeListener") - const stream = streamFromDispatcher( + + // create a client stream for the server + const clientStream = streamFromDispatcher( dispatcher, - StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, 0, new Uint8Array())), + StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, PORT_ID, Uint8Array.of())), MESSAGE_NUMBER ) - setTimeout(() => transport.server.sendMessage(closeStreamMessage(MESSAGE_NUMBER, seq++, 0)), 10) + // server sends CLOSE message + setImmediate(() => transport.server.sendMessage(closeStreamMessage(MESSAGE_NUMBER, seq++, PORT_ID))) + + // consume all stream + const allMessages = await takeAsync(clientStream) + + // yields empty array + expect(allMessages).toEqual([]) - const allMessages = await takeAsync(stream) - expect(allMessages).toEqual([new Uint8Array()]) + // and the listener should have been cleared expect(removeListenerSpy).toHaveBeenCalledWith(MESSAGE_NUMBER) }) it("a CloseMessage from the server closes the iterator in the client after yielding data SYNC", async () => { let seq = 0 const MESSAGE_NUMBER = 2 + const PORT_ID = 0 const PAYLOAD = Uint8Array.from([0xde, 0xad]) const transport = instrumentMemoryTransports(MemoryTransport()) const dispatcher = messageNumberHandler(transport.client) const removeListenerSpy = jest.spyOn(dispatcher, "removeListener") const addListenerSpy = jest.spyOn(dispatcher, "addListener") - const stream = streamFromDispatcher( + + // create a client stream for the server + const clientStream = streamFromDispatcher( dispatcher, - StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, 0, new Uint8Array())), + StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, PORT_ID, Uint8Array.of())), MESSAGE_NUMBER ) + // it should have registered the listener expect(addListenerSpy).toHaveBeenCalledWith(MESSAGE_NUMBER, expect.anything()) - transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, 0, PAYLOAD)) - transport.server.sendMessage(closeStreamMessage(MESSAGE_NUMBER, seq++, 0)) - expect(removeListenerSpy).toHaveBeenCalledWith(MESSAGE_NUMBER) - - const allMessages = await takeAsync(stream) - expect(allMessages).toEqual([new Uint8Array(), PAYLOAD]) - }) - - it("a CloseMessage from the server closes the iterator in the client after yielding data", async () => { - let seq = 0 - const MESSAGE_NUMBER = 2 - const PAYLOAD = Uint8Array.from([0xde, 0xad]) - const transport = instrumentMemoryTransports(MemoryTransport()) - const dispatcher = messageNumberHandler(transport.client) - const stream = streamFromDispatcher( - dispatcher, - StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, 0, new Uint8Array())), - MESSAGE_NUMBER - ) + // ask the server for the .next() value + const futureValue = clientStream.next() - setTimeout(() => { - transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, 0, PAYLOAD)) - transport.server.sendMessage(closeStreamMessage(MESSAGE_NUMBER, seq++, 0)) - }, 10) + // server sends .next() + transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, PORT_ID, PAYLOAD)) - const allMessages = await takeAsync(stream) - expect(allMessages).toEqual([new Uint8Array(), PAYLOAD]) - }) + // server closes transport + transport.server.sendMessage(closeStreamMessage(MESSAGE_NUMBER, seq++, PORT_ID)) - it("a StreamMessage with payload yields its result concurrently", async () => { - const MESSAGE_NUMBER = 3 - const PAYLOAD = Uint8Array.from([132]) - const transport = instrumentMemoryTransports(MemoryTransport()) - const dispatcher = messageNumberHandler(transport.client) - const stream = streamFromDispatcher( - dispatcher, - StreamMessage.decode(streamMessage(MESSAGE_NUMBER, 0, 0, PAYLOAD)), - MESSAGE_NUMBER - ) + // and since we consumed only one element from the stream, our entire materialized + // messages should only contain that + expect((await futureValue).value).toEqual(PAYLOAD) - const [_, allMessages] = await Promise.all([ - transport.server.sendMessage(closeStreamMessage(MESSAGE_NUMBER, 0, 0)), - takeAsync(stream), - ]) + expect((await clientStream.next()).done).toEqual(true) - expect(allMessages).toEqual([PAYLOAD]) + // the listener should have been cleared + expect(removeListenerSpy).toHaveBeenCalledWith(MESSAGE_NUMBER) }) - it("a StreamMessage with payload yields its result", async () => { + it("a CloseMessage from the server closes the iterator in the client after yielding data", async () => { let seq = 0 - const MESSAGE_NUMBER = 3 - const PAYLOAD = Uint8Array.from([133]) + const MESSAGE_NUMBER = 2 + const PORT_ID = 0 + const PAYLOAD = Uint8Array.from([0xde, 0xad]) const transport = instrumentMemoryTransports(MemoryTransport()) const dispatcher = messageNumberHandler(transport.client) - const stream = streamFromDispatcher( + + // create a client stream for the server + const clientStream = streamFromDispatcher( dispatcher, - StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, 0, PAYLOAD)), + StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, PORT_ID, new Uint8Array())), MESSAGE_NUMBER ) - setTimeout(() => transport.server.sendMessage(closeStreamMessage(MESSAGE_NUMBER, seq++, 0)), 10) + // server sends a message and then closes the stream + setImmediate(() => { + transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, PORT_ID, PAYLOAD)) + transport.server.sendMessage(closeStreamMessage(MESSAGE_NUMBER, seq++, PORT_ID)) + }) - const allMessages = await takeAsync(stream) - expect(allMessages).toEqual([PAYLOAD]) - }) - - it("a StreamMessage with payload yields its result with closeMessage received before consuming stream", async () => { - let seq = 0 - const MESSAGE_NUMBER = 3 - const PAYLOAD = Uint8Array.from([133]) - const transport = instrumentMemoryTransports(MemoryTransport()) - const dispatcher = messageNumberHandler(transport.client) - const stream = streamFromDispatcher( - dispatcher, - StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, 0, PAYLOAD)), - MESSAGE_NUMBER - ) - transport.server.sendMessage(closeStreamMessage(MESSAGE_NUMBER, seq++, 0)) - const allMessages = await takeAsync(stream) + // client consumes one message and then finishes the iterator + const allMessages = await takeAsync(clientStream) expect(allMessages).toEqual([PAYLOAD]) }) @@ -128,61 +104,74 @@ describe("streamFromDispatcher", () => { const MESSAGE_NUMBER = 4 const transport = instrumentMemoryTransports(MemoryTransport()) const dispatcher = messageNumberHandler(transport.client) - const stream = streamFromDispatcher( + const clientStream = streamFromDispatcher( dispatcher, - StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, 0, Uint8Array.from([1]))), + StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, 0, Uint8Array.of())), MESSAGE_NUMBER ) + // send three messages and then close the stream transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, 0, Uint8Array.from([2]))) transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, 0, Uint8Array.from([3]))) transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, 0, Uint8Array.from([4]))) transport.server.sendMessage(closeStreamMessage(MESSAGE_NUMBER, seq++, 0)) - const values = await takeAsync(stream) + const values = await takeAsync(clientStream) - expect(values).toEqual([Uint8Array.from([1]), Uint8Array.from([2]), Uint8Array.from([3]), Uint8Array.from([4])]) + expect(values).toEqual([Uint8Array.from([2]), Uint8Array.from([3]), Uint8Array.from([4])]) }) - it("Consume partial stream", async () => { + it("closed streams", async () => { let seq = 0 const MESSAGE_NUMBER = 4 + const PORT_ID = 0 const transport = instrumentMemoryTransports(MemoryTransport()) const dispatcher = messageNumberHandler(transport.client) const removeListenerSpy = jest.spyOn(dispatcher, "removeListener") - const stream = streamFromDispatcher( + + // create a client stream for the server + const clientStream = streamFromDispatcher( dispatcher, - StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, 0, Uint8Array.from([1]))), + StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, PORT_ID, Uint8Array.of())), MESSAGE_NUMBER ) - transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, 0, Uint8Array.from([2]))) - transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, 0, Uint8Array.from([3]))) - transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, 0, Uint8Array.from([4]))) + // send three messags to the client + transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, PORT_ID, Uint8Array.from([2]))) + transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, PORT_ID, Uint8Array.from([3]))) + transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, PORT_ID, Uint8Array.from([4]))) + // then the listener should not have been called because the stream was not processed yet expect(removeListenerSpy).not.toHaveBeenCalledWith(MESSAGE_NUMBER) - const values = await takeAsync(stream, 2) - expect(values).toEqual([Uint8Array.from([1]), Uint8Array.from([2])]) - // the listener should have ended because we finished the stream at two + + const values = await takeAsync(clientStream, 1) + expect(values).toEqual([Uint8Array.from([2])]) + + // the listener should have ended because we finished the stream at 1st expect(removeListenerSpy).toHaveBeenCalledWith(MESSAGE_NUMBER) // rest of the stream can be consumed beacuse we have it in memory - const rest = await takeAsync(stream, 2) + const rest = await takeAsync(clientStream, 2) expect(rest).toEqual([Uint8Array.from([3]), Uint8Array.from([4])]) }) it("closes the stream if the transport has an error", async () => { let seq = 0 const MESSAGE_NUMBER = 4 + const PORT_ID = 0 const transport = instrumentMemoryTransports(MemoryTransport()) const dispatcher = messageNumberHandler(transport.client) const removeListenerSpy = jest.spyOn(dispatcher, "removeListener") - const stream = streamFromDispatcher( + // create a client stream for the server + const clientStream = streamFromDispatcher( dispatcher, - StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, 0, Uint8Array.from([1]))), + StreamMessage.decode(streamMessage(MESSAGE_NUMBER, seq++, PORT_ID, Uint8Array.of())), MESSAGE_NUMBER ) + // send a message to the client + transport.server.sendMessage(streamMessage(MESSAGE_NUMBER, seq++, PORT_ID, Uint8Array.from([12]))) + expect(removeListenerSpy).not.toHaveBeenCalledWith(MESSAGE_NUMBER) transport.client.emit("error", new Error("TRANSPORT ERROR")) expect(removeListenerSpy).toHaveBeenCalledWith(MESSAGE_NUMBER) @@ -190,10 +179,10 @@ describe("streamFromDispatcher", () => { // the listener should have ended because we finished the stream due to an error let received: Uint8Array[] = [] await expect(async () => { - for await (const data of stream) { + for await (const data of clientStream) { received.push(data) } }).rejects.toThrow("RPC Transport failed") - expect(received).toEqual([Uint8Array.from([1])]) + expect(received).toEqual([Uint8Array.from([12])]) }) }) diff --git a/test/stream.spec.ts b/test/stream.spec.ts index 557d552..4a21f25 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -44,6 +44,7 @@ describe("Helpers simple req/res", () => { yield Uint8Array.from([3]) }, async *throwFirst() { + log('Will throw!') throw new Error("safe error 1") }, async *throwSecond() { @@ -59,10 +60,6 @@ describe("Helpers simple req/res", () => { yield new Uint8Array([counter % 0xff]) } }, - manualHackWithPushableChannel() { - channel = pushableChannel(() => void 0) - return channel.iterable as AsyncGenerator - }, async *parameterCounter(data) { let total = data[0] while (total > 0) { @@ -188,20 +185,4 @@ describe("Helpers simple req/res", () => { expect(remoteCallCounter).toEqual(localCallCounter) }) - - it("a remote manualHackWithPushableChannel is gracefully stopped from client side", async () => { - const { rpcClient } = await testEnv.start() - const port = await rpcClient.createPort("test1") - const module = (await port.loadModule("echo")) as { - manualHackWithPushableChannel(): Promise> - } - - const gen = (await module.manualHackWithPushableChannel())[Symbol.asyncIterator]() - - expect(channel.isClosed()).toEqual(false) - - await gen.return(null) - - expect(channel.isClosed()).toEqual(true) - }) }) From 0f9695a9d8943f1fddea5ee511c4f76c71a6675b Mon Sep 17 00:00:00 2001 From: menduz Date: Thu, 2 Jun 2022 22:26:16 -0300 Subject: [PATCH 4/4] new benchmark code --- Makefile | 2 +- perf.sh | 1 + test/benchmarks/allocation-bench.ts | 40 +++++++++ test/benchmarks/bench.ts | 128 +++++++--------------------- test/benchmarks/code.ts | 100 ++++++++++++++++++++++ 5 files changed, 171 insertions(+), 100 deletions(-) create mode 100644 test/benchmarks/allocation-bench.ts create mode 100644 test/benchmarks/code.ts diff --git a/Makefile b/Makefile index 146edb1..e160afa 100644 --- a/Makefile +++ b/Makefile @@ -67,7 +67,7 @@ cheap-perf: inspect: node_modules/.bin/tsc -p test/benchmarks/tsconfig.json - node --inspect-brk test/benchmarks/compiled/test/benchmarks/bench.js + node --inspect-brk test/benchmarks/compiled/test/benchmarks/allocation-bench.js integration-example: @cd example; ./build.sh diff --git a/perf.sh b/perf.sh index b3eb926..22e45be 100755 --- a/perf.sh +++ b/perf.sh @@ -2,6 +2,7 @@ node_modules/.bin/tsc -p test/benchmarks/tsconfig.json rm ./*.log || true +time node --prof --trace-deopt test/benchmarks/compiled/test/benchmarks/allocation-bench.js time node --prof --trace-deopt test/benchmarks/compiled/test/benchmarks/bench.js EXIT_CODE=$? for f in *.log; do node --prof-process "$f"; done diff --git a/test/benchmarks/allocation-bench.ts b/test/benchmarks/allocation-bench.ts new file mode 100644 index 0000000..7674b30 --- /dev/null +++ b/test/benchmarks/allocation-bench.ts @@ -0,0 +1,40 @@ +import {test} from './code' + +const ITERATIONS = 50 + +const yieldThread = () => new Promise(setImmediate) + +async function main() { + const functions = await test() + functions.printMemory() + for (let i = 0; i < ITERATIONS; i++) { + await functions.benchGetBooks() + await yieldThread() + } + functions.printMemory() + for (let i = 0; i < ITERATIONS; i++) { + await functions.benchBooks() + await yieldThread() + } + functions.printMemory() + for (let i = 0; i < ITERATIONS; i++) { + await functions.benchBooksNoAck() + await yieldThread() + } + functions.printMemory() + for (let i = 0; i < ITERATIONS; i++) { + await functions.benchBooks() + await yieldThread() + } + functions.printMemory() + for (let i = 0; i < ITERATIONS; i++) { + await functions.benchBooksNoAck() + await yieldThread() + } + functions.printMemory() +} + +main().catch((err) => { + console.error(err) + process.exit(1) +}) diff --git a/test/benchmarks/bench.ts b/test/benchmarks/bench.ts index 97fa4ab..03e4e6f 100644 --- a/test/benchmarks/bench.ts +++ b/test/benchmarks/bench.ts @@ -1,131 +1,58 @@ import { Suite } from "benchmark" -import * as helpers from "../helpers" -import { BookServiceDefinition } from "../codegen/client" -import { loadService, registerService } from "../../src/codegen" +import { test } from "./code" -const ITER_MULTIPLIER = 400 - -const books = [ - { author: "mr menduz", isbn: 1234, title: "1001 reasons to write your own OS" }, - { author: "mr cazala", isbn: 1111, title: "Advanced CSS" }, - { author: "mr mannakia", isbn: 7666, title: "Advanced binary packing" }, - { author: "mr kuruk", isbn: 7668, title: "Advanced bots AI" }, -] - -async function test() { - const testEnv = helpers.createSimpleTestEnvironment(async function (port) { - registerService(port, BookServiceDefinition, async () => ({ - async getBook(req) { - return { - author: "menduz", - isbn: req.isbn, - title: "Rpc onion layers", - } - }, - async *queryBooks(req) { - for (let i = 0; i < ITER_MULTIPLIER; i++) { - yield* books - } - }, - async *queryBooksNoAck(req) { - for (let i = 0; i < ITER_MULTIPLIER; i++) { - yield* books - } - }, - async *almostEmptyResponseStream() { throw new Error('not implemented') }, - async emptyQuery() { throw new Error('not implemented') }, - async emptyResponse() { throw new Error('not implemented') }, - async *emptyResponseStream() { throw new Error('not implemented') }, - async *infiniteGenerator() { throw new Error('not implemented') }, - async *failFirstGenerator() { throw new Error('not implemented') } - })) - }) - - const { rpcClient } = await testEnv.start() - const clientPort = await rpcClient.createPort("test1") - const service = loadService(clientPort, BookServiceDefinition) +async function main() { const suite = new Suite() - async function benchBooks(deferred) { - const results = [] - - for await (const book of service.queryBooks({ authorPrefix: "mr" })) { - results.push(book) - } - - if (results.length != ITER_MULTIPLIER * 4) throw new Error("Invalid number of results, got: " + results.length) - else deferred.resolve() - } - - async function benchBooksNoAck(deferred) { - const results = [] - - for await (const book of service.queryBooksNoAck({ authorPrefix: "mr" })) { - results.push(book) - } - - if (results.length != ITER_MULTIPLIER * 4) throw new Error("Invalid number of results, got: " + results.length) - else deferred.resolve() - } - - let memory: ReturnType = process.memoryUsage() - - function printMemory() { - const newMemory = process.memoryUsage() - - function toMb(num: number) { - return (num / 1024 / 1024).toFixed(2) + "MB" - } - - console.log(` - heapTotal: ${toMb(newMemory.heapTotal - memory.heapTotal)} - heapUsed: ${toMb(newMemory.heapUsed - memory.heapUsed)} - rss: ${toMb(newMemory.rss - memory.rss)} - arrayBuffers: ${toMb((newMemory as any).arrayBuffers - (memory as any).arrayBuffers)} - `) - - memory = newMemory - } + const functions = await test() suite .add("PREWARM GetBook", { defer: true, async fn(deferred) { - for (let i = 0; i < ITER_MULTIPLIER; i++) { - const ret = await service.getBook({ isbn: 1234 }) - if (ret.isbn != 1234) throw new Error("invalid number") - } + await functions.benchGetBooks() deferred.resolve() }, }) .add("PREWARM QueryBooks", { defer: true, - fn: benchBooks, + async fn(deferred) { + await functions.benchBooks() + deferred.resolve() + }, }) .add("QPREWARM ueryBooksNoAck", { defer: true, - fn: benchBooksNoAck, + async fn(deferred) { + await functions.benchBooksNoAck() + + deferred.resolve() + }, }) .add("QueryBooks", { defer: true, - fn: benchBooks, + async fn(deferred) { + await functions.benchBooks() + deferred.resolve() + }, }) .add("GetBook", { defer: true, async fn(deferred) { - for (let i = 0; i < ITER_MULTIPLIER; i++) { - const ret = await service.getBook({ isbn: 1234 }) - if (ret.isbn != 1234) throw new Error("invalid number") - } + await functions.benchGetBooks() deferred.resolve() }, }) .add("QueryBooksNoAck", { defer: true, - fn: benchBooksNoAck, + async fn(deferred) { + await functions.benchBooksNoAck() + + deferred.resolve() + }, }) .on("cycle", function (event) { console.log(String(event.target)) @@ -136,15 +63,18 @@ async function test() { process.exitCode = 1 } - printMemory() + functions.printMemory() }) .on("complete", function (event) { - printMemory() + functions.printMemory() + functions.clientPort.close() + functions.transportClient.close() + functions.transportServer.close() }) .run({ async: true }) } -test().catch((err) => { +main().catch((err) => { console.error(err) process.exit(1) }) diff --git a/test/benchmarks/code.ts b/test/benchmarks/code.ts new file mode 100644 index 0000000..73f3db6 --- /dev/null +++ b/test/benchmarks/code.ts @@ -0,0 +1,100 @@ +import * as helpers from "../helpers" +import { BookServiceDefinition } from "../codegen/client" +import { loadService, registerService } from "../../src/codegen" + +const ITER_MULTIPLIER = 400 + +const books = [ + { author: "mr menduz", isbn: 1234, title: "1001 reasons to write your own OS" }, + { author: "mr cazala", isbn: 1111, title: "Advanced CSS" }, + { author: "mr mannakia", isbn: 7666, title: "Advanced binary packing" }, + { author: "mr kuruk", isbn: 7668, title: "Advanced bots AI" }, +] + +export async function test() { + const testEnv = helpers.createSimpleTestEnvironment(async function (port) { + registerService(port, BookServiceDefinition, async () => ({ + async getBook(req) { + return { + author: "menduz", + isbn: req.isbn, + title: "Rpc onion layers", + } + }, + async *queryBooks(req) { + for (let i = 0; i < ITER_MULTIPLIER; i++) { + yield* books + } + }, + async *queryBooksNoAck(req) { + for (let i = 0; i < ITER_MULTIPLIER; i++) { + yield* books + } + }, + async *almostEmptyResponseStream() { throw new Error('not implemented') }, + async emptyQuery() { throw new Error('not implemented') }, + async emptyResponse() { throw new Error('not implemented') }, + async *emptyResponseStream() { throw new Error('not implemented') }, + async *infiniteGenerator() { throw new Error('not implemented') }, + async *failFirstGenerator() { throw new Error('not implemented') } + })) + }) + + const { rpcClient, rpcServer, transportClient, transportServer } = await testEnv.start() + const clientPort = await rpcClient.createPort("test1") + const service = loadService(clientPort, BookServiceDefinition) + + async function benchBooks() { + const results = [] + + for await (const book of service.queryBooks({ authorPrefix: "mr" })) { + results.push(book) + } + + if (results.length != ITER_MULTIPLIER * 4) throw new Error("Invalid number of results, got: " + results.length) + } + + async function benchBooksNoAck() { + const results = [] + + for await (const book of service.queryBooksNoAck({ authorPrefix: "mr" })) { + results.push(book) + } + + if (results.length != ITER_MULTIPLIER * 4) throw new Error("Invalid number of results, got: " + results.length) + } + + let memory: ReturnType = process.memoryUsage() + + function printMemory() { + const newMemory = process.memoryUsage() + + function toMb(num: number) { + return (num / 1024 / 1024).toFixed(2) + "MB" + } + + console.log(` + heapTotal: ${toMb(newMemory.heapTotal - memory.heapTotal)} + heapUsed: ${toMb(newMemory.heapUsed - memory.heapUsed)} + rss: ${toMb(newMemory.rss - memory.rss)} + arrayBuffers: ${toMb((newMemory as any).arrayBuffers - (memory as any).arrayBuffers)} + `) + + memory = newMemory + } + + return { + benchBooks, + benchBooksNoAck, + async benchGetBooks() { + for (let i = 0; i < ITER_MULTIPLIER; i++) { + const ret = await service.getBook({ isbn: 1234 }) + if (ret.isbn != 1234) throw new Error("invalid number") + } + }, + printMemory, + clientPort, + transportClient, + transportServer, + } +}