Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: strange case not closing stream #50

Merged
merged 4 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ test/codegen/*_pb.*

src/protocol/index.ts
test/codegen/client.ts
test/benchmarks/compilated
test/benchmarks/compiled
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"${workspaceRoot}/node_modules/.bin/jest",
"--runInBand",
"--coverage",
// "test/ws.spec.ts"
"test/push-channel.spec.ts"
],
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen",
Expand Down
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ test:
--ts_proto_out="$(PWD)/test/codegen" \
-I="$(PWD)/test/codegen" \
"$(PWD)/test/codegen/client.proto"
node_modules/.bin/jest --detectOpenHandles --colors --runInBand $(TESTARGS) --coverage $(TEST_FILE)
SIMMULATE_JITTER=false node_modules/.bin/jest --detectOpenHandles --colors --runInBand $(TESTARGS) --coverage $(TEST_FILE)
SIMMULATE_JITTER=true node_modules/.bin/jest --detectOpenHandles --colors --runInBand $(TESTARGS) $(TEST_FILE)
$(MAKE) integration-example

test-watch:
node_modules/.bin/jest --detectOpenHandles --colors --runInBand --watch $(TESTARGS) --coverage
INSTRUMENT_TRANSPORT=true node_modules/.bin/jest --detectOpenHandles --colors --runInBand --watch $(TESTARGS) --coverage

build:
node_modules/.bin/ts-node scripts/generate-proto-file.ts
Expand All @@ -66,7 +67,7 @@ cheap-perf:

inspect:
node_modules/.bin/tsc -p test/benchmarks/tsconfig.json
node --inspect-brk test/benchmarks/compilated/test/benchmarks/bench.js
node --inspect-brk test/benchmarks/compiled/test/benchmarks/allocation-bench.js

integration-example:
@cd example; ./build.sh
Expand Down
33 changes: 15 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,35 +63,32 @@ C->S: Request {procedure_id, payload}
S->C: Response {message_id, payload}
```

#### Getting an async stream
#### Getting an async stream (closed by client)

```sequence
participant Scene (client) as C
participant Kernel (server) as S
C->S: Request {message_id}
S->C: Response {message_id,streaming=true}
S->C: Response {message_id,streaming=true,seqId=0}
C->C: Generate async iterator for {message_id}
S-->C: StreamMessage {message_id,payload}
S-->C:
S-->C:
C->S: StreamMessage {ack=true,message_id,seqId=0}
note over C: Ask for a new item to be generated using ack=true
S-->C: StreamMessage {message_id,payload,seqId=1}
C->S: StreamMessage {ack=true,message_id,seqId=1}
note over C: Close the message by responding\nthe last ACK with ack=true,closed=true
S-->C: StreamMessage {message_id,payload,seqId=2}
C->S: StreamMessage {ack=true,message_id,seqId=2,closed=true}
S->S: Close async Generator
C->C: Close async Iterator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this close async iterator be here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the C->C: Close async Iterator is duplicated and I think the last one is the right.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the server has a generator, the client has an iterator. I think it is sound

S-->C: StreamMessage {message_id,closed=true}
C->C: Close async iterator
```

#### Closing an async stream from the Scene

```sequence
participant Scene (client) as C
participant Kernel (server) as S
C->S: Request {message_id}
S->C: Response {message_id,streaming=true}
C->C: Generate async iterator for {message_id}
S-->C: StreamMessage {message_id,payload}
S-->C:
S-->C:
C->C: Close async iterator
C-->S: StreamMessage {message_id,closed=true}
```
#### Getting an async stream (closed by server)

The server will send a special StreamMessage with a new SeqId to tell the client that a stream (generator)
was closed

# Implementation of the interfaces

Expand Down
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@types/jest": "^27.0.1",
"@types/ws": "^8.5.3",
"benchmark": "^2.1.4",
"fp-future": "^1.0.1",
"jest": "^27.0.6",
"rxjs": "^7.5.5",
"ts-jest": "^27.0.5",
Expand Down
3 changes: 2 additions & 1 deletion perf.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

node_modules/.bin/tsc -p test/benchmarks/tsconfig.json
rm ./*.log || true
time node --prof test/benchmarks/compilated/test/benchmarks/bench.js
time node --prof --trace-deopt test/benchmarks/compiled/test/benchmarks/allocation-bench.js
time node --prof --trace-deopt test/benchmarks/compiled/test/benchmarks/bench.js
EXIT_CODE=$?
for f in *.log; do node --prof-process "$f"; done
exit $EXIT_CODE
12 changes: 10 additions & 2 deletions src/ack-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@ export function createAckHelper(transport: Transport): AckDispatcher {

const bb = new Writer()

transport.on("close", () => {
function closeAll() {
const err = new Error("Transport closed while waiting the ACK")
oneTimeCallbacks.forEach(([_resolve, reject]) => reject(err))
oneTimeCallbacks.forEach(([, reject]) => reject(err))
oneTimeCallbacks.clear()
}

transport.on("close", closeAll)
transport.on("error", err => {
oneTimeCallbacks.forEach(([, reject]) => reject(err))
oneTimeCallbacks.clear()
})

Expand All @@ -26,6 +32,8 @@ export function createAckHelper(transport: Transport): AckDispatcher {
if (fut) {
oneTimeCallbacks.delete(key)
fut[0](data)
} else {
throw new Error('Received a message for an inexistent handler ' + key)
}
},
async sendWithAck(data: StreamMessage): Promise<StreamMessage> {
Expand Down
80 changes: 52 additions & 28 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { CallableProcedureClient, ClientModuleDefinition, RpcClient, RpcClientPort, RpcPortEvents } from "."
import { Transport } from "./types"
import mitt from "mitt"
import future, { IFuture } from "fp-future"
import { Writer } from "protobufjs/minimal"
import {
CreatePort,
Expand All @@ -15,7 +16,7 @@ import {
StreamMessage,
} from "./protocol"
import { MessageDispatcher, messageNumberHandler } from "./message-number-handler"
import { pushableChannel } from "./push-channel"
import { AsyncQueue, linkedList, pushableChannel } from "./push-channel"
import {
calculateMessageIdentifier,
closeStreamMessage,
Expand Down Expand Up @@ -91,51 +92,74 @@ function throwIfRemoteError(parsedMessage: RemoteError) {
throw new Error("RemoteError: " + parsedMessage.errorMessage)
}

// @internal

/**
* If a StreamMessage is received, then it means we have the POSSIBILITY to
* consume a remote generator. The client must answer every ACK with the next
* inteded action, could be: next(), close(). Both actions are serialized in the
* StreamMessage. The server MUST NOT generate any new element of the generator
* if the client doesn't ask for it.
*
* The whole protocol is designed to be SLOW AND SECURE, that means, ACKs (slow)
* will block the generation and consumption of iterators (secure).
*
* That exist to save the memory of the servers and to generate the much needed
* backpressure.
*
* If throughput is what you are looking for, you may better use bigger messages
* containing serialized lists. Effectively reducing the number of messages
* and increasing their size.
*
* @internal
*/
Comment on lines +96 to +114
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

