Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Json ops metrics #2805

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cli/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,21 @@ ts_sources = [
"../js/dir.ts",
"../js/dispatch.ts",
"../js/dispatch_flatbuffers.ts",
"../js/dispatch_json.ts",
"../js/dispatch_minimal.ts",
"../js/dom_file.ts",
"../js/dom_types.ts",
"../js/dom_util.ts",
"../js/error_stack.ts",
"../js/errors.ts",
"../js/event.ts",
"../js/event_target.ts",
"../js/fetch.ts",
"../js/format_error.ts",
"../js/dom_file.ts",
"../js/file_info.ts",
"../js/files.ts",
"../js/flatbuffers.ts",
"../js/form_data.ts",
"../js/format_error.ts",
"../js/get_random_values.ts",
"../js/globals.ts",
"../js/headers.ts",
Expand Down Expand Up @@ -134,10 +135,10 @@ ts_sources = [
"../js/url_search_params.ts",
"../js/util.ts",
"../js/utime.ts",
"../js/version.ts",
"../js/window.ts",
"../js/workers.ts",
"../js/write_file.ts",
"../js/version.ts",
"../js/xeval.ts",
"../tsconfig.json",

Expand Down
2 changes: 2 additions & 0 deletions cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ extern crate indexmap;
#[cfg(unix)]
extern crate nix;
extern crate rand;
extern crate serde;
extern crate serde_derive;
extern crate url;

mod ansi;
Expand Down
50 changes: 0 additions & 50 deletions cli/msg.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ union Any {
Cwd,
CwdRes,
Dial,
Environ,
EnvironRes,
Exit,
Fetch,
FetchSourceFile,
FetchSourceFileRes,
Expand All @@ -29,16 +26,12 @@ union Any {
HostGetMessageRes,
HostGetWorkerClosed,
HostPostMessage,
IsTTY,
IsTTYRes,
Kill,
Link,
Listen,
ListenRes,
MakeTempDir,
MakeTempDirRes,
Metrics,
MetricsRes,
Mkdir,
NewConn,
Now,
Expand Down Expand Up @@ -77,9 +70,6 @@ union Any {
Truncate,
HomeDir,
HomeDirRes,
ExecPath,
ExecPathRes,
Utime,
WorkerGetMessage,
WorkerGetMessageRes,
WorkerPostMessage,
Expand Down Expand Up @@ -286,21 +276,11 @@ table GlobalTimerRes { }

table GlobalTimerStop { }

table Exit {
code: int;
}

table Environ {}

table SetEnv {
key: string;
value: string;
}

table EnvironRes {
map: [KeyValue];
}

table KeyValue {
key: string;
value: string;
Expand Down Expand Up @@ -469,18 +449,6 @@ table HomeDirRes {
path: string;
}

table ExecPath {}

table ExecPathRes {
path: string;
}

table Utime {
filename: string;
atime: uint64;
mtime: uint64;
}

table Open {
filename: string;
perm: uint;
Expand Down Expand Up @@ -548,16 +516,6 @@ table NewConn {
local_addr: string;
}

table Metrics {}

table MetricsRes {
ops_dispatched: uint64;
ops_completed: uint64;
bytes_sent_control: uint64;
bytes_sent_data: uint64;
bytes_received: uint64;
}

enum ProcessStdio: byte { Inherit, Piped, Null }

table Run {
Expand Down Expand Up @@ -600,14 +558,6 @@ table NowRes {
subsec_nanos: uint32;
}

table IsTTY {}

table IsTTYRes {
stdin: bool;
stdout: bool;
stderr: bool;
}

table Seek {
rid: uint32;
offset: int;
Expand Down
14 changes: 0 additions & 14 deletions cli/msg.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,8 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
#![allow(dead_code)]
#![cfg_attr(feature = "cargo-clippy", allow(clippy::all, clippy::pedantic))]
use crate::state;
use flatbuffers;
use std::sync::atomic::Ordering;

// GN_OUT_DIR is set either by build.rs (for the Cargo build), or by
// build_extra/rust/run.py (for the GN+Ninja build).
include!(concat!(env!("GN_OUT_DIR"), "/gen/cli/msg_generated.rs"));

impl<'a> From<&'a state::Metrics> for MetricsResArgs {
fn from(m: &'a state::Metrics) -> Self {
MetricsResArgs {
ops_dispatched: m.ops_dispatched.load(Ordering::SeqCst) as u64,
ops_completed: m.ops_completed.load(Ordering::SeqCst) as u64,
bytes_sent_control: m.bytes_sent_control.load(Ordering::SeqCst) as u64,
bytes_sent_data: m.bytes_sent_data.load(Ordering::SeqCst) as u64,
bytes_received: m.bytes_received.load(Ordering::SeqCst) as u64,
}
}
}
22 changes: 3 additions & 19 deletions cli/ops/dispatch_flatbuffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@ use super::files::{op_close, op_open, op_read, op_seek, op_write};
use super::fs::{
op_chdir, op_chmod, op_chown, op_copy_file, op_cwd, op_link,
op_make_temp_dir, op_mkdir, op_read_dir, op_read_link, op_remove, op_rename,
op_stat, op_symlink, op_truncate, op_utime,
op_stat, op_symlink, op_truncate,
};
use super::metrics::op_metrics;
use super::net::{op_accept, op_dial, op_listen, op_shutdown};
use super::os::{
op_env, op_exec_path, op_exit, op_home_dir, op_is_tty, op_set_env, op_start,
};
use super::os::{op_home_dir, op_set_env, op_start};
use super::performance::op_now;
use super::permissions::{op_permissions, op_revoke_permission};
use super::process::{op_kill, op_run, op_run_status};
Expand Down Expand Up @@ -65,13 +62,8 @@ pub fn dispatch(

let op_result = op_func(state, &base, zero_copy);

let state = state.clone();

match op_result {
Ok(Op::Sync(buf)) => {
state.metrics_op_completed(buf.len());
Op::Sync(buf)
}
Ok(Op::Sync(buf)) => Op::Sync(buf),
Ok(Op::Async(fut)) => {
let result_fut = Box::new(
fut
Expand Down Expand Up @@ -107,7 +99,6 @@ pub fn dispatch(
},
)
};
state.metrics_op_completed(buf.len());
Ok(buf)
})
.map_err(|err| panic!("unexpected error {:?}", err)),
Expand All @@ -129,7 +120,6 @@ pub fn dispatch(
..Default::default()
},
);
state.metrics_op_completed(response_buf.len());
Op::Sync(response_buf)
}
}
Expand Down Expand Up @@ -162,9 +152,6 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
msg::Any::CreateWorker => Some(op_create_worker),
msg::Any::Cwd => Some(op_cwd),
msg::Any::Dial => Some(op_dial),
msg::Any::Environ => Some(op_env),
msg::Any::ExecPath => Some(op_exec_path),
msg::Any::Exit => Some(op_exit),
msg::Any::Fetch => Some(op_fetch),
msg::Any::FetchSourceFile => Some(op_fetch_source_file),
msg::Any::FormatError => Some(op_format_error),
Expand All @@ -174,12 +161,10 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
msg::Any::HostGetMessage => Some(op_host_get_message),
msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed),
msg::Any::HostPostMessage => Some(op_host_post_message),
msg::Any::IsTTY => Some(op_is_tty),
msg::Any::Kill => Some(op_kill),
msg::Any::Link => Some(op_link),
msg::Any::Listen => Some(op_listen),
msg::Any::MakeTempDir => Some(op_make_temp_dir),
msg::Any::Metrics => Some(op_metrics),
msg::Any::Mkdir => Some(op_mkdir),
msg::Any::Now => Some(op_now),
msg::Any::Open => Some(op_open),
Expand All @@ -203,7 +188,6 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
msg::Any::Symlink => Some(op_symlink),
msg::Any::Truncate => Some(op_truncate),
msg::Any::HomeDir => Some(op_home_dir),
msg::Any::Utime => Some(op_utime),
msg::Any::Write => Some(op_write),

// TODO(ry) split these out so that only the appropriate Workers can access
Expand Down
113 changes: 113 additions & 0 deletions cli/ops/dispatch_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use crate::state::ThreadSafeState;
use crate::tokio_util;
use deno::*;
use futures::Future;
use futures::Poll;
pub use serde_derive::Deserialize;
use serde_json::json;
pub use serde_json::Value;

pub type AsyncJsonOp = Box<dyn Future<Item = Value, Error = ErrBox> + Send>;

pub enum JsonOp {
Sync(Value),
Async(AsyncJsonOp),
}

fn json_err(err: ErrBox) -> Value {
use crate::deno_error::GetErrorKind;
json!({
"message": err.to_string(),
"kind": err.kind() as u32,
})
}

pub type Dispatcher = fn(
state: &ThreadSafeState,
args: Value,
zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox>;

fn serialize_result(
promise_id: Option<u64>,
result: Result<Value, ErrBox>,
) -> Buf {
let value = match result {
Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
Err(err) => json!({ "err": json_err(err), "promiseId": promise_id }),
};
let vec = serde_json::to_vec(&value).unwrap();
vec.into_boxed_slice()
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct AsyncArgs {
promise_id: Option<u64>,
}

pub fn dispatch(
d: Dispatcher,
state: &ThreadSafeState,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
let async_args: AsyncArgs = serde_json::from_slice(control).unwrap();
let promise_id = async_args.promise_id;
let is_sync = promise_id.is_none();

let result = serde_json::from_slice(control)
.map_err(ErrBox::from)
.and_then(move |args| d(state, args, zero_copy));
match result {
Ok(JsonOp::Sync(sync_value)) => {
assert!(promise_id.is_none());
CoreOp::Sync(serialize_result(promise_id, Ok(sync_value)))
}
Ok(JsonOp::Async(fut)) => {
assert!(promise_id.is_some());
let fut2 = Box::new(fut.then(move |result| -> Result<Buf, ()> {
Ok(serialize_result(promise_id, result))
}));
CoreOp::Async(fut2)
}
Err(sync_err) => {
let buf = serialize_result(promise_id, Err(sync_err));
if is_sync {
CoreOp::Sync(buf)
} else {
CoreOp::Async(Box::new(futures::future::ok(buf)))
}
}
}
}

// This is just type conversion. Implement From trait?
// See https://github.com/tokio-rs/tokio/blob/ffd73a64e7ec497622b7f939e38017afe7124dc4/tokio-fs/src/lib.rs#L76-L85
fn convert_blocking_json<F>(f: F) -> Poll<Value, ErrBox>
where
F: FnOnce() -> Result<Value, ErrBox>,
{
use futures::Async::*;
match tokio_threadpool::blocking(f) {
Ok(Ready(Ok(v))) => Ok(Ready(v)),
Ok(Ready(Err(err))) => Err(err),
Ok(NotReady) => Ok(NotReady),
Err(err) => panic!("blocking error {}", err),
}
}

pub fn blocking_json<F>(is_sync: bool, f: F) -> Result<JsonOp, ErrBox>
where
F: 'static + Send + FnOnce() -> Result<Value, ErrBox>,
{
if is_sync {
Ok(JsonOp::Sync(f()?))
} else {
Ok(JsonOp::Async(Box::new(futures::sync::oneshot::spawn(
tokio_util::poll_fn(move || convert_blocking_json(f)),
&tokio_executor::DefaultExecutor::current(),
))))
}
}
Loading