Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start porting flatbuffer ops to JSON #2799

Merged
merged 9 commits into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cli/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,21 @@ ts_sources = [
"../js/dir.ts",
"../js/dispatch.ts",
"../js/dispatch_flatbuffers.ts",
"../js/dispatch_json.ts",
"../js/dispatch_minimal.ts",
"../js/dom_file.ts",
"../js/dom_types.ts",
"../js/dom_util.ts",
"../js/error_stack.ts",
"../js/errors.ts",
"../js/event.ts",
"../js/event_target.ts",
"../js/fetch.ts",
"../js/format_error.ts",
"../js/dom_file.ts",
"../js/file_info.ts",
"../js/files.ts",
"../js/flatbuffers.ts",
"../js/form_data.ts",
"../js/format_error.ts",
"../js/get_random_values.ts",
"../js/globals.ts",
"../js/headers.ts",
Expand Down Expand Up @@ -134,10 +135,10 @@ ts_sources = [
"../js/url_search_params.ts",
"../js/util.ts",
"../js/utime.ts",
"../js/version.ts",
"../js/window.ts",
"../js/workers.ts",
"../js/write_file.ts",
"../js/version.ts",
"../js/xeval.ts",
"../tsconfig.json",

Expand Down
2 changes: 2 additions & 0 deletions cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ extern crate indexmap;
#[cfg(unix)]
extern crate nix;
extern crate rand;
extern crate serde;
extern crate serde_derive;
extern crate url;

mod ansi;
Expand Down
38 changes: 0 additions & 38 deletions cli/msg.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ union Any {
Cwd,
CwdRes,
Dial,
Environ,
EnvironRes,
Exit,
Fetch,
FetchSourceFile,
FetchSourceFileRes,
Expand All @@ -29,8 +26,6 @@ union Any {
HostGetMessageRes,
HostGetWorkerClosed,
HostPostMessage,
IsTTY,
IsTTYRes,
Kill,
Link,
Listen,
Expand Down Expand Up @@ -77,9 +72,6 @@ union Any {
Truncate,
HomeDir,
HomeDirRes,
ExecPath,
ExecPathRes,
Utime,
WorkerGetMessage,
WorkerGetMessageRes,
WorkerPostMessage,
Expand Down Expand Up @@ -286,21 +278,11 @@ table GlobalTimerRes { }

table GlobalTimerStop { }

table Exit {
code: int;
}

table Environ {}

table SetEnv {
key: string;
value: string;
}

table EnvironRes {
map: [KeyValue];
}

table KeyValue {
key: string;
value: string;
Expand Down Expand Up @@ -469,18 +451,6 @@ table HomeDirRes {
path: string;
}

table ExecPath {}

table ExecPathRes {
path: string;
}

table Utime {
filename: string;
atime: uint64;
mtime: uint64;
}

table Open {
filename: string;
perm: uint;
Expand Down Expand Up @@ -600,14 +570,6 @@ table NowRes {
subsec_nanos: uint32;
}

table IsTTY {}

table IsTTYRes {
stdin: bool;
stdout: bool;
stderr: bool;
}

table Seek {
rid: uint32;
offset: int;
Expand Down
20 changes: 3 additions & 17 deletions cli/ops/dispatch_flatbuffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ use super::files::{op_close, op_open, op_read, op_seek, op_write};
use super::fs::{
op_chdir, op_chmod, op_chown, op_copy_file, op_cwd, op_link,
op_make_temp_dir, op_mkdir, op_read_dir, op_read_link, op_remove, op_rename,
op_stat, op_symlink, op_truncate, op_utime,
op_stat, op_symlink, op_truncate,
};
use super::metrics::op_metrics;
use super::net::{op_accept, op_dial, op_listen, op_shutdown};
use super::os::{
op_env, op_exec_path, op_exit, op_home_dir, op_is_tty, op_set_env, op_start,
};
use super::os::{op_home_dir, op_set_env, op_start};
use super::performance::op_now;
use super::permissions::{op_permissions, op_revoke_permission};
use super::process::{op_kill, op_run, op_run_status};
Expand Down Expand Up @@ -65,13 +63,8 @@ pub fn dispatch(

let op_result = op_func(state, &base, zero_copy);

let state = state.clone();

match op_result {
Ok(Op::Sync(buf)) => {
state.metrics_op_completed(buf.len());
Op::Sync(buf)
}
Ok(Op::Sync(buf)) => Op::Sync(buf),
Ok(Op::Async(fut)) => {
let result_fut = Box::new(
fut
Expand Down Expand Up @@ -107,7 +100,6 @@ pub fn dispatch(
},
)
};
state.metrics_op_completed(buf.len());
Ok(buf)
})
.map_err(|err| panic!("unexpected error {:?}", err)),
Expand All @@ -129,7 +121,6 @@ pub fn dispatch(
..Default::default()
},
);
state.metrics_op_completed(response_buf.len());
Op::Sync(response_buf)
}
}
Expand Down Expand Up @@ -162,9 +153,6 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
msg::Any::CreateWorker => Some(op_create_worker),
msg::Any::Cwd => Some(op_cwd),
msg::Any::Dial => Some(op_dial),
msg::Any::Environ => Some(op_env),
msg::Any::ExecPath => Some(op_exec_path),
msg::Any::Exit => Some(op_exit),
msg::Any::Fetch => Some(op_fetch),
msg::Any::FetchSourceFile => Some(op_fetch_source_file),
msg::Any::FormatError => Some(op_format_error),
Expand All @@ -174,7 +162,6 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
msg::Any::HostGetMessage => Some(op_host_get_message),
msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed),
msg::Any::HostPostMessage => Some(op_host_post_message),
msg::Any::IsTTY => Some(op_is_tty),
msg::Any::Kill => Some(op_kill),
msg::Any::Link => Some(op_link),
msg::Any::Listen => Some(op_listen),
Expand Down Expand Up @@ -203,7 +190,6 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
msg::Any::Symlink => Some(op_symlink),
msg::Any::Truncate => Some(op_truncate),
msg::Any::HomeDir => Some(op_home_dir),
msg::Any::Utime => Some(op_utime),
msg::Any::Write => Some(op_write),

// TODO(ry) split these out so that only the appropriate Workers can access
Expand Down
113 changes: 113 additions & 0 deletions cli/ops/dispatch_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use crate::state::ThreadSafeState;
use crate::tokio_util;
use deno::*;
use futures::Future;
use futures::Poll;
pub use serde_derive::Deserialize;
use serde_json::json;
pub use serde_json::Value;

pub type AsyncJsonOp = Box<dyn Future<Item = Value, Error = ErrBox> + Send>;

pub enum JsonOp {
Sync(Value),
Async(AsyncJsonOp),
}

fn json_err(err: ErrBox) -> Value {
use crate::deno_error::GetErrorKind;
json!({
"message": err.to_string(),
"kind": err.kind() as u32,
})
}

pub type Dispatcher = fn(
state: &ThreadSafeState,
args: Value,
zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox>;

fn serialize_result(
promise_id: Option<u64>,
result: Result<Value, ErrBox>,
) -> Buf {
let value = match result {
Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
Err(err) => json!({ "err": json_err(err), "promiseId": promise_id }),
};
let vec = serde_json::to_vec(&value).unwrap();
vec.into_boxed_slice()
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct AsyncArgs {
promise_id: Option<u64>,
}

pub fn dispatch(
d: Dispatcher,
state: &ThreadSafeState,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
let async_args: AsyncArgs = serde_json::from_slice(control).unwrap();
ry marked this conversation as resolved.
Show resolved Hide resolved
let promise_id = async_args.promise_id;
let is_sync = promise_id.is_none();

let result = serde_json::from_slice(control)
.map_err(ErrBox::from)
.and_then(move |args| d(state, args, zero_copy));
match result {
Ok(JsonOp::Sync(sync_value)) => {
assert!(promise_id.is_none());
CoreOp::Sync(serialize_result(promise_id, Ok(sync_value)))
}
Ok(JsonOp::Async(fut)) => {
assert!(promise_id.is_some());
let fut2 = Box::new(fut.then(move |result| -> Result<Buf, ()> {
Ok(serialize_result(promise_id, result))
}));
CoreOp::Async(fut2)
}
Err(sync_err) => {
let buf = serialize_result(promise_id, Err(sync_err));
if is_sync {
CoreOp::Sync(buf)
} else {
CoreOp::Async(Box::new(futures::future::ok(buf)))
}
}
}
}

// This is just type conversion. Implement From trait?
// See https://github.com/tokio-rs/tokio/blob/ffd73a64e7ec497622b7f939e38017afe7124dc4/tokio-fs/src/lib.rs#L76-L85
fn convert_blocking_json<F>(f: F) -> Poll<Value, ErrBox>
where
F: FnOnce() -> Result<Value, ErrBox>,
{
use futures::Async::*;
match tokio_threadpool::blocking(f) {
Ok(Ready(Ok(v))) => Ok(Ready(v)),
Ok(Ready(Err(err))) => Err(err),
Ok(NotReady) => Ok(NotReady),
Err(err) => panic!("blocking error {}", err),
}
}

pub fn blocking_json<F>(is_sync: bool, f: F) -> Result<JsonOp, ErrBox>
where
F: 'static + Send + FnOnce() -> Result<Value, ErrBox>,
{
if is_sync {
Ok(JsonOp::Sync(f()?))
} else {
Ok(JsonOp::Async(Box::new(futures::sync::oneshot::spawn(
tokio_util::poll_fn(move || convert_blocking_json(f)),
&tokio_executor::DefaultExecutor::current(),
))))
}
}
11 changes: 3 additions & 8 deletions cli/ops/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,15 @@ fn test_parse_min_record() {

pub fn dispatch(
d: Dispatcher,
state: &ThreadSafeState,
_state: &ThreadSafeState,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
let mut record = parse_min_record(control).unwrap();
let is_sync = record.promise_id == 0;

// TODO(ry) Currently there aren't any sync minimal ops. This is just a sanity
// check. Remove later.
assert!(!is_sync);

let state = state.clone();

let rid = record.arg;
let min_op = d(rid, zero_copy);

Expand All @@ -102,10 +98,9 @@ pub fn dispatch(
record.result = -1;
}
}
let buf: Buf = record.into();
state.metrics_op_completed(buf.len());
Ok(buf)
Ok(record.into())
}));

if is_sync {
Op::Sync(fut.wait().unwrap())
} else {
Expand Down
Loading