Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize read and write ops #2259

Merged
merged 11 commits into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cli/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ ts_sources = [
"../js/deno.ts",
"../js/dir.ts",
"../js/dispatch.ts",
"../js/dispatch_minimal.ts",
"../js/dom_types.ts",
"../js/errors.ts",
"../js/event.ts",
Expand Down
133 changes: 133 additions & 0 deletions cli/dispatch_minimal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
// Do not add flatbuffer dependencies to this module.
//! Connects to js/dispatch_minimal.ts sendAsyncMinimal This acts as a faster
//! alternative to flatbuffers using a very simple list of int32s to lay out
//! messages. The first i32 is used to determine if a message a flatbuffer
//! message or a "minimal" message, see `has_minimal_token()`.
use crate::state::ThreadSafeState;
use deno::Buf;
use deno::Op;
use deno::PinnedBuf;
use futures::Future;

const DISPATCH_MINIMAL_TOKEN: i32 = 0xCAFE;
const OP_READ: i32 = 1;
ry marked this conversation as resolved.
Show resolved Hide resolved
const OP_WRITE: i32 = 2;

pub fn has_minimal_token(s: &[i32]) -> bool {
s[0] == DISPATCH_MINIMAL_TOKEN
}

#[derive(Clone, Debug, PartialEq)]
struct Record {
pub promise_id: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
}

impl Into<Buf> for Record {
fn into(self) -> Buf {
let vec = vec![
DISPATCH_MINIMAL_TOKEN,
self.promise_id,
self.op_id,
self.arg,
self.result,
];
//let len = vec.len();
ry marked this conversation as resolved.
Show resolved Hide resolved
let buf32 = vec.into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 5 * 4];
unsafe { Box::from_raw(ptr) }
piscisaureus marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl From<&[i32]> for Record {
ry marked this conversation as resolved.
Show resolved Hide resolved
fn from(s: &[i32]) -> Record {
let ptr = s.as_ptr();
let ints = unsafe { std::slice::from_raw_parts(ptr, 5) };
assert_eq!(ints[0], DISPATCH_MINIMAL_TOKEN);
Record {
promise_id: ints[1],
op_id: ints[2],
arg: ints[3],
result: ints[4],
}
}
}

pub fn dispatch_minimal(
state: &ThreadSafeState,
control32: &[i32],
zero_copy: Option<PinnedBuf>,
) -> Op {
let record = Record::from(control32);
let is_sync = record.promise_id == 0;
let min_op = match record.op_id {
OP_READ => ops::read(record.arg, zero_copy),
OP_WRITE => ops::write(record.arg, zero_copy),
_ => unimplemented!(),
};

let mut record_a = record.clone();
ry marked this conversation as resolved.
Show resolved Hide resolved
let mut record_b = record.clone();
let state = state.clone();

let fut = Box::new(
min_op
.and_then(move |result| {
ry marked this conversation as resolved.
Show resolved Hide resolved
record_a.result = result;
Ok(record_a)
}).or_else(|err| -> Result<Record, ()> {
debug!("unexpected err {}", err);
record_b.result = -1;
Ok(record_b)
}).then(move |result| -> Result<Buf, ()> {
let record = result.unwrap();
let buf: Buf = record.into();
state.metrics_op_completed(buf.len());
Ok(buf)
}),
);
if is_sync {
Op::Sync(fut.wait().unwrap())
} else {
Op::Async(fut)
}
}

mod ops {
use crate::errors;
use crate::resources;
use crate::tokio_write;
use deno::PinnedBuf;
use futures::Future;

type MinimalOp = dyn Future<Item = i32, Error = errors::DenoError> + Send;

pub fn read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("read rid={}", rid);
let zero_copy = zero_copy.unwrap();
match resources::lookup(rid as u32) {
None => Box::new(futures::future::err(errors::bad_resource())),
Some(resource) => Box::new(
tokio::io::read(resource, zero_copy)
.map_err(|err| err.into())
ry marked this conversation as resolved.
Show resolved Hide resolved
.and_then(move |(_resource, _buf, nread)| Ok(nread as i32)),
),
}
}

pub fn write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("write rid={}", rid);
let zero_copy = zero_copy.unwrap();
match resources::lookup(rid as u32) {
None => Box::new(futures::future::err(errors::bad_resource())),
Some(resource) => Box::new(
tokio_write::write(resource, zero_copy)
.map_err(|err| err.into())
ry marked this conversation as resolved.
Show resolved Hide resolved
.and_then(move |(_resource, _buf, nwritten)| Ok(nwritten as i32)),
),
}
}
}
1 change: 1 addition & 0 deletions cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ extern crate nix;
mod ansi;
pub mod compiler;
pub mod deno_dir;
mod dispatch_minimal;
pub mod errors;
pub mod flags;
mod fs;
Expand Down
35 changes: 31 additions & 4 deletions cli/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
use atty;
use crate::ansi;
use crate::compiler::get_compiler_config;
use crate::dispatch_minimal::dispatch_minimal;
use crate::dispatch_minimal::has_minimal_token;
use crate::errors;
use crate::errors::{DenoError, DenoResult, ErrorKind};
use crate::fs as deno_fs;
Expand Down Expand Up @@ -74,18 +76,44 @@ fn empty_buf() -> Buf {
Box::new([])
}

