Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runtime/http): websocket support #10359

Merged
merged 44 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
c17a720
add websocket upgrade
crowlKats Apr 16, 2021
e63b02b
Merge branch 'main' into native_http_websocket
crowlKats Apr 24, 2021
6de42f0
add ws
crowlKats Apr 24, 2021
a313886
handle sec-websocket-key
crowlKats Apr 24, 2021
a94cbca
handle closing and add typings
crowlKats Apr 25, 2021
de350f5
basic error handling
crowlKats Apr 25, 2021
dc9056d
Update cli/dts/lib.deno.unstable.d.ts
crowlKats May 5, 2021
48e0e23
rename upgradeWebSocket
crowlKats May 5, 2021
2499a5a
Merge branch 'main' into native_http_websocket
crowlKats May 5, 2021
b494a44
clean up
crowlKats May 5, 2021
35d375a
clean up
crowlKats May 5, 2021
85639fb
clean up
crowlKats May 5, 2021
e855c63
Update runtime/js/40_http.js
crowlKats May 5, 2021
b8aa871
fix
crowlKats May 5, 2021
afe1d99
Merge branch 'main' into native_http_websocket
crowlKats Jun 16, 2021
e149890
Merge branch 'main' into native_http_websocket
crowlKats Jun 21, 2021
d4a51bf
work
crowlKats Jun 21, 2021
613636d
wip
lucacasonato Jun 16, 2021
9b32625
work
crowlKats Jun 21, 2021
fa193cf
fix
crowlKats Jun 22, 2021
dddfc29
work
crowlKats Jun 30, 2021
7de7397
fix
lucacasonato Jun 30, 2021
0a70386
cleanup
lucacasonato Jun 30, 2021
4fe1e79
it works!
lucacasonato Jun 30, 2021
aad5ee6
don't leak resources
lucacasonato Jun 30, 2021
087aed1
clean up & remove hyper_tungstenite
crowlKats Jul 1, 2021
15ae4c0
Merge branch 'main' into native_http_websocket
crowlKats Jul 1, 2021
9e2b416
clean up
crowlKats Jul 1, 2021
2dc7e8c
Merge branch 'main' into native_http_websocket
crowlKats Jul 1, 2021
66ba74d
clean up
crowlKats Jul 1, 2021
d940e7e
add test & lint
crowlKats Jul 1, 2021
c86e7bb
add protocol selection support
crowlKats Jul 1, 2021
79c324e
fix
crowlKats Jul 1, 2021
54941c1
fix
crowlKats Jul 1, 2021
588c563
errors
crowlKats Jul 1, 2021
8a1f533
adjust test
crowlKats Jul 1, 2021
8f99e87
adjust test
crowlKats Jul 1, 2021
46d6591
fix
crowlKats Jul 2, 2021
17d5038
use boxed dyn Future for connection
lucacasonato Jul 7, 2021
65536d1
Merge remote-tracking branch 'origin/main' into native_http_websocket
lucacasonato Jul 7, 2021
05b8166
lint + fmt
lucacasonato Jul 7, 2021
fc8b27f
Merge remote-tracking branch 'origin/main' into native_http_websocket
lucacasonato Jul 8, 2021
8a9e486
generate accept header sync
crowlKats Jul 8, 2021
ac56367
Merge branch 'main' into native_http_websocket
lucacasonato Jul 8, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions cli/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1153,9 +1153,42 @@ declare namespace Deno {

export function memoryUsage(): MemoryUsage;

export interface WebSocketConnEventString {
kind: "text";
value: string;
}

export interface WebSocketConnEventBinary {
kind: "binary";
value: Uint8Array;
}

export interface WebSocketConnEventClose {
kind: "close";
value: { code: number; reason: string };
}

export type WebSocketConnEvent =
| WebSocketConnEventString
| WebSocketConnEventBinary
| WebSocketConnEventClose;

export interface WebSocketConn extends AsyncIterable<WebSocketConnEvent> {
send(kind: "text", data: string): Promise<void>;
send(kind: "binary", data: Uint8Array): Promise<void>;

close(code?: number, reason?: string): Promise<void>;
}

export interface WebSocketUpgrade {
response: Response;
ws: WebSocketConn;
}

export interface RequestEvent {
readonly request: Request;
respondWith(r: Response | Promise<Response>): Promise<void>;
upgradeWebSocket(): Promise<WebSocketUpgrade>;
}

export interface HttpConn extends AsyncIterable<RequestEvent> {
Expand Down
36 changes: 18 additions & 18 deletions extensions/websocket/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use std::io::Cursor;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::net::TcpStream;
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
use tokio_tungstenite::tungstenite::{
Expand Down Expand Up @@ -63,24 +65,22 @@ impl WebSocketPermissions for NoWebSocketPermissions {
}
}

type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
struct WsStreamResource {
tx: AsyncRefCell<SplitSink<WsStream, Message>>,
rx: AsyncRefCell<SplitStream<WsStream>>,
pub type WsStream = MaybeTlsStream<TcpStream>;
pub struct WsStreamResource<T: AsyncRead + AsyncWrite> {
pub tx: AsyncRefCell<SplitSink<WebSocketStream<T>, Message>>,
pub rx: AsyncRefCell<SplitStream<WebSocketStream<T>>>,
// When a `WsStreamResource` resource is closed, all pending 'read' ops are
// canceled, while 'write' ops are allowed to complete. Therefore only
// 'read' futures are attached to this cancel handle.
cancel: CancelHandle,
pub cancel: CancelHandle,
}

impl Resource for WsStreamResource {
impl<T: AsyncRead + AsyncWrite> Resource for WsStreamResource<T> {
fn name(&self) -> Cow<str> {
"webSocketStream".into()
}
}

impl WsStreamResource {}

// This op is needed because creating a WS instance in JavaScript is a sync
// operation and should throw error when permissions are not fulfilled,
// but actual op that connects WS is async.
Expand Down Expand Up @@ -174,7 +174,7 @@ where
_ => unreachable!(),
};

let (stream, response): (WsStream, Response) =
let (stream, response): (WebSocketStream<WsStream>, Response) =
client_async(request, socket).await.map_err(|err| {
type_error(format!(
"failed to connect to WebSocket: {}",
Expand Down Expand Up @@ -216,7 +216,7 @@ pub struct SendArgs {
text: Option<String>,
}

pub async fn op_ws_send(
pub async fn op_ws_send<T: AsyncRead + AsyncWrite>(
state: Rc<RefCell<OpState>>,
args: SendArgs,
buf: Option<ZeroCopyBuf>,
Expand All @@ -232,7 +232,7 @@ pub async fn op_ws_send(
let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)
.get::<WsStreamResource<T>>(rid)
.ok_or_else(bad_resource_id)?;
let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
tx.send(msg).await?;
Expand All @@ -247,7 +247,7 @@ pub struct CloseArgs {
reason: Option<String>,
}

pub async fn op_ws_close(
pub async fn op_ws_close<T: AsyncRead + AsyncWrite>(
state: Rc<RefCell<OpState>>,
args: CloseArgs,
_bufs: Option<ZeroCopyBuf>,
Expand All @@ -264,7 +264,7 @@ pub async fn op_ws_close(
let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)
.get::<WsStreamResource<T>>(rid)
.ok_or_else(bad_resource_id)?;
let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
tx.send(msg).await?;
Expand All @@ -283,15 +283,15 @@ pub enum NextEventResponse {
Closed,
}

pub async fn op_ws_next_event(
pub async fn op_ws_next_event<T: AsyncRead + AsyncWrite>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
_bufs: Option<ZeroCopyBuf>,
) -> Result<NextEventResponse, AnyError> {
let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)
.get::<WsStreamResource<T>>(rid)
.ok_or_else(bad_resource_id)?;

let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await;
Expand Down Expand Up @@ -334,9 +334,9 @@ pub fn init<P: WebSocketPermissions + 'static>(
op_sync(op_ws_check_permission::<P>),
),
("op_ws_create", op_async(op_ws_create::<P>)),
("op_ws_send", op_async(op_ws_send)),
("op_ws_close", op_async(op_ws_close)),
("op_ws_next_event", op_async(op_ws_next_event)),
("op_ws_send", op_async(op_ws_send::<WsStream>)),
("op_ws_close", op_async(op_ws_close::<WsStream>)),
("op_ws_next_event", op_async(op_ws_next_event::<WsStream>)),
])
.state(move |state| {
state.put::<WsUserAgent>(WsUserAgent(user_agent.clone()));
Expand Down
1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ deno_webidl = { path = "../extensions/webidl", version = "0.5.0" }
deno_websocket = { path = "../extensions/websocket", version = "0.10.0" }

atty = "0.2.14"
base64 = "0.13.0"
bytes = "1"
dlopen = "0.1.8"
encoding_rs = "0.8.28"
Expand Down
63 changes: 62 additions & 1 deletion runtime/js/40_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@
const request = fromInnerRequest(innerRequest, "immutable");

const respondWith = createRespondWith(responseSenderRid, this.#rid);
const upgradeWebSocket = createUpgradeWebSocket(requestBodyRid);

return { request, respondWith };
return { request, respondWith, upgradeWebSocket };
crowlKats marked this conversation as resolved.
Show resolved Hide resolved
}

/** @returns {void} */
Expand Down Expand Up @@ -219,6 +220,66 @@
});
}

function createUpgradeWebSocket(rid) {
return async function upgradeWebSocket() {
const { key, rid } = await core.opAsync(
"op_http_upgrade_websocket",
rid,
);

const response = new Response(undefined, {
status: 101,
headers: {
"Upgrade": "websocket",
"Connection": "Upgrade",
"Sec-WebSocket-Accept": key,
},
});

const ws = {
async *[Symbol.asyncIterator]() {
let data = await core.opAsync(
"op_http_ws_next_event",
rid,
);

if (data.kind === "error") {
throw new Error(data.value);
} else if (data.kind === "close") {
return data;
} else {
yield data;
}
},
async send(data) {
if (typeof data === "string") {
await core.opAsync("op_http_ws_close", {
rid,
kind: "text",
text: data,
});
} else if (data instanceof Uint8Array) {
await core.opAsync("op_http_ws_close", {
rid,
kind: "binary",
}, data);
} else {
throw new TypeError("Only string or Uint8Array can be sent");
}
crowlKats marked this conversation as resolved.
Show resolved Hide resolved
},
async close(code, reason) {
await core.opAsync("op_http_ws_close", {
rid,
code,
reason,
});
},
};

return { response, ws };
};
}

window.__bootstrap.http = {
serveHttp,
};
Expand Down
75 changes: 75 additions & 0 deletions runtime/ops/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use deno_core::error::null_opbuf;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::futures::stream::SplitStream;
use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
Expand All @@ -22,11 +23,15 @@ use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use deno_websocket::tokio_tungstenite::tungstenite::Message;
use deno_websocket::tokio_tungstenite::WebSocketStream;
use deno_websocket::WsStreamResource;
use hyper::body::HttpBody;
use hyper::http;
use hyper::server::conn::Connection;
use hyper::server::conn::Http;
use hyper::service::Service as HyperService;
use hyper::upgrade::Upgraded;
use hyper::Body;
use hyper::Request;
use hyper::Response;
Expand Down Expand Up @@ -55,6 +60,22 @@ pub fn init() -> Extension {
("op_http_response", op_async(op_http_response)),
("op_http_response_write", op_async(op_http_response_write)),
("op_http_response_close", op_async(op_http_response_close)),
(
"op_http_upgrade_websocket",
op_async(op_http_upgrade_websocket),
),
(
"op_http_ws_send",
op_async(deno_websocket::op_ws_send::<WebSocketStream<Upgraded>>),
),
(
"op_http_ws_close",
op_async(deno_websocket::op_ws_close::<WebSocketStream<Upgraded>>),
),
(
"op_http_ws_next_event",
op_async(deno_websocket::op_ws_next_event::<WebSocketStream<Upgraded>>),
),
])
.build()
}
Expand Down Expand Up @@ -507,6 +528,60 @@ async fn op_http_response_write(
Ok(())
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HttpUpgradeWebsocketResponse {
rid: ResourceId,
key: String,
}

async fn op_http_upgrade_websocket(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
_data: Option<ZeroCopyBuf>,
) -> Result<HttpUpgradeWebsocketResponse, AnyError> {
let conn_resource = state
.borrow()
.resource_table
.get::<ConnResource>(rid)
.ok_or_else(bad_resource_id)?;

let service = conn_resource.deno_service.inner.borrow_mut().ok_or("")?; // TODO

let key = service
.request
.headers()
.get(http::header::SEC_WEBSOCKET_KEY)
.ok_or("failed to read ws key from headers")?; // TODO
let digest = ring::digest::digest(
&ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", key.to_str()?).as_bytes(),
);
let return_key = base64::encode(digest);

let upgraded = service.request.on_upgrade().await?;
let stream =
deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded,
deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server,
None,
)
.await;

let (ws_tx, ws_rx) = stream.split();

let rid = state.borrow_mut().resource_table.add(WsStreamResource {
rx: AsyncRefCell::new(ws_rx),
tx: AsyncRefCell::new(ws_tx),
cancel: Default::default(),
});

Ok(HttpUpgradeWebsocketResponse {
key: return_key,
rid,
})
}

type BytesStream =
Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;

Expand Down