export function streamFromDispatcher(
dispatcher: MessageDispatcher,
streamMessage: StreamMessage,
messageNumber: number
): AsyncGenerator<Uint8Array> {
const channel = pushableChannel<Uint8Array>(localIteratorClosed)

let lastReceivedSequenceId = 0
let isRemoteClosed = false

const channel = new AsyncQueue<Uint8Array>(sendServerSignals)

dispatcher.transport.on("close", () => {
if (!channel.isClosed()) {
channel.failAndClose(new Error("RPC Transport closed"))
}
channel.close(new Error("RPC Transport closed"))
})

dispatcher.transport.on("error", () => {
if (!channel.isClosed()) {
channel.failAndClose(new Error("RPC Transport failed"))
}
channel.close(new Error("RPC Transport failed"))
})

function localIteratorClosed() {
if (!isRemoteClosed) {
dispatcher.transport.sendMessage(closeStreamMessage(messageNumber, lastReceivedSequenceId, streamMessage.portId))
// This function is called at two moments
// 1. When the channel is closed or fails -> an ACK closing the stream is sent to the server
// 2. When the channel.next() is called -> an ACK requesting the next elem is sent to the server
function sendServerSignals(_channel: AsyncQueue<Uint8Array>, action: "close" | "next") {
if (action == "close") {
dispatcher.removeListener(messageNumber)
}
dispatcher.removeListener(messageNumber)
}

function sendAck() {
const closed = channel.isClosed()
if (!closed && !isRemoteClosed) {
dispatcher.transport.sendMessage(streamAckMessage(messageNumber, lastReceivedSequenceId, streamMessage.portId))
if (!isRemoteClosed) {
if (action == "close") {
dispatcher.transport.sendMessage(closeStreamMessage(messageNumber, lastReceivedSequenceId, streamMessage.portId))
} else if (action == "next") {
dispatcher.transport.sendMessage(streamAckMessage(messageNumber, lastReceivedSequenceId, streamMessage.portId))
}
}
}

// receive a message from the server and send it to the iterable channel
function processMessage(message: StreamMessage) {
lastReceivedSequenceId = message.sequenceId

if (message.closed) {
// when the server CLOSES the stream, then we raise the flag isRemoteClosed
// to prevent sending an extra closeStreamMessage to the server after closing
// our channel.
// IMPORTANT: If the server closes the connection, then we DONT send the ACK
// back to the server because it is redundant information.
isRemoteClosed = true
channel.close()
} else {
channel.push(message.payload).then(sendAck).catch(channel.failAndClose)
channel.enqueue(message.payload)
}
}

Expand All @@ -148,20 +172,18 @@ export function streamFromDispatcher(
processMessage(message)
} else if (messageType == RpcMessageTypes.RpcMessageTypes_REMOTE_ERROR_RESPONSE) {
isRemoteClosed = true
channel.failAndClose(
channel.close(
new Error("RemoteError: " + ((message as RemoteError).errorMessage || "Unknown remote error"))
)
} else {
channel.failAndClose(new Error("RemoteError: Protocol error"))
channel.close(new Error("RemoteError: Protocol error"))
}
} else {
channel.failAndClose(new Error("RemoteError: Protocol error"))
channel.close(new Error("RemoteError: Protocol error"))
}
})

processMessage(streamMessage)

return channel.iterable
return channel
}

// @internal
Expand Down Expand Up @@ -199,10 +221,12 @@ function createProcedure(portId: number, procedureId: number, dispatcher: Messag
return undefined
}
} else if (messageType == RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE) {
// If a StreamMessage is received, then it means we have the POSSIBILITY
// to consume a remote generator. Look into the streamFromDispatcher functions
// for more information.
return streamFromDispatcher(dispatcher, message, messageNumber)
} else if (messageType == RpcMessageTypes.RpcMessageTypes_REMOTE_ERROR_RESPONSE) {
throwIfRemoteError(message)
debugger
}
}
}
Expand Down
Loading