fn slice_u8_as_i32(bytes: &[u8]) -> &[i32] {
let p = bytes.as_ptr();
// Assert pointer is 32 bit aligned before casting.
assert_eq!((p as usize) % std::mem::align_of::<i32>(), 0);
#[allow(clippy::cast_ptr_alignment)]
let p32 = p as *const i32;
unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) }
ry marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn dispatch_all(
state: &ThreadSafeState,
control: &[u8],
zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
) -> Op {
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);

let control32 = slice_u8_as_i32(control);
let op = if has_minimal_token(control32) {
dispatch_minimal(state, control32, zero_copy)
} else {
dispatch_all_legacy(state, control, zero_copy, op_selector)
};
state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);
op
}

/// Processes raw messages from JavaScript.
/// This functions invoked every time Deno.core.dispatch() is called.
/// control corresponds to the first argument of Deno.core.dispatch().
/// data corresponds to the second argument of Deno.core.dispatch().
pub fn dispatch_all(
pub fn dispatch_all_legacy(
state: &ThreadSafeState,
control: &[u8],
zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
) -> Op {
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
let base = msg::get_root_as_base(&control);
let is_sync = base.sync();
let inner_type = base.inner_type();
Expand All @@ -99,7 +127,6 @@ pub fn dispatch_all(
let op: Box<OpWithError> = op_func(state, &base, zero_copy);

let state = state.clone();
state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);

let fut = Box::new(
op.or_else(move |err: DenoError| -> Result<Buf, ()> {
Expand Down
86 changes: 63 additions & 23 deletions js/dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,86 @@ import * as flatbuffers from "./flatbuffers";
import * as msg from "gen/cli/msg_generated";
import * as errors from "./errors";
import * as util from "./util";
import {
RecordMinimal,
nextPromiseId,
recordFromBufMinimal,
handleAsyncMsgFromRustMinimal
} from "./dispatch_minimal";

let nextCmdId = 0;
const promiseTable = new Map<number, util.Resolvable<msg.Base>>();

interface Record extends RecordMinimal {
base?: msg.Base;
}

function recordFromBuf(buf: Uint8Array): Record {
ry marked this conversation as resolved.
Show resolved Hide resolved
// assert(buf.byteLength % 4 == 0);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);

const recordMin = recordFromBufMinimal(buf32);
if (recordMin) {
return recordMin;
} else {
const bb = new flatbuffers.ByteBuffer(buf);
const base = msg.Base.getRootAsBase(bb);
const cmdId = base.cmdId();
return {
opId: -1,
arg: -1,
result: -1,
promiseId: cmdId,
base
};
}
}

export function handleAsyncMsgFromRust(ui8: Uint8Array): void {
const bb = new flatbuffers.ByteBuffer(ui8);
const base = msg.Base.getRootAsBase(bb);
const cmdId = base.cmdId();
const promise = promiseTable.get(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
promiseTable.delete(cmdId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
const record = recordFromBuf(ui8);

if (record.base) {
// Legacy
const { promiseId, base } = record;
const promise = promiseTable.get(promiseId);
util.assert(promise != null, `Expecting promise in table. ${promiseId}`);
promiseTable.delete(record.promiseId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
}
} else {
promise!.resolve(base);
// Fast and new
handleAsyncMsgFromRustMinimal(ui8, record);
}
}

function ui8FromArrayBufferView(abv: ArrayBufferView): Uint8Array {
return new Uint8Array(abv.buffer, abv.byteOffset, abv.byteLength);
}

function sendInternal(
builder: flatbuffers.Builder,
innerType: msg.Any,
inner: flatbuffers.Offset,
zeroCopy: undefined | ArrayBufferView,
sync = true
): [number, null | Uint8Array] {
const cmdId = nextCmdId++;
const message = msg.Base.createBase(
builder,
cmdId,
sync,
0,
0,
innerType,
inner
);
builder.finish(message);
const cmdId = nextPromiseId();
msg.Base.startBase(builder);
msg.Base.addInner(builder, inner);
msg.Base.addInnerType(builder, innerType);
msg.Base.addSync(builder, sync);
msg.Base.addCmdId(builder, cmdId);
builder.finish(msg.Base.endBase(builder));

const control = builder.asUint8Array();
const response = core.dispatch(control, zeroCopy);

const response = core.dispatch(
control,
zeroCopy ? ui8FromArrayBufferView(zeroCopy!) : undefined
ry marked this conversation as resolved.
Show resolved Hide resolved
);

builder.inUse = false;
return [cmdId, response];
Expand Down
Loading