Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust JS API to support multi-stream connections #2546

Merged
merged 2 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions bin/wasm-node/javascript/src/index-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ function trustedBase64Decode(base64: string): Uint8Array {
connection.binaryType = 'arraybuffer';

connection.onopen = () => {
config.onOpen();
config.onOpen({ type: 'single-stream' });
};
connection.onclose = (event) => {
const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : "");
config.onClose(message);
config.onConnectionClose(message);
};
connection.onmessage = (msg) => {
config.onMessage(new Uint8Array(msg.data as ArrayBuffer));
Expand All @@ -136,6 +136,8 @@ function trustedBase64Decode(base64: string): Uint8Array {

send: (data: Uint8Array): void => {
connection.send(data);
}
},

openOutSubstream: () => { throw new Error('Wrong connection type') }
};
}
14 changes: 8 additions & 6 deletions bin/wasm-node/javascript/src/index-deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
connection.socket.binaryType = 'arraybuffer';

connection.socket.onopen = () => {
config.onOpen();
config.onOpen({ type: 'single-stream' });
};
connection.socket.onclose = (event) => {
const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : "");
config.onClose(message);
config.onConnectionClose(message);
};
connection.socket.onmessage = (msg) => {
config.onMessage(new Uint8Array(msg.data as ArrayBuffer));
Expand Down Expand Up @@ -168,7 +168,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean

if (socket.destroyed)
return established;
config.onOpen();
config.onOpen({ type: 'single-stream' });

// Spawns an asynchronous task that continuously reads from the socket.
// Every time data is read, the task re-executes itself in order to continue reading.
Expand All @@ -190,7 +190,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
// The socket is reported closed, but `socket.destroyed` is still `false` (see
// check above). As such, we must inform the inner layers.
socket.destroyed = true;
config.onClose(outcome === null ? "EOF when reading socket" : outcome);
config.onConnectionClose(outcome === null ? "EOF when reading socket" : outcome);
return;
}
console.assert(outcome !== 0); // `read` guarantees to return a non-zero value.
Expand Down Expand Up @@ -248,7 +248,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
// The socket is reported closed, but `socket.destroyed` is still
// `false` (see check above). As such, we must inform the inner layers.
socket.destroyed = true;
config.onClose(outcome);
config.onConnectionClose(outcome);
return c;
}
// Note that, contrary to `read`, it is possible for `outcome` to be 0.
Expand All @@ -259,7 +259,9 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
return c;
});
}
}
},

openOutSubstream: () => { throw new Error('Wrong connection type') }
};
}

Expand Down
12 changes: 7 additions & 5 deletions bin/wasm-node/javascript/src/index-nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
connection.socket.binaryType = 'arraybuffer';

connection.socket.onopen = () => {
config.onOpen();
config.onOpen({ type: 'single-stream' });
};
connection.socket.onclose = (event) => {
const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : "");
config.onClose(message);
config.onConnectionClose(message);
};
connection.socket.onmessage = (msg) => {
config.onMessage(new Uint8Array(msg.data as ArrayBuffer));
Expand All @@ -129,14 +129,14 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean

connection.socket.on('connect', () => {
if (socket.destroyed) return;
config.onOpen();
config.onOpen({ type: 'single-stream' });
});
connection.socket.on('close', (hasError) => {
if (socket.destroyed) return;
// NodeJS doesn't provide a reason why the closing happened, but only
// whether it was caused by an error.
const message = hasError ? "Error" : "Closed gracefully";
config.onClose(message);
config.onConnectionClose(message);
});
connection.socket.on('error', () => { });
connection.socket.on('data', (message) => {
Expand Down Expand Up @@ -173,7 +173,9 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
// TCP
connection.socket.write(data);
}
}
},

openOutSubstream: () => { throw new Error('Wrong connection type') }
};
}

