Skip to content

Commit

Permalink
Add the ability to recieve and send headers on requests. (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChillFish8 authored Sep 3, 2023
1 parent 127e4a3 commit 43a197f
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 97 deletions.
58 changes: 58 additions & 0 deletions datacake-rpc/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use std::ops::{Deref, DerefMut};

use rkyv::{Archive, Serialize};

use crate::rkyv_tooling::DatacakeSerializer;
use crate::Status;

/// A wrapper type around the internal [hyper::Body]
pub struct Body(pub(crate) hyper::Body);

impl Body {
/// Creates a new body.
pub fn new(inner: hyper::Body) -> Self {
Self(inner)
}
Expand Down Expand Up @@ -36,3 +42,55 @@ impl DerefMut for Body {
&mut self.0
}
}

/// The serializer trait converting replies into hyper bodies.
///
/// This is a light abstraction to allow users to be able to
/// stream data across the RPC system which may not fit in memory.
///
/// Any types which implement [TryAsBody] will implement this type.
pub trait TryIntoBody {
/// Try convert the reply into a body or return an error
/// status.
fn try_into_body(self) -> Result<Body, Status>;
}

/// The serializer trait for converting replies into hyper bodies
/// using a reference to self.
///
/// This will work for most implementations but if you want to stream
/// hyper bodies for example, you cannot implement this trait.
pub trait TryAsBody {
/// Try convert the reply into a body or return an error
/// status.
fn try_as_body(&self) -> Result<Body, Status>;
}

impl<T> TryAsBody for T
where
T: Archive + Serialize<DatacakeSerializer>,
{
#[inline]
fn try_as_body(&self) -> Result<Body, Status> {
crate::rkyv_tooling::to_view_bytes(self)
.map(|v| Body::from(v.to_vec()))
.map_err(|e| Status::internal(e.to_string()))
}
}

impl<T> TryIntoBody for T
where
T: TryAsBody,
{
#[inline]
fn try_into_body(self) -> Result<Body, Status> {
<Self as TryAsBody>::try_as_body(&self)
}
}

impl TryIntoBody for Body {
#[inline]
fn try_into_body(self) -> Result<Body, Status> {
Ok(self)
}
}
136 changes: 119 additions & 17 deletions datacake-rpc/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use std::future::Future;
use std::marker::PhantomData;
use std::time::Duration;

use crate::handler::{Handler, RpcService, TryAsBody, TryIntoBody};
use http::header::IntoHeaderName;
use http::{HeaderMap, HeaderValue, StatusCode};

use crate::body::{Body, TryAsBody, TryIntoBody};
use crate::handler::{Handler, RpcService};
use crate::net::{Channel, Status};
use crate::request::{MessageMetadata, RequestContents};
use crate::{Body, DataView};
use crate::DataView;

/// A type alias for the returned data view of the RPC message reply.
pub type MessageReply<Svc, Msg> =
Expand Down Expand Up @@ -108,6 +113,7 @@ where
self.timeout = Some(timeout);
}

#[inline]
/// Creates a new RPC client which can handle a new service type.
///
/// [RpcClient]'s are cheap to create and should be preferred over
Expand All @@ -123,14 +129,107 @@ where
}
}

#[inline]
/// Sends a message to the server and wait for a reply.
///
/// This lets you send messages behind a reference which can help
/// avoid excess copies when it isn't needed.
///
/// In the event you need to send a [Body] or type which must consume `self`
/// you can use [Self::send_owned]
pub fn send<'a, 'slf: 'a, Msg>(
&'slf self,
msg: &'a Msg,
) -> impl Future<Output = Result<MessageReply<Svc, Msg>, Status>> + 'a
where
Msg: RequestContents + TryAsBody,
Svc: Handler<Msg>,
// Due to some interesting compiler errors, we couldn't use GATs here to enforce
// this on the trait side, which is a shame.
<Svc as Handler<Msg>>::Reply: RequestContents + TryIntoBody,
{
let ctx = self.create_rpc_context();
ctx.send(msg)
}

