Skip to content

Commit

Permalink
Add op_id argument to Deno.core.dispatch
Browse files Browse the repository at this point in the history
Removes the magic number hack to switch between flatbuffers and the
minimal dispatcher.

Adds machinery to pass the op_id through the shared_queue.
  • Loading branch information
ry committed Aug 6, 2019
1 parent 046cccf commit 8d16fe2
Show file tree
Hide file tree
Showing 22 changed files with 354 additions and 245 deletions.
39 changes: 13 additions & 26 deletions cli/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,26 @@ use crate::state::ThreadSafeState;
use deno::Buf;
use deno::CoreOp;
use deno::Op;
use deno::OpId;
use deno::PinnedBuf;
use futures::Future;

const DISPATCH_MINIMAL_TOKEN: i32 = 0xCAFE;
const OP_READ: i32 = 1;
const OP_WRITE: i32 = 2;
const OP_READ: OpId = 1;
const OP_WRITE: OpId = 2;

#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
pub struct Record {
pub promise_id: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
}

impl Into<Buf> for Record {
fn into(self) -> Buf {
let vec = vec![
DISPATCH_MINIMAL_TOKEN,
self.promise_id,
self.op_id,
self.arg,
self.result,
];
let vec = vec![self.promise_id, self.arg, self.result];
let buf32 = vec.into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 5 * 4];
let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
unsafe { Box::from_raw(ptr) }
}
}
Expand All @@ -48,32 +41,25 @@ pub fn parse_min_record(bytes: &[u8]) -> Option<Record> {
let p32 = p as *const i32;
let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) };

if s.len() < 5 {
if s.len() != 3 {
return None;
}
let ptr = s.as_ptr();
let ints = unsafe { std::slice::from_raw_parts(ptr, 5) };
if ints[0] != DISPATCH_MINIMAL_TOKEN {
return None;
}
let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
Some(Record {
promise_id: ints[1],
op_id: ints[2],
arg: ints[3],
result: ints[4],
promise_id: ints[0],
arg: ints[1],
result: ints[2],
})
}