Expand Down
110 changes: 87 additions & 23 deletions bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,40 @@ export interface Config {
*/
export interface Connection {
/**
* Transitions the connection to the `Closed` state.
* Transitions the connection or one of its substreams to the `Closed` state.
*
* The `config.onClose` callback is **not** called.
* If the connection is of type "single-stream", the whole connection must be shut down.
* If the connection is of type "multi-stream", a `streamId` can be provided, in which case
* only the given substream is shut down.
*
* The `config.onClose` or `config.onStreamClose` callbacks are **not** called.
*
* The transition is performed in the background.
* None of the callbacks passed to the `Config` will be called again.
* If the whole connection is to be shut down, none of the callbacks passed to the `Config`
* must be called again. If only a substream is shut down, the `onStreamClose` and `onMessage`
* callbacks must not be called again with that substream.
*/
close(): void;
close(streamId?: number): void;

/**
* Queues data to be sent on the given connection.
*
* The connection must currently be in the `Open` state.
*
* The `streamId` must be provided if and only if the connection is of type "multi-stream".
* It indicates which substream to send the data on.
*/
send(data: Uint8Array, streamId?: number): void;

/**
* Start opening an additional outbound substream on the given connection.
*
* The state of the connection must be `Open`. This function must only be called for
* connections of type "multi-stream".
*
* The `onStreamOpened` callback must later be called with an outbound direction.
*/
send(data: Uint8Array): void;
openOutSubstream(): void;
}

/**
Expand All @@ -109,22 +128,43 @@ export interface ConnectionConfig {

/**
* Callback called when the connection transitions from the `Opening` to the `Open` state.
*
* Must only be called once per connection.
*/
onOpen: () => void;
onOpen: (info: { type: 'single-stream' } | { type: 'multi-stream', peerId: Uint8Array }) => void;

/**
* Callback called when the connection transitions to the `Closed` state.
*
* It it **not** called if `Connection.close` is manually called by the API user.
*/
onClose: (message: string) => void;
onConnectionClose: (message: string) => void;

/**
* Callback called when a new substream has been opened.
*
* This function must only be called for connections of type "multi-stream".
*/
onStreamOpened: (streamId: number, direction: 'inbound' | 'outbound') => void;

/**
* Callback called when a stream transitions to the `Closed` state.
*
* It it **not** called if `Connection.closeStream` is manually called by the API user.
*
* This function must only be called for connections of type "multi-stream".
*/
onStreamClose: (streamId: number) => void;

/**
* Callback called when a message sent by the remote has been received.
*
* Can only happen while the connection is in the `Open` state.
*
* The `streamId` parameter must be provided if and only if the connection is of type
* "multi-stream".
*/
onMessage: (message: Uint8Array) => void;
onMessage: (message: Uint8Array, streamId?: number) => void;
}

/**
Expand Down Expand Up @@ -279,13 +319,24 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports,

const connec = config.connect({
address,
onOpen: () => {
onOpen: (info) => {
if (killedTracked.killed) return;
try {
instance.exports.connection_open_single_stream(connectionId);
switch (info.type) {
case 'single-stream': {
instance.exports.connection_open_single_stream(connectionId);
break
}
case 'multi-stream': {
const ptr = instance.exports.alloc(info.peerId.length) >>> 0;
new Uint8Array(instance.exports.memory.buffer).set(info.peerId, ptr);
instance.exports.connection_open_multi_stream(connectionId, ptr, info.peerId.length);
break
}
}
} catch(_error) {}
},
onClose: (message: string) => {
onConnectionClose: (message: string) => {
if (killedTracked.killed) return;
try {
const encoded = new TextEncoder().encode(message)
Expand All @@ -294,14 +345,27 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports,
instance.exports.connection_closed(connectionId, ptr, encoded.length);
} catch(_error) {}
},
onMessage: (message: Uint8Array) => {
onMessage: (message: Uint8Array, streamId?: number) => {
if (killedTracked.killed) return;
try {
const ptr = instance.exports.alloc(message.length) >>> 0;
new Uint8Array(instance.exports.memory.buffer).set(message, ptr)
instance.exports.stream_message(connectionId, 0, ptr, message.length);
instance.exports.stream_message(connectionId, streamId || 0, ptr, message.length);
} catch(_error) {}
},
onStreamOpened: (streamId: number, direction: 'inbound' | 'outbound') => {
if (killedTracked.killed) return;
try {
instance.exports.connection_stream_opened(connectionId, streamId, direction === 'outbound');
} catch(_error) {}
},
onStreamClose: (streamId: number) => {
if (killedTracked.killed) return;
try {
instance.exports.stream_closed(connectionId, streamId);
} catch(_error) {}
}

});

connections[connectionId] = connec;
Expand Down Expand Up @@ -332,21 +396,21 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports,
delete connections[connectionId];
},

// Opens a new substream on a multi-stream connection
connection_stream_open: (_connectionId: number) => {
// Given that multi-stream connections are never opened at the moment, this function
// should never be called.
// Opens a new substream on a multi-stream connection.
connection_stream_open: (connectionId: number) => {
const connection = connections[connectionId]!;
connection.openOutSubstream()
},

// Closes a substream on a multi-stream connection
connection_stream_close: (_connectionId: number, _streamId: number) => {
// Given that multi-stream connections are never opened at the moment, this function
// should never be called.
// Closes a substream on a multi-stream connection.
connection_stream_close: (connectionId: number, streamId: number) => {
const connection = connections[connectionId]!;
connection.close(streamId)
},

// Must queue the data found in the WebAssembly memory at the given pointer. It is assumed
// that this function is called only when the connection is in an open state.
stream_send: (connectionId: number, _streamId: number, ptr: number, len: number) => {
stream_send: (connectionId: number, streamId: number, ptr: number, len: number) => {
if (killedTracked.killed) return;

const instance = config.instance!;
Expand All @@ -356,7 +420,7 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports,

const data = new Uint8Array(instance.exports.memory.buffer).slice(ptr, ptr + len);
const connection = connections[connectionId]!;
connection.send(data);
connection.send(data, streamId); // TODO: docs says the streamId is provided only for multi-stream connections, but here it's always provided
},

current_task_entered: (ptr: number, len: number) => {
Expand Down