#[inline]
/// Sends a message to the server and wait for a reply using an owned
/// message value.
///
/// This allows you to send types implementing [TryIntoBody] like [Body].
pub fn send_owned<'slf, Msg>(
&'slf self,
msg: Msg,
) -> impl Future<Output = Result<MessageReply<Svc, Msg>, Status>> + 'slf
where
Msg: RequestContents + TryIntoBody + 'slf,
Svc: Handler<Msg>,
// Due to some interesting compiler errors, we couldn't use GATs here to enforce
// this on the trait side, which is a shame.
<Svc as Handler<Msg>>::Reply: RequestContents + TryIntoBody,
{
let ctx = self.create_rpc_context();
ctx.send_owned(msg)
}

#[inline]
/// Creates a new RPC context which can customise more of
/// the request than the convenience methods, i.e. Headers.
pub fn create_rpc_context(&self) -> RpcContext<Svc> {
RpcContext {
client: self,
headers: HeaderMap::new(),
}
}
}

/// A configurable RPC context that can be used to
/// fine tune the request/response of the RPC calls.
///
/// This allows you to pass and receive headers while
/// being reasonably convenient and without
/// breaking existing implementations.
pub struct RpcContext<'a, Svc>
where
Svc: RpcService,
{
client: &'a RpcClient<Svc>,
headers: HeaderMap,
}

impl<'a, Svc> RpcContext<'a, Svc>
where
Svc: RpcService,
{
/// Set a single request header.
pub fn set_header<K>(mut self, key: K, value: HeaderValue) -> Self
where
K: IntoHeaderName,
{
self.headers.insert(key, value);
self
}

/// Set multiple request headers.
pub fn set_headers<K, I>(mut self, headers: I) -> Self
where
K: IntoHeaderName,
I: IntoIterator<Item = (K, HeaderValue)>,
{
for (key, value) in headers {
self.headers.insert(key, value);
}
self
}

/// Sends a message to the server and wait for a reply.
///
/// This lets you send messages behind a reference which can help
/// avoid excess copies when it isn't needed.
///
/// In the event you need to send a [Body] or type which must consume `self`
/// you can use [Self::send_owned]
pub async fn send<Msg>(&self, msg: &Msg) -> Result<MessageReply<Svc, Msg>, Status>
pub async fn send<Msg>(self, msg: &Msg) -> Result<MessageReply<Svc, Msg>, Status>
where
Msg: RequestContents + TryAsBody,
Svc: Handler<Msg>,
Expand All @@ -144,15 +243,15 @@ where
};

let body = msg.try_as_body()?;
self.send_body(body, metadata).await
self.send_inner(body, metadata).await
}

/// Sends a message to the server and wait for a reply using an owned
/// message value.
///
/// This allows you to send types implementing [TryIntoBody] like [Body].
pub async fn send_owned<Msg>(
&self,
self,
msg: Msg,
) -> Result<MessageReply<Svc, Msg>, Status>
where
Expand All @@ -168,11 +267,11 @@ where
};

let body = msg.try_into_body()?;
self.send_body(body, metadata).await
self.send_inner(body, metadata).await
}