#[test]
fn test_parse_min_record() {
let buf = vec![
0xFE, 0xCA, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0,
];
let buf = vec![1, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0];
assert_eq!(
parse_min_record(&buf),
Some(Record {
promise_id: 1,
op_id: 2,
arg: 3,
result: 4,
})
Expand All @@ -88,11 +74,12 @@ fn test_parse_min_record() {

pub fn dispatch_minimal(
state: &ThreadSafeState,
op_id: OpId,
mut record: Record,
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
let is_sync = record.promise_id == 0;
let min_op = match record.op_id {
let min_op = match op_id {
OP_READ => ops::read(record.arg, zero_copy),
OP_WRITE => ops::write(record.arg, zero_copy),
_ => unimplemented!(),
Expand Down
15 changes: 6 additions & 9 deletions cli/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,7 @@ use crate::tokio_write;
use crate::version;
use crate::worker::Worker;
use atty;
use deno::Buf;
use deno::CoreOp;
use deno::ErrBox;
use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::OpResult;
use deno::PinnedBuf;
use deno::*;
use flatbuffers::FlatBufferBuilder;
use futures;
use futures::Async;
Expand Down Expand Up @@ -82,17 +75,21 @@ fn empty_buf() -> Buf {
Box::new([])
}

const FLATBUFFER_OP_ID: OpId = 44;

pub fn dispatch_all(
state: &ThreadSafeState,
op_id: OpId,
control: &[u8],
zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
) -> CoreOp {
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
let op = if let Some(min_record) = parse_min_record(control) {
dispatch_minimal(state, min_record, zero_copy)
dispatch_minimal(state, op_id, min_record, zero_copy)
} else {
debug_assert_eq!(op_id, FLATBUFFER_OP_ID);
dispatch_all_legacy(state, control, zero_copy, op_selector)
};
state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);
Expand Down
4 changes: 3 additions & 1 deletion cli/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use deno::CoreOp;
use deno::ErrBox;
use deno::Loader;
use deno::ModuleSpecifier;
use deno::OpId;
use deno::PinnedBuf;
use futures::future::Shared;
use futures::Future;
Expand Down Expand Up @@ -104,10 +105,11 @@ impl Deref for ThreadSafeState {
impl ThreadSafeState {
pub fn dispatch(
&self,
op_id: OpId,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
ops::dispatch_all(self, control, zero_copy, self.dispatch_selector)
ops::dispatch_all(self, op_id, control, zero_copy, self.dispatch_selector)
}
}

Expand Down
4 changes: 2 additions & 2 deletions cli/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ impl Worker {
{
let mut i = isolate.lock().unwrap();
let state_ = state.clone();
i.set_dispatch(move |control_buf, zero_copy_buf| {
state_.dispatch(control_buf, zero_copy_buf)
i.set_dispatch(move |op_id, control_buf, zero_copy_buf| {
state_.dispatch(op_id, control_buf, zero_copy_buf)
});
let state_ = state.clone();
i.set_js_error_create(move |v8_exception| {
Expand Down
3 changes: 2 additions & 1 deletion core/core.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
// Deno and therefore do not flow through to the runtime type library.

declare interface MessageCallback {
(msg: Uint8Array): void;
(opId: number, msg: Uint8Array): void;
}

declare interface DenoCore {
dispatch(
opId: number,
control: Uint8Array,
zeroCopy?: ArrayBufferView | null
): Uint8Array | null;
Expand Down
20 changes: 9 additions & 11 deletions core/examples/http_bench.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,19 @@ function createResolvable() {
return Object.assign(promise, methods);
}

const scratch32 = new Int32Array(4);
const scratch32 = new Int32Array(3);
const scratchBytes = new Uint8Array(
scratch32.buffer,
scratch32.byteOffset,
scratch32.byteLength
);
assert(scratchBytes.byteLength === 4 * 4);
assert(scratchBytes.byteLength === 3 * 4);

function send(promiseId, opId, arg, zeroCopy = null) {
scratch32[0] = promiseId;
scratch32[1] = opId;
scratch32[2] = arg;
scratch32[3] = -1;
return Deno.core.dispatch(scratchBytes, zeroCopy);
scratch32[1] = arg;
scratch32[2] = -1;
return Deno.core.dispatch(opId, scratchBytes, zeroCopy);
}

/** Returns Promise<number> */
Expand All @@ -55,13 +54,12 @@ function sendAsync(opId, arg, zeroCopy = null) {
}

function recordFromBuf(buf) {
assert(buf.byteLength === 16);
assert(buf.byteLength === 3 * 4);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
return {
promiseId: buf32[0],
opId: buf32[1],
arg: buf32[2],
result: buf32[3]
arg: buf32[1],
result: buf32[2]
};
}

Expand All @@ -72,7 +70,7 @@ function sendSync(opId, arg) {
return record.result;
}

function handleAsyncMsgFromRust(buf) {
function handleAsyncMsgFromRust(opId, buf) {
const record = recordFromBuf(buf);
const { promiseId, result } = record;
const p = promiseMap.get(promiseId);
Expand Down
50 changes: 25 additions & 25 deletions core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,23 @@ impl log::Log for Logger {
fn flush(&self) {}
}

const OP_LISTEN: i32 = 1;
const OP_ACCEPT: i32 = 2;
const OP_READ: i32 = 3;
const OP_WRITE: i32 = 4;
const OP_CLOSE: i32 = 5;
const OP_LISTEN: OpId = 1;
const OP_ACCEPT: OpId = 2;
const OP_READ: OpId = 3;
const OP_WRITE: OpId = 4;
const OP_CLOSE: OpId = 5;

#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
}

impl Into<Buf> for Record {
fn into(self) -> Buf {
let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result]
.into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 16];
let buf32 = vec![self.promise_id, self.arg, self.result].into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
unsafe { Box::from_raw(ptr) }
}
}
Expand All @@ -63,28 +61,26 @@ impl From<&[u8]> for Record {
fn from(s: &[u8]) -> Record {
#[allow(clippy::cast_ptr_alignment)]
let ptr = s.as_ptr() as *const i32;
let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
Record {
promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
arg: ints[1],
result: ints[2],
}
}
}

impl From<Buf> for Record {
fn from(buf: Buf) -> Record {
assert_eq!(buf.len(), 4 * 4);
assert_eq!(buf.len(), 3 * 4);
#[allow(clippy::cast_ptr_alignment)]
let ptr = Box::into_raw(buf) as *mut [i32; 4];
let ptr = Box::into_raw(buf) as *mut [i32; 3];
let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) };
assert_eq!(ints.len(), 4);
assert_eq!(ints.len(), 3);
Record {
promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
arg: ints[1],
result: ints[2],
}
}
}
Expand All @@ -93,7 +89,6 @@ impl From<Buf> for Record {
fn test_record_from() {
let r = Record {
promise_id: 1,
op_id: 2,
arg: 3,
result: 4,
};
Expand All @@ -102,7 +97,7 @@ fn test_record_from() {
#[cfg(target_endian = "little")]
assert_eq!(
buf,
vec![1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
vec![1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
);
let actual = Record::from(buf);
assert_eq!(actual, expected);
Expand All @@ -111,10 +106,14 @@ fn test_record_from() {

pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;

fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
fn dispatch(
op_id: OpId,
control: &[u8],
zero_copy_buf: Option<PinnedBuf>,
) -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match record.op_id {
let http_bench_op = match op_id {
OP_LISTEN => {
assert!(is_sync);
op_listen()
Expand All @@ -139,7 +138,7 @@ fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
let rid = record.arg;
op_write(rid, zero_copy_buf)
}
_ => panic!("bad op {}", record.op_id),
_ => panic!("bad op {}", op_id),
};
let mut record_a = record.clone();
let mut record_b = record.clone();
Expand All @@ -162,7 +161,8 @@ fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
);

if is_sync {
Op::Sync(fut.wait().unwrap())
let buf = fut.wait().unwrap();
Op::Sync(buf)
} else {
Op::Async(fut)
}
Expand Down
Loading

0 comments on commit 8d16fe2

Please sign in to comment.