diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 4c68f2be64e7bb..a7142b09dc0cc4 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -1,11 +1,6 @@ // This is not a real HTTP server. We read blindly one time into 'requestBuf', // then write this fixed 'responseBuf'. The point of this benchmark is to // exercise the event loop in a simple yet semi-realistic way. -const OP_LISTEN = 1; -const OP_ACCEPT = 2; -const OP_READ = 3; -const OP_WRITE = 4; -const OP_CLOSE = 5; const requestBuf = new Uint8Array(64 * 1024); const responseBuf = new Uint8Array( "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" @@ -80,12 +75,12 @@ function handleAsyncMsgFromRust(opId, buf) { /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return sendSync(OP_LISTEN, -1); + return sendSync(ops["listen"], -1); } /** Accepts a connection, returns rid. */ async function accept(rid) { - return await sendAsync(OP_ACCEPT, rid); + return await sendAsync(ops["accept"], rid); } /** @@ -93,16 +88,16 @@ async function accept(rid) { * Returns bytes read. */ async function read(rid, data) { - return await sendAsync(OP_READ, rid, data); + return await sendAsync(ops["read"], rid, data); } /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ async function write(rid, data) { - return await sendAsync(OP_WRITE, rid, data); + return await sendAsync(ops["write"], rid, data); } function close(rid) { - return sendSync(OP_CLOSE, rid); + return sendSync(ops["close"], rid); } async function serve(rid) { @@ -120,8 +115,11 @@ async function serve(rid) { close(rid); } +let ops; + async function main() { Deno.core.setAsyncHandler(handleAsyncMsgFromRust); + ops = Deno.core.ops(); Deno.core.print("http_bench.js start\n"); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 3c077562d68c40..c019d8a1197f5f 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -36,12 +36,6 @@ impl log::Log for Logger { fn flush(&self) {} } -const OP_LISTEN: OpId = 1; -const OP_ACCEPT: OpId = 2; -const OP_READ: OpId = 3; -const OP_WRITE: OpId = 4; -const OP_CLOSE: OpId = 5; - #[derive(Clone, Debug, PartialEq)] pub struct Record { pub promise_id: i32, @@ -104,48 +98,24 @@ fn test_record_from() { // TODO test From<&[u8]> for Record } -pub type HttpBenchOp = dyn Future + Send; - -fn dispatch( - op_id: OpId, - control: &[u8], - zero_copy_buf: Option, -) -> CoreOp { - let record = Record::from(control); - let is_sync = record.promise_id == 0; - let http_bench_op = match op_id { - OP_LISTEN => { - assert!(is_sync); - op_listen() - } - OP_CLOSE => { - assert!(is_sync); - let rid = record.arg; - op_close(rid) - } - OP_ACCEPT => { - assert!(!is_sync); - let listener_rid = record.arg; - op_accept(listener_rid) - } - OP_READ => { - assert!(!is_sync); - let rid = record.arg; - op_read(rid, zero_copy_buf) - } - OP_WRITE => { - assert!(!is_sync); - let rid = record.arg; - op_write(rid, zero_copy_buf) - } - _ => panic!("bad op {}", op_id), - }; - let mut record_a = record.clone(); - let mut record_b = record.clone(); +pub type HttpOp = dyn Future + Send; - let fut = Box::new( - http_bench_op - .and_then(move |result| { +pub type HttpOpHandler = + fn(record: Record, zero_copy_buf: Option) -> Box; + +fn http_op( + handler: HttpOpHandler, +) -> impl Fn(&[u8], Option) -> CoreOp { + move |control: &[u8], zero_copy_buf: Option| -> CoreOp { + let record = Record::from(control); + let is_sync = record.promise_id == 0; + let op = handler(record.clone(), zero_copy_buf); + + let mut record_a = record.clone(); + let mut record_b = record.clone(); + + let fut = Box::new( + op.and_then(move |result| { record_a.result = result; Ok(record_a) }) @@ -158,12 +128,13 @@ fn dispatch( let record = result.unwrap(); Ok(record.into()) }), - ); + ); - if is_sync { - Op::Sync(fut.wait().unwrap()) - } else { - Op::Async(fut) + if is_sync { + Op::Sync(fut.wait().unwrap()) + } else { + Op::Async(fut) + } } } @@ -181,7 +152,11 @@ fn main() { }); let mut isolate = deno::Isolate::new(startup_data, false); - isolate.set_dispatch(dispatch); + isolate.register_op("listen", http_op(op_listen)); + isolate.register_op("accept", http_op(op_accept)); + isolate.register_op("read", http_op(op_read)); + isolate.register_op("write", http_op(op_write)); + isolate.register_op("close", http_op(op_close)); isolate.then(|r| { js_check(r); @@ -225,7 +200,8 @@ fn new_rid() -> i32 { rid as i32 } -fn op_accept(listener_rid: i32) -> Box { +fn op_accept(record: Record, _zero_copy_buf: Option) -> Box { + let listener_rid = record.arg; debug!("accept {}", listener_rid); Box::new( futures::future::poll_fn(move || { @@ -248,9 +224,11 @@ fn op_accept(listener_rid: i32) -> Box { ) } -fn op_listen() -> Box { +fn op_listen( + _record: Record, + _zero_copy_buf: Option, +) -> Box { debug!("listen"); - Box::new(lazy(move || { let addr = "127.0.0.1:4544".parse::().unwrap(); let listener = tokio::net::TcpListener::bind(&addr).unwrap(); @@ -262,8 +240,9 @@ fn op_listen() -> Box { })) } -fn op_close(rid: i32) -> Box { +fn op_close(record: Record, _zero_copy_buf: Option) -> Box { debug!("close"); + let rid = record.arg; Box::new(lazy(move || { let mut table = RESOURCE_TABLE.lock().unwrap(); let r = table.remove(&rid); @@ -272,7 +251,8 @@ fn op_close(rid: i32) -> Box { })) } -fn op_read(rid: i32, zero_copy_buf: Option) -> Box { +fn op_read(record: Record, zero_copy_buf: Option) -> Box { + let rid = record.arg; debug!("read rid={}", rid); let mut zero_copy_buf = zero_copy_buf.unwrap(); Box::new( @@ -293,7 +273,8 @@ fn op_read(rid: i32, zero_copy_buf: Option) -> Box { ) } -fn op_write(rid: i32, zero_copy_buf: Option) -> Box { +fn op_write(record: Record, zero_copy_buf: Option) -> Box { + let rid = record.arg; debug!("write rid={}", rid); let zero_copy_buf = zero_copy_buf.unwrap(); Box::new( diff --git a/core/isolate.rs b/core/isolate.rs index bad79b5793c1c0..6795f25f06bfbe 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -13,10 +13,10 @@ use crate::libdeno::deno_buf; use crate::libdeno::deno_dyn_import_id; use crate::libdeno::deno_mod; use crate::libdeno::deno_pinned_buf; -use crate::libdeno::OpId; use crate::libdeno::PinnedBuf; use crate::libdeno::Snapshot1; use crate::libdeno::Snapshot2; +use crate::ops::*; use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use futures::stream::FuturesUnordered; @@ -34,24 +34,6 @@ use std::fmt; use std::ptr::null; use std::sync::{Arc, Mutex, Once}; -pub type Buf = Box<[u8]>; - -pub type OpAsyncFuture = Box + Send>; - -type PendingOpFuture = - Box + Send>; - -pub enum Op { - Sync(Buf), - Async(OpAsyncFuture), -} - -pub type CoreError = (); - -pub type CoreOp = Op; - -pub type OpResult = Result, E>; - /// Args: op_id, control_buf, zero_copy_buf type CoreDispatchFn = dyn Fn(OpId, &[u8], Option) -> CoreOp; @@ -179,6 +161,7 @@ pub struct Isolate { pending_dyn_imports: FuturesUnordered>, have_unpolled_ops: bool, startup_script: Option, + op_registry: OpRegistry, } unsafe impl Send for Isolate {} @@ -244,12 +227,17 @@ impl Isolate { have_unpolled_ops: false, pending_dyn_imports: FuturesUnordered::new(), startup_script, + op_registry: OpRegistry::new(), } } /// Defines the how Deno.core.dispatch() acts. /// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf /// corresponds to the second argument of Deno.core.dispatch(). + /// + /// If this method is used then ops registered using `op_register` function are + /// ignored and all dispatching must be handled manually in provided callback. + // TODO: we want to deprecate and remove this API and move to `register_op` API pub fn set_dispatch(&mut self, f: F) where F: Fn(OpId, &[u8], Option) -> CoreOp + Send + Sync + 'static, @@ -257,6 +245,22 @@ impl Isolate { self.dispatch = Some(Arc::new(f)); } + /// New dispatch mechanism. Requires runtime to explicitly ask for op ids + /// before using any of the ops. + /// + /// Ops added using this method are only usable if `dispatch` is not set + /// (using `set_dispatch` method). + pub fn register_op(&mut self, name: &str, op: F) -> OpId + where + F: Fn(&[u8], Option) -> CoreOp + Send + Sync + 'static, + { + assert!( + self.dispatch.is_none(), + "set_dispatch should not be used in conjunction with register_op" + ); + self.op_registry.register(name, op) + } + pub fn set_dyn_import(&mut self, f: F) where F: Fn(deno_dyn_import_id, &str, &str) -> DynImportStream @@ -329,9 +333,17 @@ impl Isolate { let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; let op = if let Some(ref f) = isolate.dispatch { + assert!( + op_id != 0, + "op_id 0 is a special value that shouldn't be used with dispatch" + ); f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf)) } else { - panic!("isolate.dispatch not set") + isolate.op_registry.call( + op_id, + control_buf.as_ref(), + PinnedBuf::new(zero_copy_buf), + ) }; debug_assert_eq!(isolate.shared.size(), 0); diff --git a/core/lib.rs b/core/lib.rs index 9be1c3891789c5..42a692f1a10b53 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -11,6 +11,7 @@ mod js_errors; mod libdeno; mod module_specifier; mod modules; +mod ops; mod shared_queue; pub use crate::any_error::*; @@ -22,6 +23,7 @@ pub use crate::libdeno::OpId; pub use crate::libdeno::PinnedBuf; pub use crate::module_specifier::*; pub use crate::modules::*; +pub use crate::ops::*; pub fn v8_version() -> &'static str { use std::ffi::CStr; diff --git a/core/ops.rs b/core/ops.rs new file mode 100644 index 00000000000000..84c15e096cb528 --- /dev/null +++ b/core/ops.rs @@ -0,0 +1,111 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +pub use crate::libdeno::OpId; +use crate::PinnedBuf; +use futures::Future; +use std::collections::HashMap; + +pub type Buf = Box<[u8]>; + +pub type OpAsyncFuture = Box + Send>; + +pub(crate) type PendingOpFuture = + Box + Send>; + +pub type OpResult = Result, E>; + +pub enum Op { + Sync(Buf), + Async(OpAsyncFuture), +} + +pub type CoreError = (); + +pub type CoreOp = Op; + +/// Main type describing op +type OpDispatcher = dyn Fn(&[u8], Option) -> CoreOp; + +#[derive(Default)] +pub struct OpRegistry { + dispatchers: Vec>, + name_to_id: HashMap, +} + +impl OpRegistry { + pub fn new() -> Self { + let mut registry = Self::default(); + let op_id = registry.register("ops", |_, _| { + // ops is a special op which is handled in call. + unreachable!() + }); + assert_eq!(op_id, 0); + registry + } + + pub fn register(&mut self, name: &str, op: F) -> OpId + where + F: Fn(&[u8], Option) -> CoreOp + Send + Sync + 'static, + { + let op_id = self.dispatchers.len() as u32; + + let existing = self.name_to_id.insert(name.to_string(), op_id); + assert!( + existing.is_none(), + format!("Op already registered: {}", name) + ); + + self.dispatchers.push(Box::new(op)); + op_id + } + + fn json_map(&self) -> Buf { + let op_map_json = serde_json::to_string(&self.name_to_id).unwrap(); + op_map_json.as_bytes().to_owned().into_boxed_slice() + } + + pub fn call( + &self, + op_id: OpId, + control: &[u8], + zero_copy_buf: Option, + ) -> CoreOp { + // Op with id 0 has special meaning - it's a special op that is always + // provided to retrieve op id map. The map consists of name to `OpId` + // mappings. + if op_id == 0 { + return Op::Sync(self.json_map()); + } + + let d = &*self.dispatchers.get(op_id as usize).expect("Op not found!"); + d(control, zero_copy_buf) + } +} + +#[test] +fn test_op_registry() { + use std::sync::atomic; + use std::sync::Arc; + let mut op_registry = OpRegistry::new(); + + let c = Arc::new(atomic::AtomicUsize::new(0)); + let c_ = c.clone(); + + let test_id = op_registry.register("test", move |_, _| { + c_.fetch_add(1, atomic::Ordering::SeqCst); + CoreOp::Sync(Box::new([])) + }); + assert!(test_id != 0); + + let mut expected = HashMap::new(); + expected.insert("ops".to_string(), 0); + expected.insert("test".to_string(), 1); + assert_eq!(op_registry.name_to_id, expected); + + let res = op_registry.call(test_id, &[], None); + if let Op::Sync(buf) = res { + assert_eq!(buf.len(), 0); + } else { + unreachable!(); + } + assert_eq!(c.load(atomic::Ordering::SeqCst), 1); +} diff --git a/core/shared_queue.js b/core/shared_queue.js index 22a64a312bd4c7..7eeb612550719a 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -58,6 +58,13 @@ SharedQueue Binary Layout Deno.core.recv(handleAsyncMsgFromRust); } + function ops() { + // op id 0 is a special value to retreive the map of registered ops. + const opsMapBytes = Deno.core.send(0, new Uint8Array([]), null); + const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); + return JSON.parse(opsMapJson); + } + function assert(cond) { if (!cond) { throw Error("assert"); @@ -84,7 +91,6 @@ SharedQueue Binary Layout return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF]; } - // TODO(ry) rename to setMeta function setMeta(index, end, opId) { shared32[INDEX_OFFSETS + 2 * index] = end; shared32[INDEX_OFFSETS + 2 * index + 1] = opId; @@ -189,7 +195,8 @@ SharedQueue Binary Layout push, reset, shift - } + }, + ops }; assert(window[GLOBAL_NAMESPACE] != null); diff --git a/core/shared_queue.rs b/core/shared_queue.rs index 5f9554ad2140fb..dbb738f15d5be5 100644 --- a/core/shared_queue.rs +++ b/core/shared_queue.rs @@ -196,7 +196,7 @@ impl SharedQueue { #[cfg(test)] mod tests { use super::*; - use crate::isolate::Buf; + use crate::ops::Buf; #[test] fn basic() { diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts index 0bd3b6415bd7bc..423e53578fc2d5 100644 --- a/deno_typescript/lib.deno_core.d.ts +++ b/deno_typescript/lib.deno_core.d.ts @@ -37,6 +37,11 @@ declare interface DenoCore { shift(): Uint8Array | null; }; + ops: { + init(): void; + get(name: string): number; + }; + recv(cb: MessageCallback): void; send(