async fn send_body<Msg>(
&self,
async fn send_inner<Msg>(
self,
body: Body,
metadata: MessageMetadata,
) -> Result<MessageReply<Svc, Msg>, Status>
Expand All @@ -183,23 +282,26 @@ where
// this on the trait side, which is a shame.
<Svc as Handler<Msg>>::Reply: RequestContents + TryIntoBody,
{
let future = self.channel.send_msg(metadata, body);
let future = self.client.channel.send_parts(metadata, self.headers, body);

let result = match self.timeout {
let response = match self.client.timeout {
Some(duration) => tokio::time::timeout(duration, future)
.await
.map_err(|_| Status::timeout())?
.map_err(Status::connection)?,
None => future.await.map_err(Status::connection)?,
};

match result {
Ok(body) => <<Svc as Handler<Msg>>::Reply>::from_body(body).await,
Err(buffer) => {
let status =
DataView::<Status>::using(buffer).map_err(|_| Status::invalid())?;
Err(status.to_owned().unwrap_or_else(|_| Status::invalid()))
},
let (head, body) = response.into_parts();

if head.status == StatusCode::OK {
return <<Svc as Handler<Msg>>::Reply>::from_body(Body::new(body)).await;
}

let buffer = crate::utils::to_aligned(body)
.await
.map_err(|e| Status::internal(e.message()))?;
let status = DataView::<Status>::using(buffer).map_err(|_| Status::invalid())?;
Err(status.to_owned().unwrap_or_else(|_| Status::invalid()))
}
}
66 changes: 8 additions & 58 deletions datacake-rpc/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use std::net::SocketAddr;
use std::sync::Arc;

use async_trait::async_trait;
use rkyv::{Archive, Serialize};
use http::HeaderMap;

use crate::body::TryIntoBody;
use crate::net::Status;
use crate::request::{Request, RequestContents};
use crate::rkyv_tooling::DatacakeSerializer;
use crate::Body;

/// A specific handler key.
Expand Down Expand Up @@ -235,7 +235,8 @@ pub(crate) trait OpaqueMessageHandler: Send + Sync {
async fn try_handle(
&self,
remote_addr: SocketAddr,
data: Body,
headers: HeaderMap,
body: Body,
) -> Result<Body, Status>;
}

Expand All @@ -257,67 +258,16 @@ where
async fn try_handle(
&self,
remote_addr: SocketAddr,
data: Body,
headers: HeaderMap,
body: Body,
) -> Result<Body, Status> {
let view = Msg::from_body(data).await?;
let view = Msg::from_body(body).await?;

let msg = Request::new(remote_addr, view);
let msg = Request::new(remote_addr, headers, view);

self.handler
.on_message(msg)
.await
.and_then(|reply| reply.try_into_body())
}
}

/// The serializer trait converting replies into hyper bodies.
///
/// This is a light abstraction to allow users to be able to
/// stream data across the RPC system which may not fit in memory.
///
/// Any types which implement [TryAsBody] will implement this type.
pub trait TryIntoBody {
/// Try convert the reply into a body or return an error
/// status.
fn try_into_body(self) -> Result<Body, Status>;
}

/// The serializer trait for converting replies into hyper bodies
/// using a reference to self.
///
/// This will work for most implementations but if you want to stream
/// hyper bodies for example, you cannot implement this trait.
pub trait TryAsBody {
/// Try convert the reply into a body or return an error
/// status.
fn try_as_body(&self) -> Result<Body, Status>;
}

impl<T> TryAsBody for T
where
T: Archive + Serialize<DatacakeSerializer>,
{
#[inline]
fn try_as_body(&self) -> Result<Body, Status> {
crate::rkyv_tooling::to_view_bytes(self)
.map(|v| Body::from(v.to_vec()))
.map_err(|e| Status::internal(e.to_string()))
}
}

impl<T> TryIntoBody for T
where
T: TryAsBody,
{
#[inline]
fn try_into_body(self) -> Result<Body, Status> {
<Self as TryAsBody>::try_as_body(&self)
}
}

impl TryIntoBody for Body {
#[inline]
fn try_into_body(self) -> Result<Body, Status> {
Ok(self)
}
}
5 changes: 3 additions & 2 deletions datacake-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ use std::hash::{Hash, Hasher};

/// A re-export of the async-trait macro.
pub use async_trait::async_trait;
pub use http;

pub use self::body::Body;
pub use self::body::{Body, TryAsBody, TryIntoBody};
pub use self::client::{MessageReply, RpcClient};
pub use self::handler::{Handler, RpcService, ServiceRegistry, TryAsBody, TryIntoBody};
pub use self::handler::{Handler, RpcService, ServiceRegistry};
pub use self::net::{
ArchivedErrorCode,
ArchivedStatus,
Expand Down
Loading

0 comments on commit 43a197f

Please sign in to comment.