Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove dead code: legacy read/write ops #2776

Merged
merged 3 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub mod deno_dir;
pub mod deno_error;
pub mod diagnostics;
mod disk_cache;
mod dispatch_minimal;
mod file_fetcher;
pub mod flags;
pub mod fmt_errors;
Expand Down
22 changes: 0 additions & 22 deletions cli/msg.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ union Any {
PermissionRevoke,
Permissions,
PermissionsRes,
Read,
ReadDir,
ReadDirRes,
ReadRes,
Readlink,
ReadlinkRes,
Remove,
Expand Down Expand Up @@ -83,8 +81,6 @@ union Any {
WorkerGetMessage,
WorkerGetMessageRes,
WorkerPostMessage,
Write,
WriteRes,
}

enum ErrorKind: byte {
Expand Down Expand Up @@ -491,24 +487,6 @@ table OpenRes {
rid: uint32;
}

table Read {
rid: uint32;
// (ptr, len) is passed as second parameter to Deno.core.send().
}

table ReadRes {
nread: uint;
eof: bool;
}

table Write {
rid: uint32;
}

table WriteRes {
nbyte: uint;
}

table Close {
rid: uint32;
}
Expand Down
74 changes: 11 additions & 63 deletions cli/dispatch_minimal.rs → cli/ops/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@
use crate::state::ThreadSafeState;
use deno::Buf;
use deno::CoreOp;
use deno::ErrBox;
use deno::Op;
use deno::OpId;
use deno::PinnedBuf;
use futures::Future;

const OP_READ: OpId = 1;
const OP_WRITE: OpId = 2;

#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
pub struct Record {
Expand Down Expand Up @@ -72,21 +69,23 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None);
}

pub fn dispatch_minimal(
pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;
pub type Dispatcher = fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of this name. I always imagined "dispatcher" as a function which maps "op id" to "op handler".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's what this does (modulo the op id part)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmmm, I'm not sure. It looks like Dispatcher is actually an "op", it returns MinimalOp which is the result of that op. I had a feeling that it was supposed to be that Dispatcher selects MinimalOp which returns some MinimalOpResult. But that's nomenclature... just thinking out loud

Copy link
Member Author

@ry ry Aug 14, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Op = Promise = Future .. there are many ops for each dispatcher. An OpDispatcher (yet to be defined but outlined in #2730) is something that returns an Op.

So I think the term Dispatcher is appropriate here. That said, I consider this bit ephemeral as I plan on adding the OpDispatcher trait soon.


pub fn dispatch(
d: Dispatcher,
state: &ThreadSafeState,
op_id: OpId,
mut record: Record,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
let mut record = parse_min_record(control).unwrap();
let is_sync = record.promise_id == 0;
let min_op = match op_id {
OP_READ => ops::read(record.arg, zero_copy),
OP_WRITE => ops::write(record.arg, zero_copy),
_ => unimplemented!(),
};

let state = state.clone();

let rid = record.arg;
let min_op = d(rid, zero_copy);

let fut = Box::new(min_op.then(move |result| -> Result<Buf, ()> {
match result {
Ok(r) => {
Expand All @@ -109,54 +108,3 @@ pub fn dispatch_minimal(
Op::Async(fut)
}
}

mod ops {
use crate::deno_error;
use crate::resources;
use crate::tokio_write;
use deno::ErrBox;
use deno::PinnedBuf;
use futures::Future;

type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;

pub fn read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("read rid={}", rid);
let zero_copy = match zero_copy {
None => {
return Box::new(
futures::future::err(deno_error::no_buffer_specified()),
)
}
Some(buf) => buf,
};
match resources::lookup(rid as u32) {
None => Box::new(futures::future::err(deno_error::bad_resource())),
Some(resource) => Box::new(
tokio::io::read(resource, zero_copy)
.map_err(ErrBox::from)
.and_then(move |(_resource, _buf, nread)| Ok(nread as i32)),
),
}
}

pub fn write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("write rid={}", rid);
let zero_copy = match zero_copy {
None => {
return Box::new(
futures::future::err(deno_error::no_buffer_specified()),
)
}
Some(buf) => buf,
};
match resources::lookup(rid as u32) {
None => Box::new(futures::future::err(deno_error::bad_resource())),
Some(resource) => Box::new(
tokio_write::write(resource, zero_copy)
.map_err(ErrBox::from)
.and_then(move |(_resource, _buf, nwritten)| Ok(nwritten as i32)),
),
}
}
}
86 changes: 0 additions & 86 deletions cli/ops/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::ops::serialize_response;
use crate::ops::CliOpResult;
use crate::resources;
use crate::state::ThreadSafeState;
use crate::tokio_write;
use deno::*;
use flatbuffers::FlatBufferBuilder;
use futures::Future;
Expand Down Expand Up @@ -119,91 +118,6 @@ pub fn op_close(
}
}

pub fn op_read(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
) -> CliOpResult {
let cmd_id = base.cmd_id();
let inner = base.inner_as_read().unwrap();
let rid = inner.rid();

match resources::lookup(rid) {
None => Err(deno_error::bad_resource()),
Some(resource) => {
let op = tokio::io::read(resource, data.unwrap())
.map_err(ErrBox::from)
.and_then(move |(_resource, _buf, nread)| {
let builder = &mut FlatBufferBuilder::new();
let inner = msg::ReadRes::create(
builder,
&msg::ReadResArgs {
nread: nread as u32,
eof: nread == 0,
},
);
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::ReadRes,
..Default::default()
},
))
});
if base.sync() {
let buf = op.wait()?;
Ok(Op::Sync(buf))
} else {
Ok(Op::Async(Box::new(op)))
}
}
}
}

pub fn op_write(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
) -> CliOpResult {
let cmd_id = base.cmd_id();
let inner = base.inner_as_write().unwrap();
let rid = inner.rid();

match resources::lookup(rid) {
None => Err(deno_error::bad_resource()),
Some(resource) => {
let op = tokio_write::write(resource, data.unwrap())
.map_err(ErrBox::from)
.and_then(move |(_resource, _buf, nwritten)| {
let builder = &mut FlatBufferBuilder::new();
let inner = msg::WriteRes::create(
builder,
&msg::WriteResArgs {
nbyte: nwritten as u32,
},
);
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::WriteRes,
..Default::default()
},
))
});
if base.sync() {
let buf = op.wait()?;
Ok(Op::Sync(buf))
} else {
Ok(Op::Async(Box::new(op)))
}
}
}
}

