Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add op_id throughout op API #2734

Merged
merged 8 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 13 additions & 26 deletions cli/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,26 @@ use crate::state::ThreadSafeState;
use deno::Buf;
use deno::CoreOp;
use deno::Op;
use deno::OpId;
use deno::PinnedBuf;
use futures::Future;

const DISPATCH_MINIMAL_TOKEN: i32 = 0xCAFE;
const OP_READ: i32 = 1;
const OP_WRITE: i32 = 2;
const OP_READ: OpId = 1;
const OP_WRITE: OpId = 2;

#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
pub struct Record {
pub promise_id: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
}

impl Into<Buf> for Record {
fn into(self) -> Buf {
let vec = vec![
DISPATCH_MINIMAL_TOKEN,
self.promise_id,
self.op_id,
self.arg,
self.result,
];
let vec = vec![self.promise_id, self.arg, self.result];
let buf32 = vec.into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 5 * 4];
let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
unsafe { Box::from_raw(ptr) }
}
}
Expand All @@ -48,32 +41,25 @@ pub fn parse_min_record(bytes: &[u8]) -> Option<Record> {
let p32 = p as *const i32;
let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) };

if s.len() < 5 {
if s.len() != 3 {
return None;
}
let ptr = s.as_ptr();
let ints = unsafe { std::slice::from_raw_parts(ptr, 5) };
if ints[0] != DISPATCH_MINIMAL_TOKEN {
return None;
}
let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
Some(Record {
promise_id: ints[1],
op_id: ints[2],
arg: ints[3],
result: ints[4],
promise_id: ints[0],
arg: ints[1],
result: ints[2],
})
}

