Skip to content

Commit

Permalink
Finished example.
Browse files Browse the repository at this point in the history
  • Loading branch information
wildarch committed Jun 12, 2020
1 parent 485e5cd commit caa442b
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 49 deletions.
1 change: 1 addition & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 6 additions & 12 deletions examples/injection/client/injection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
#include "oak/client/application_client.h"
#include "oak/common/label.h"

ABSL_FLAG(std::string, address, "127.0.0.1:8080", "Address of the Oak application to connect to");
ABSL_FLAG(std::string, address, "localhost:8080", "Address of the Oak application to connect to");
ABSL_FLAG(std::string, ca_cert, "", "Path to the PEM-encoded CA root certificate");

using ::oak::examples::injection::BlobResponse;
using ::oak::examples::injection::BlobStore;
using ::oak::examples::injection::GetBlobRequest;
using ::oak::examples::injection::PutBlobRequest;
using ::oak::examples::injection::BlobResponse;

int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
Expand All @@ -53,7 +53,7 @@ int main(int argc, char** argv) {
BlobResponse putResponse;
grpc::Status putStatus = stub->PutBlob(&putContext, putRequest, &putResponse);
if (!putStatus.ok()) {
LOG(FATAL) << "PutBlob failed: " << putStatus.error_code() << ": " << putStatus.error_message();
LOG(FATAL) << "PutBlob failed: " << putStatus.error_code() << ": " << putStatus.error_message();
}
LOG(INFO) << "Blob stored at id: " << putResponse.id();

Expand All @@ -63,19 +63,13 @@ int main(int argc, char** argv) {
BlobResponse getResponse;
grpc::Status getStatus = stub->GetBlob(&getContext, getRequest, &getResponse);
if (!getStatus.ok()) {
LOG(FATAL) << "GetBlob failed: "
<< getStatus.error_code()
<< ": "
<< getStatus.error_message();
LOG(FATAL) << "GetBlob failed: " << getStatus.error_code() << ": " << getStatus.error_message();
}
LOG(INFO) << "Sucessfully retrieved Blob";

if (putRequest.blob() != getResponse.blob()) {
LOG(FATAL) << "Blobs were different. Original: '"
<< putRequest.blob()
<< "', retrieved: '"
<< getResponse.blob()
<< "'";
LOG(FATAL) << "Blobs were different. Original: '" << putRequest.blob() << "', retrieved: '"
<< getResponse.blob() << "'";
}
LOG(INFO) << "Blobs match!";

Expand Down
2 changes: 1 addition & 1 deletion examples/injection/config/config.textproto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ initial_node_configuration: {
name: "main"
wasm_config: {
wasm_module_name: "app"
wasm_entrypoint_name: "grpc_oak_main"
wasm_entrypoint_name: "grpc_fe"
}
}
220 changes: 197 additions & 23 deletions examples/injection/module/rust/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#![allow(dead_code)]
//
// Copyright 2019 The Project Oak Authors
//
Expand All @@ -19,32 +18,61 @@ mod proto {
include!(concat!(env!("OUT_DIR"), "/oak.examples.injection.rs"));
}

use oak::grpc;
//use oak::io::{Receiver, Sender},
use proto::{BlobResponse, BlobStore, BlobStoreDispatcher, GetBlobRequest, PutBlobRequest};
// BlobStoreInterface
// BlobStoreRequest
use oak::{
grpc,
io::{Receiver, Sender},
};
use proto::{
blob_request::Request, BlobRequest, BlobResponse, BlobStore, BlobStoreDispatcher,
BlobStoreInterface, BlobStoreProviderSender, BlobStoreRequest, BlobStoreSender, GetBlobRequest,
PutBlobRequest,
};

oak::entrypoint!(grpc_oak_main => |_in_channel| {
// TODO remove
oak::entrypoint!(grpc_basic_store => |_in_channel| {
oak::logger::init_default();
let grpc_channel = oak::grpc::server::init("[::]:8080")
.expect("could not create gRPC server pseudo-Node");
let dispatcher = BlobStoreDispatcher::new(Node::default());
oak::run_event_loop(dispatcher, grpc_channel);
});

/*
oak::entrypoint!(grpc_fe => |_in_channel| {
let (provider_write_handle, provider_read_handle) = oak::channel_create().unwrap();
oak::node_create(&oak::node_config::wasm("injection", "provider"), provider_read_handle)
oak::logger::init_default();
let (to_provider_write_handle, to_provider_read_handle) = oak::channel_create().unwrap();
let (from_provider_write_handle, from_provider_read_handle) = oak::channel_create().unwrap();
oak::node_create(&oak::node_config::wasm("app", "provider"), to_provider_read_handle)
.expect("Failed to create provider");
Sender::new(to_provider_write_handle)
.send(&BlobStoreProviderSender { sender: Some(Sender::new(from_provider_write_handle)) })
.expect("Failed to send handle to provider");

let dispatcher = BlobStoreDispatcher::new(BlobStoreFrontend::default());
let frontend = BlobStoreFrontend::new(
Sender::new(to_provider_write_handle),
Receiver::new(from_provider_read_handle));
let dispatcher = BlobStoreDispatcher::new(frontend);
let grpc_channel = oak::grpc::server::init("[::]:8080")
.expect("could not create gRPC server pseudo-Node");
oak::run_event_loop(dispatcher, grpc_channel);
});
*/

oak::entrypoint!(provider => |frontend_read| {
let frontend_sender =
Receiver::<BlobStoreProviderSender>::new(frontend_read).receive()
.expect("Did not receive a write handle")
.sender
.expect("No write handle in received message");
oak::run_event_loop(BlobStoreProvider::new(frontend_sender), frontend_read);
});

oak::entrypoint!(store => |reader| {
let sender =
Receiver::<BlobStoreSender>::new(reader).receive()
.expect("Did not receive a write handle")
.sender
.expect("No write handle in received message");
oak::run_event_loop(BlobStoreImpl::new(sender), reader);
});

#[derive(Default)]
struct Node {
Expand Down Expand Up @@ -88,23 +116,169 @@ fn blob_index(id: u64) -> usize {
(id - 1) as usize
}

/*
enum CachedStore {
NotCached {
sender: Sender<BlobStoreRequest>,
receiver: Receiver<BlobStoreInterface>,
},
Cached(BlobStoreInterface),
}

struct BlobStoreFrontend {
interface: Option<BlobStoreInterface>,
req_channel: Sender<BlobStoreRequest>,
res_channel: Receiver<BlobStoreInterface>,
store: CachedStore,
}

impl BlobStoreFrontend {
pub fn new(
sender: Sender<BlobStoreRequest>,
receiver: Receiver<BlobStoreInterface>,
) -> BlobStoreFrontend {
BlobStoreFrontend {
store: CachedStore::NotCached { sender, receiver },
}
}

fn get_interface(&mut self) -> &BlobStoreInterface {
// Make sure it is cached
if let CachedStore::NotCached { sender, receiver } = &self.store {
sender
.send(&BlobStoreRequest {})
.expect("Failed to send BlobStoreRequest");
let iface = receiver
.receive()
.expect("Failed to receive BlobStoreInterface");
self.store = CachedStore::Cached(iface.clone());
};

match &self.store {
CachedStore::Cached(iface) => &iface,
_ => unreachable!(),
}
}
}

impl BlobStore for BlobStoreFrontend {
fn get_blob(&mut self, _request: GetBlobRequest) -> grpc::Result<BlobResponse> {
unimplemented!()
fn get_blob(&mut self, request: GetBlobRequest) -> grpc::Result<BlobResponse> {
let iface = self.get_interface();
iface
.sender
.as_ref()
.unwrap()
.send(&BlobRequest {
request: Some(Request::Get(request)),
})
.expect("Could not forward get request");
let response = iface
.receiver
.as_ref()
.unwrap()
.receive()
.expect("Failed to receive get response");
Ok(response)
}

fn put_blob(&mut self, _request: PutBlobRequest) -> grpc::Result<BlobResponse> {
unimplemented!()
fn put_blob(&mut self, request: PutBlobRequest) -> grpc::Result<BlobResponse> {
let iface = self.get_interface();
iface
.sender
.as_ref()
.unwrap()
.send(&BlobRequest {
request: Some(Request::Put(request)),
})
.expect("Could not forward put request");
let response = iface
.receiver
.as_ref()
.unwrap()
.receive()
.expect("Failed to receive put response");
Ok(response)
}
}

struct BlobStoreProvider;
struct BlobStoreImpl;
*/
struct BlobStoreProvider {
sender: Sender<BlobStoreInterface>,
}

impl BlobStoreProvider {
pub fn new(sender: Sender<BlobStoreInterface>) -> BlobStoreProvider {
BlobStoreProvider { sender }
}
}

impl oak::Node<BlobStoreRequest> for BlobStoreProvider {
fn handle_command(&mut self, _command: BlobStoreRequest) -> Result<(), oak::OakError> {
// Create new BlobStore
let (to_store_write_handle, to_store_read_handle) = oak::channel_create().unwrap();
let (from_store_write_handle, from_store_read_handle) = oak::channel_create().unwrap();
oak::node_create(
&oak::node_config::wasm("app", "store"),
to_store_read_handle,
)?;

Sender::new(to_store_write_handle).send(&BlobStoreSender {
sender: Some(Sender::new(from_store_write_handle)),
})?;

self.sender.send(&BlobStoreInterface {
sender: Some(Sender::new(to_store_write_handle)),
receiver: Some(Receiver::new(from_store_read_handle)),
})
}
}

struct BlobStoreImpl {
sender: Sender<BlobResponse>,
blobs: Vec<String>,
}

impl BlobStoreImpl {
pub fn new(sender: Sender<BlobResponse>) -> BlobStoreImpl {
BlobStoreImpl {
sender,
blobs: Vec::new(),
}
}

fn get_blob(&mut self, request: GetBlobRequest) -> BlobResponse {
self.blobs
.get(blob_index(request.id))
.map(|blob| BlobResponse {
blob: blob.clone(),
id: request.id,
})
// Return the default instance if the blob was not found.
.unwrap_or_default()
}

fn put_blob(&mut self, request: PutBlobRequest) -> BlobResponse {
if request.id == 0 {
// Insert a new blob
self.blobs.push(request.blob.clone());
BlobResponse {
id: self.blobs.len() as u64,
blob: request.blob,
}
} else if let Some(blob) = self.blobs.get_mut(blob_index(request.id)) {
*blob = request.blob.clone();
BlobResponse {
id: request.id,
blob: request.blob,
}
} else {
BlobResponse::default()
}
}
}

impl oak::Node<BlobRequest> for BlobStoreImpl {
fn handle_command(&mut self, request: BlobRequest) -> Result<(), oak::OakError> {
let response = match request.request {
Some(Request::Get(req)) => self.get_blob(req),
Some(Request::Put(req)) => self.put_blob(req),
None => panic!("No inner request"),
};
self.sender.send(&response)
}
}
45 changes: 32 additions & 13 deletions examples/injection/proto/injection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,56 @@ import "oak/proto/handle.proto";
package oak.examples.injection;

message BlobStoreInterface {
oak.handle.Sender put = 1 [(oak.handle.message_type) = ".oak.examples.injection.GetBlobRequest"];
oak.handle.Sender get = 2 [(oak.handle.message_type) = ".oak.examples.injection.PutBlobRequest"];
oak.handle.Receiver recv = 3 [(oak.handle.message_type) = ".oak.examples.injection.BlobResponse"];
oak.handle.Sender sender = 1 [(oak.handle.message_type) = ".oak.examples.injection.BlobRequest"];
oak.handle.Receiver receiver = 2
[(oak.handle.message_type) = ".oak.examples.injection.BlobResponse"];
}

message GetBlobRequest {
fixed64 id = 1;
fixed64 id = 1;
}

message PutBlobRequest {
// Put 0 for new blob to store.
fixed64 id = 1;
string blob = 2;
// Put 0 for new blob to store.
fixed64 id = 1;
string blob = 2;
}

message BlobRequest {
oneof request {
GetBlobRequest get = 1;
PutBlobRequest put = 2;
}
}

message BlobResponse {
fixed64 id = 1;
string blob = 2;
fixed64 id = 1;
string blob = 2;
}

message BlobStoreRequest {}
message BlobStoreRequest {
}

message BlobStoreResponse {
BlobStoreInterface interface = 1;
BlobStoreInterface interface = 1;
}

/*
service BlobStoreProviderService {
rpc GetBlobStore(BlobStoreRequest) returns (BlobStoreResponse);
}
*/

service BlobStore {
rpc GetBlob(GetBlobRequest) returns (BlobResponse);
rpc PutBlob(PutBlobRequest) returns (BlobResponse);
rpc GetBlob(GetBlobRequest) returns (BlobResponse);
rpc PutBlob(PutBlobRequest) returns (BlobResponse);
}

message BlobStoreProviderSender {
oak.handle.Sender sender = 1
[(oak.handle.message_type) = ".oak.examples.injection.BlobStoreInterface"];
}

message BlobStoreSender {
oak.handle.Sender sender = 1 [(oak.handle.message_type) = ".oak.examples.injection.BlobResponse"];
}

0 comments on commit caa442b

Please sign in to comment.