pub fn op_seek(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
Expand Down
43 changes: 43 additions & 0 deletions cli/ops/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use super::dispatch_minimal::MinimalOp;
use crate::deno_error;
use crate::resources;
use crate::tokio_write;
use deno::ErrBox;
use deno::PinnedBuf;
use futures::Future;

pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("read rid={}", rid);
let zero_copy = match zero_copy {
None => {
return Box::new(futures::future::err(deno_error::no_buffer_specified()))
}
Some(buf) => buf,
};
match resources::lookup(rid as u32) {
None => Box::new(futures::future::err(deno_error::bad_resource())),
Some(resource) => Box::new(
tokio::io::read(resource, zero_copy)
.map_err(ErrBox::from)
.and_then(move |(_resource, _buf, nread)| Ok(nread as i32)),
),
}
}

pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("write rid={}", rid);
let zero_copy = match zero_copy {
None => {
return Box::new(futures::future::err(deno_error::no_buffer_specified()))
}
Some(buf) => buf,
};
match resources::lookup(rid as u32) {
None => Box::new(futures::future::err(deno_error::bad_resource())),
Some(resource) => Box::new(
tokio_write::write(resource, zero_copy)
.map_err(ErrBox::from)
.and_then(move |(_resource, _buf, nwritten)| Ok(nwritten as i32)),
),
}
}
28 changes: 18 additions & 10 deletions cli/ops/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use crate::deno_error::GetErrorKind;
use crate::dispatch_minimal::dispatch_minimal;
use crate::dispatch_minimal::parse_min_record;
use crate::msg;
use crate::state::ThreadSafeState;
use crate::tokio_util;
Expand All @@ -13,12 +11,15 @@ use hyper;
use hyper::rt::Future;
use tokio_threadpool;

mod dispatch_minimal;
mod io;

mod compiler;
use compiler::{op_cache, op_fetch_source_file};
mod errors;
use errors::{op_apply_source_map, op_format_error};
mod files;
use files::{op_close, op_open, op_read, op_seek, op_write};
use files::{op_close, op_open, op_seek};
mod fetch;
use fetch::op_fetch;
mod fs;
Expand Down Expand Up @@ -71,6 +72,8 @@ fn empty_buf() -> Buf {
}

const FLATBUFFER_OP_ID: OpId = 44;
const OP_READ: OpId = 1;
const OP_WRITE: OpId = 2;

pub fn dispatch_all(
state: &ThreadSafeState,
Expand All @@ -81,11 +84,18 @@ pub fn dispatch_all(
) -> CoreOp {
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
let op = if op_id != FLATBUFFER_OP_ID {
let min_record = parse_min_record(control).unwrap();
dispatch_minimal(state, op_id, min_record, zero_copy)
} else {
dispatch_all_legacy(state, control, zero_copy, op_selector)

let op = match op_id {
OP_READ => {
dispatch_minimal::dispatch(io::op_read, state, control, zero_copy)
}
OP_WRITE => {
dispatch_minimal::dispatch(io::op_write, state, control, zero_copy)
}
FLATBUFFER_OP_ID => {
dispatch_all_legacy(state, control, zero_copy, op_selector)
}
_ => panic!("bad op_id"),
};
state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);
op
Expand Down Expand Up @@ -226,7 +236,6 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
msg::Any::Open => Some(op_open),
msg::Any::PermissionRevoke => Some(op_revoke_permission),
msg::Any::Permissions => Some(op_permissions),
msg::Any::Read => Some(op_read),
msg::Any::ReadDir => Some(op_read_dir),
msg::Any::Readlink => Some(op_read_link),
msg::Any::Remove => Some(op_remove),
Expand All @@ -245,7 +254,6 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
msg::Any::Truncate => Some(op_truncate),
msg::Any::HomeDir => Some(op_home_dir),
msg::Any::Utime => Some(op_utime),
msg::Any::Write => Some(op_write),

// TODO(ry) split these out so that only the appropriate Workers can access
// them.
Expand Down
6 changes: 5 additions & 1 deletion js/dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ function flatbufferRecordFromBuf(buf: Uint8Array): FlatbufferRecord {
}

export function handleAsyncMsgFromRust(opId: number, ui8: Uint8Array): void {
const buf32 = new Int32Array(ui8.buffer, ui8.byteOffset, ui8.byteLength / 4);
if (opId !== FLATBUFFER_OP_ID) {
// Fast and new
const buf32 = new Int32Array(
ui8.buffer,
ui8.byteOffset,
ui8.byteLength / 4
);
const recordMin = recordFromBufMinimal(opId, buf32);
handleAsyncMsgFromRustMinimal(ui8, recordMin);
} else {
Expand Down
Loading