#[test]
fn test_parse_min_record() {
let buf = vec![
0xFE, 0xCA, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0,
];
let buf = vec![1, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0];
assert_eq!(
parse_min_record(&buf),
Some(Record {
promise_id: 1,
op_id: 2,
arg: 3,
result: 4,
})
Expand All @@ -88,11 +74,12 @@ fn test_parse_min_record() {

pub fn dispatch_minimal(
state: &ThreadSafeState,
op_id: OpId,
mut record: Record,
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
let is_sync = record.promise_id == 0;
let min_op = match record.op_id {
let min_op = match op_id {
OP_READ => ops::read(record.arg, zero_copy),
OP_WRITE => ops::write(record.arg, zero_copy),
_ => unimplemented!(),
Expand Down
17 changes: 7 additions & 10 deletions cli/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,7 @@ use crate::tokio_write;
use crate::version;
use crate::worker::Worker;
use atty;
use deno::Buf;
use deno::CoreOp;
use deno::ErrBox;
use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::OpResult;
use deno::PinnedBuf;
use deno::*;
use flatbuffers::FlatBufferBuilder;
use futures;
use futures::Async;
Expand Down Expand Up @@ -82,16 +75,20 @@ fn empty_buf() -> Buf {
Box::new([])
}

const FLATBUFFER_OP_ID: OpId = 44;

pub fn dispatch_all(
state: &ThreadSafeState,
op_id: OpId,
control: &[u8],
zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
) -> CoreOp {
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
let op = if let Some(min_record) = parse_min_record(control) {
dispatch_minimal(state, min_record, zero_copy)
let op = if op_id != FLATBUFFER_OP_ID {
let min_record = parse_min_record(control).unwrap();
dispatch_minimal(state, op_id, min_record, zero_copy)
} else {
dispatch_all_legacy(state, control, zero_copy, op_selector)
};
Expand Down
4 changes: 3 additions & 1 deletion cli/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use deno::CoreOp;
use deno::ErrBox;
use deno::Loader;
use deno::ModuleSpecifier;
use deno::OpId;
use deno::PinnedBuf;
use futures::future::Shared;
use futures::Future;
Expand Down Expand Up @@ -104,10 +105,11 @@ impl Deref for ThreadSafeState {
impl ThreadSafeState {
pub fn dispatch(
&self,
op_id: OpId,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
ops::dispatch_all(self, control, zero_copy, self.dispatch_selector)
ops::dispatch_all(self, op_id, control, zero_copy, self.dispatch_selector)
}
}

Expand Down
4 changes: 2 additions & 2 deletions cli/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ impl Worker {
{
let mut i = isolate.lock().unwrap();
let state_ = state.clone();
i.set_dispatch(move |control_buf, zero_copy_buf| {
state_.dispatch(control_buf, zero_copy_buf)
i.set_dispatch(move |op_id, control_buf, zero_copy_buf| {
state_.dispatch(op_id, control_buf, zero_copy_buf)
});
let state_ = state.clone();
i.set_js_error_create(move |v8_exception| {
Expand Down
3 changes: 2 additions & 1 deletion core/core.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
// Deno and therefore do not flow through to the runtime type library.

declare interface MessageCallback {
(msg: Uint8Array): void;
(opId: number, msg: Uint8Array): void;
}

declare interface DenoCore {
dispatch(
opId: number,
control: Uint8Array,
zeroCopy?: ArrayBufferView | null
): Uint8Array | null;
Expand Down
20 changes: 9 additions & 11 deletions core/examples/http_bench.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,19 @@ function createResolvable() {
return Object.assign(promise, methods);
}

const scratch32 = new Int32Array(4);
const scratch32 = new Int32Array(3);
const scratchBytes = new Uint8Array(
scratch32.buffer,
scratch32.byteOffset,
scratch32.byteLength
);
assert(scratchBytes.byteLength === 4 * 4);
assert(scratchBytes.byteLength === 3 * 4);

function send(promiseId, opId, arg, zeroCopy = null) {
scratch32[0] = promiseId;
scratch32[1] = opId;
scratch32[2] = arg;
scratch32[3] = -1;
return Deno.core.dispatch(scratchBytes, zeroCopy);
scratch32[1] = arg;
scratch32[2] = -1;
return Deno.core.dispatch(opId, scratchBytes, zeroCopy);
}

/** Returns Promise<number> */
Expand All @@ -55,13 +54,12 @@ function sendAsync(opId, arg, zeroCopy = null) {
}

function recordFromBuf(buf) {
assert(buf.byteLength === 16);
assert(buf.byteLength === 3 * 4);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
return {
promiseId: buf32[0],
opId: buf32[1],
arg: buf32[2],
result: buf32[3]
arg: buf32[1],
result: buf32[2]
};
}

Expand All @@ -72,7 +70,7 @@ function sendSync(opId, arg) {
return record.result;
}

function handleAsyncMsgFromRust(buf) {
function handleAsyncMsgFromRust(opId, buf) {
const record = recordFromBuf(buf);
const { promiseId, result } = record;
const p = promiseMap.get(promiseId);
Expand Down
47 changes: 23 additions & 24 deletions core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,23 @@ impl log::Log for Logger {
fn flush(&self) {}
}

const OP_LISTEN: i32 = 1;
const OP_ACCEPT: i32 = 2;
const OP_READ: i32 = 3;
const OP_WRITE: i32 = 4;
const OP_CLOSE: i32 = 5;
const OP_LISTEN: OpId = 1;
const OP_ACCEPT: OpId = 2;
const OP_READ: OpId = 3;
const OP_WRITE: OpId = 4;
const OP_CLOSE: OpId = 5;

#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
}

impl Into<Buf> for Record {
fn into(self) -> Buf {
let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result]
.into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 16];
let buf32 = vec![self.promise_id, self.arg, self.result].into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
unsafe { Box::from_raw(ptr) }
}
}
Expand All @@ -63,28 +61,26 @@ impl From<&[u8]> for Record {
fn from(s: &[u8]) -> Record {
#[allow(clippy::cast_ptr_alignment)]
let ptr = s.as_ptr() as *const i32;
let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
Record {
promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
arg: ints[1],
result: ints[2],
}
}
}

impl From<Buf> for Record {
fn from(buf: Buf) -> Record {
assert_eq!(buf.len(), 4 * 4);
assert_eq!(buf.len(), 3 * 4);
#[allow(clippy::cast_ptr_alignment)]
let ptr = Box::into_raw(buf) as *mut [i32; 4];
let ptr = Box::into_raw(buf) as *mut [i32; 3];
let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) };
assert_eq!(ints.len(), 4);
assert_eq!(ints.len(), 3);
Record {
promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
arg: ints[1],
result: ints[2],
}
}
}
Expand All @@ -93,7 +89,6 @@ impl From<Buf> for Record {
fn test_record_from() {
let r = Record {
promise_id: 1,
op_id: 2,
arg: 3,
result: 4,
};
Expand All @@ -102,7 +97,7 @@ fn test_record_from() {
#[cfg(target_endian = "little")]
assert_eq!(
buf,
vec![1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
vec![1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
);
let actual = Record::from(buf);
assert_eq!(actual, expected);
Expand All @@ -111,10 +106,14 @@ fn test_record_from() {

pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;

fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
fn dispatch(
op_id: OpId,
control: &[u8],
zero_copy_buf: Option<PinnedBuf>,
) -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match record.op_id {
let http_bench_op = match op_id {
OP_LISTEN => {
assert!(is_sync);
op_listen()
Expand All @@ -139,7 +138,7 @@ fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
let rid = record.arg;
op_write(rid, zero_copy_buf)
}
_ => panic!("bad op {}", record.op_id),
_ => panic!("bad op {}", op_id),
};
let mut record_a = record.clone();
let mut record_b = record.clone();
Expand Down
Loading