Skip to content

Commit

Permalink
minimal op map fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
bartlomieju committed Sep 20, 2019
1 parent 79f63c9 commit ff158ce
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 76 deletions.
75 changes: 12 additions & 63 deletions core/examples/http_bench.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// This is not a real HTTP server. We read blindly one time into 'requestBuf',
// then write this fixed 'responseBuf'. The point of this benchmark is to
// exercise the event loop in a simple yet semi-realistic way.
// TODO: sync these ops via `Deno.core.ops`;
const OP_LISTEN = 1;
const OP_ACCEPT = 2;
const OP_READ = 3;
const OP_WRITE = 4;
const OP_CLOSE = 0;
const OP_LISTEN = Deno.core.opsMap["listen"];
const OP_ACCEPT = Deno.core.opsMap["accept"];
const OP_READ = Deno.core.opsMap["read"];
const OP_WRITE = Deno.core.opsMap["write"];
const OP_CLOSE = Deno.core.opsMap["close"];
const requestBuf = new Uint8Array(64 * 1024);
const responseBuf = new Uint8Array(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
Expand All @@ -16,43 +15,6 @@ const responseBuf = new Uint8Array(
const promiseMap = new Map();
let nextPromiseId = 1;

const opRegistry = [];

class Op {
constructor(name, handler) {
this.name = name;
this.handler = handler;
this.opId = 0;
opRegistry.push(this);
}

setOpId(opId) {
this.opId = opId;
}

/** Returns i32 number */
sendSync(arg, zeroCopy = null) {
const buf = send(0, this.opId, arg, zeroCopy);
const record = recordFromBuf(buf);
return record.result;
}

/** Returns Promise<number> */
sendAsync(arg, zeroCopy = null) {
const promiseId = nextPromiseId++;
const p = createResolvable();
promiseMap.set(promiseId, p);
send(promiseId, this.opId, arg, zeroCopy);
return p;
}
}

const opListen = new Op("listen");
const opAccept = new Op("accept");
const opClose = new Op("close");
const opRead = new Op("read");
const opWrite = new Op("write");

function assert(cond) {
if (!cond) {
throw Error("assert");
Expand Down Expand Up @@ -118,29 +80,29 @@ function handleAsyncMsgFromRust(opId, buf) {

/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
return sendSync(opListen.opId, -1);
return sendSync(OP_LISTEN, -1);
}

/** Accepts a connection, returns rid. */
async function accept(rid) {
return await sendAsync(opAccept.opId, rid);
return await sendAsync(OP_ACCEPT, rid);
}

/**
* Reads a packet from the rid, presumably an http request. data is ignored.
* Returns bytes read.
*/
async function read(rid, data) {
return await sendAsync(opRead.opId, rid, data);
return await sendAsync(OP_READ, rid, data);
}

/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
async function write(rid, data) {
return await sendAsync(opWrite.opId, rid, data);
return await sendAsync(OP_WRITE, rid, data);
}

function close(rid) {
return sendSync(opClose.opId, rid);
return sendSync(OP_CLOSE, rid);
}

async function serve(rid) {
Expand All @@ -158,26 +120,13 @@ async function serve(rid) {
close(rid);
}

// TODO: this should be acquired from Rust via `Deno.core.getOpMap()`
const opMap = {
listen: 1,
accept: 2,
read: 3,
write: 4,
close: 0
};

async function main() {
Deno.core.setAsyncHandler(handleAsyncMsgFromRust);

// TODO: poor man's Deno.core.getOpMap()
for (const [key, opId] of Object.entries(opMap)) {
const op = opRegistry.find(el => el.name === key);
op.setOpId(opId);
}

Deno.core.print("http_bench.js start\n");

Deno.core.print("ops map " + JSON.stringify(Deno.core.opsMap) + "\n");

const listenerRid = listen();
Deno.core.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`);
while (true) {
Expand Down
2 changes: 1 addition & 1 deletion core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ fn main() {
});

let mut isolate = deno::Isolate::new(startup_data, false);
isolate.register_op("close", serialize_http_bench_op(op_close));
isolate.register_op("listen", serialize_http_bench_op(op_listen));
isolate.register_op("accept", serialize_http_bench_op(op_accept));
isolate.register_op("read", serialize_http_bench_op(op_read));
isolate.register_op("write", serialize_http_bench_op(op_write));
isolate.register_op("close", serialize_http_bench_op(op_close));

isolate.then(|r| {
js_check(r);
Expand Down
27 changes: 17 additions & 10 deletions core/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl Isolate {
have_unpolled_ops: false,
pending_dyn_imports: FuturesUnordered::new(),
startup_script,
op_registry: OpRegistry::default(),
op_registry: OpRegistry::new(),
}
}

Expand All @@ -272,6 +272,12 @@ impl Isolate {
control: &[u8],
zero_copy_buf: Option<PinnedBuf>,
) -> CoreOp {
if op_id == 0 {
let op_map = self.op_registry.get_op_map();
let op_map_json = serde_json::to_string(&op_map).unwrap();
let buf = op_map_json.as_bytes().to_owned().into_boxed_slice();
return Op::Sync(buf);
}
let ops = &self.op_registry.ops;
let op_handler = &*ops.get(op_id as usize).expect("Op not found!");
op_handler(control, zero_copy_buf)
Expand Down Expand Up @@ -348,15 +354,16 @@ impl Isolate {
) {
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };

let op = if let Some(ref f) = isolate.dispatch {
f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf))
} else {
isolate.call_op(
op_id,
control_buf.as_ref(),
PinnedBuf::new(zero_copy_buf),
)
};
let op = isolate.call_op(
op_id,
control_buf.as_ref(),
PinnedBuf::new(zero_copy_buf),
);
// let op = if let Some(ref f) = isolate.dispatch {
// f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf))
// } else {
//
// };

debug_assert_eq!(isolate.shared.size(), 0);
match op {
Expand Down
15 changes: 14 additions & 1 deletion core/ops.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use crate::CoreOp;
use crate::CoreOpHandler;
use crate::Op;
use crate::OpId;
use crate::PinnedBuf;
use std::collections::HashMap;

#[derive(Default)]
Expand All @@ -9,8 +12,18 @@ pub struct OpRegistry {
pub phone_book: HashMap<String, OpId>,
}

fn get_op_map(_control: &[u8], _zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
Op::Sync(Box::new([]))
}

impl OpRegistry {
#[allow(dead_code)]
pub fn new() -> Self {
// TODO: this is make shift fix for get op map
let mut registry = Self::default();
registry.register_op("get_op_map", Box::new(get_op_map));
registry
}

pub fn get_op_map(&self) -> HashMap<String, OpId> {
self.phone_book.clone()
}
Expand Down
7 changes: 6 additions & 1 deletion core/shared_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ SharedQueue Binary Layout
return Deno.core.send(opId, control, zeroCopy);
}

const opsMapBytes = dispatch(0, []);
const opsMapJson = String.fromCharCode.apply(null, opsMapBytes);
const opsMap = JSON.parse(opsMapJson);

const denoCore = {
setAsyncHandler,
dispatch,
Expand All @@ -189,7 +193,8 @@ SharedQueue Binary Layout
push,
reset,
shift
}
},
opsMap
};

assert(window[GLOBAL_NAMESPACE] != null);
Expand Down
2 changes: 2 additions & 0 deletions deno_typescript/lib.deno_core.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ declare interface DenoCore {
shift(): Uint8Array | null;
};

opsMap: Record<string, number>;

recv(cb: MessageCallback): void;

send(
Expand Down

0 comments on commit ff158ce

Please sign in to comment.