Skip to content

Commit

Permalink
Improve logging of gRPC request handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tiziano88 committed Jun 5, 2020
1 parent 0a2077d commit 652eeb7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 21 deletions.
1 change: 1 addition & 0 deletions examples/aggregator/client/aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/escaping.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_split.h"
#include "absl/types/optional.h"
Expand Down
38 changes: 17 additions & 21 deletions oak/server/rust/oak_runtime/src/node/grpc/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ impl Node for GrpcServerNode {
notify_receiver: oneshot::Receiver<()>,
) {
// Receive a `channel_writer` handle used to pass handles for temporary channels.
info!("{}: Waiting for a channel writer", self.node_name);
info!("{}: Waiting for channel writer", self.node_name);
let channel_writer = GrpcServerNode::get_channel_writer(&runtime, handle)
.expect("Couldn't initialize a channel writer");
.expect("Couldn't initialize channel writer");

// Handles incoming authentication gRPC requests
let auth_handler = match self.oidc_client_info {
Expand Down Expand Up @@ -199,11 +199,11 @@ impl Node for GrpcServerNode {
// Necessary for creating a Tokio Runtime.
.enable_time()
.build()
.expect("Couldn't create an Async runtime");
.expect("Couldn't create Async runtime");

// Start a gRPC server.
info!(
"{}: Starting a gRPC server pseudo-Node on: {}",
"{}: Starting gRPC server pseudo-Node on: {}",
self.node_name, self.address
);
let result = async_runtime.block_on(server);
Expand Down Expand Up @@ -244,7 +244,6 @@ impl Service<http::Request<hyper::Body>> for HttpRequestHandler {
self.runtime.clone(),
self.writer,
request.uri().path().to_string(),
MetadataMap::from_headers(request.headers().clone()),
);

let method_name = request.uri().path().to_string();
Expand Down Expand Up @@ -330,7 +329,6 @@ struct GrpcRequestHandler {
writer: oak_abi::Handle,
/// Name of the gRPC method that should be invoked.
method_name: String,
metadata_map: MetadataMap,
}

impl UnaryService<Vec<u8>> for GrpcRequestHandler {
Expand All @@ -340,11 +338,8 @@ impl UnaryService<Vec<u8>> for GrpcRequestHandler {
fn call(&mut self, request: tonic::Request<Vec<u8>>) -> Self::Future {
let handler = self.clone();

let oak_label_result = get_oak_label(&self.metadata_map);

let metrics_data = handler.runtime.metrics_data();
let future = async move {
debug!("Processing gRPC request: {:?}", request);
metrics_data
.grpc_server_metrics
.grpc_server_started_total
Expand All @@ -356,8 +351,15 @@ impl UnaryService<Vec<u8>> for GrpcRequestHandler {
.with_label_values(&[&handler.method_name])
.start_timer();

let oak_label = oak_label_result?;
debug!("gRPC Oak Label: {:?}", oak_label);
let oak_label = get_oak_label(request.metadata())?;
info!(
"handling gRPC request; peer address: {}, method: {}, request size: {} bytes, label: {:?}",
// TODO(#1089): Ensure that the client address is available.
request.remote_addr().map(|addr| addr.to_string()).unwrap_or_else(|| "<unknown>".to_string()),
handler.method_name,
request.get_ref().len(),
oak_label
);

// Create a gRPC request.
// TODO(#953): Add streaming support.
Expand Down Expand Up @@ -395,17 +397,11 @@ impl UnaryService<Vec<u8>> for GrpcRequestHandler {
}

impl GrpcRequestHandler {
fn new(
runtime: RuntimeProxy,
writer: oak_abi::Handle,
method_name: String,
metadata_map: MetadataMap,
) -> Self {
fn new(runtime: RuntimeProxy, writer: oak_abi::Handle, method_name: String) -> Self {
Self {
runtime,
writer,
method_name,
metadata_map,
}
}

Expand Down Expand Up @@ -447,15 +443,15 @@ impl GrpcRequestHandler {
handles: vec![],
};
request.encode(&mut message.data).map_err(|error| {
error!("Couldn't serialize a GrpcRequest message: {}", error);
error!("Couldn't serialize GrpcRequest message: {}", error);
})?;

// Send a message to the temporary channel.
self.runtime
.channel_write(request_writer, message)
.map_err(|error| {
error!(
"Couldn't write a message to the temporary channel: {:?}",
"Couldn't write message to the temporary channel: {:?}",
error
);
})?;
Expand All @@ -464,7 +460,7 @@ impl GrpcRequestHandler {
self.runtime
.channel_write(self.writer, invocation)
.map_err(|error| {
error!("Couldn't write a gRPC invocation message: {:?}", error);
error!("Couldn't write gRPC invocation message: {:?}", error);
})?;

Ok(response_reader)
Expand Down

0 comments on commit 652eeb7

Please sign in to comment.