Skip to content

Commit

Permalink
Merge pull request #257 from H1rono/fix-parse-request
Browse files Browse the repository at this point in the history
`ParseRequestInner`修正
  • Loading branch information
H1rono authored Dec 11, 2024
2 parents ebd08be + b7dedfc commit 5d042d5
Showing 1 changed file with 116 additions and 21 deletions.
137 changes: 116 additions & 21 deletions src/parser/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ use futures::future::Ready;
use futures::ready;
use http_body::Body;
use http_body_util::{combinators::Collect, Collected};
use pin_project_lite::pin_project;

use crate::error::{Error, Result};
use crate::events::{Event, EventKind};
use crate::parser::RequestParser;

pin_project_lite::pin_project! {
pin_project! {
#[must_use]
#[project = CollectBodyProject]
struct CollectBody<B>
Expand Down Expand Up @@ -44,41 +45,127 @@ where
}
}

pin_project_lite::pin_project! {
pin_project! {
#[must_use]
#[project = ParseRequestInnerProject]
struct ParseRequestInner<K, B> {
#[pin]
kind: K,
#[project = ParseEventKindProject]
struct ParseEventKind<K, B> {
#[pin]
body: B,
inner: K,
body: Option<B>
}
}

impl<K, B> Future for ParseRequestInner<K, B>
impl<K, B> Future for ParseEventKind<K, B>
where
K: Future<Output = Result<EventKind>>,
{
type Output = ParseRequestInner<K, B>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let s = self.project();
let res = ready!(s.inner.poll(cx));
let next = match res {
Ok(kind) => {
let body = s.body.take().expect("polled after ready");
ParseRequestInner::ParseBody {
inner: ParseBody { kind, inner: body },
}
}
Err(e) => ParseRequestInner::ParseEventKindFailed {
inner: futures::future::ready(Err(e)),
},
};
Poll::Ready(next)
}
}

type ParseEventKindFailed = Ready<Result<Event>>;

pin_project! {
#[must_use]
#[project = ParseBodyProject]
struct ParseBody<B> {
kind: EventKind,
#[pin]
inner: B,
}
}

impl<B> Future for ParseBody<B>
where
B: Future<Output = Result<Bytes>>,
{
type Output = Result<Event>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let s = self.project();
let kind = match ready!(s.kind.poll(cx)) {
Ok(k) => k,
Err(e) => return Poll::Ready(Err(e)),
};
let body = ready!(s.body.poll(cx));
let body = ready!(s.inner.poll(cx));
let res: Result<Event> = {
let body = body?;
let body = std::str::from_utf8(&body).map_err(Error::read_body_failed)?;
super::parse_body(kind, body)
super::parse_body(*s.kind, body)
};
Poll::Ready(res)
}
}

pin_project_lite::pin_project! {
pin_project! {
#[must_use]
#[project = ParseRequestInnerProject]
#[project_replace = ParseRequestInnerProjectReplace]
enum ParseRequestInner<K, B> {
ParseEventKind {
#[pin]
inner: ParseEventKind<K, B>,
},
ParseEventKindFailed {
#[pin]
inner: ParseEventKindFailed,
},
ParseBody {
#[pin]
inner: ParseBody<B>,
}
}
}

impl<K, B> ParseRequestInner<K, B>
where
K: Future<Output = Result<EventKind>>,
B: Future<Output = Result<Bytes>>,
{
fn new(kind: K, body: B) -> Self {
Self::ParseEventKind {
inner: ParseEventKind {
inner: kind,
body: Some(body),
},
}
}
}

impl<K, B> Future for ParseRequestInner<K, B>
where
K: Future<Output = Result<EventKind>>,
B: Future<Output = Result<Bytes>>,
{
type Output = Result<Event>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use ParseRequestInnerProject::{ParseBody, ParseEventKind, ParseEventKindFailed};
let s = self.as_mut().project();
let next = match s {
ParseEventKind { inner } => ready!(inner.poll(cx)),
ParseEventKindFailed { inner } => return inner.poll(cx),
ParseBody { inner } => return inner.poll(cx),
};
self.project_replace(next);
cx.waker().wake_by_ref();
Poll::Pending
}
}

pin_project! {
/// <code>impl [Future]<Output = Result<[Event], [Error]>></code>
///
/// [Future]: std::future::Future
Expand All @@ -98,17 +185,16 @@ pin_project_lite::pin_project! {
impl<B> ParseRequest<B>
where
B: Body,
B::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
{
fn new(kind: Result<EventKind>, body: B) -> Self {
use http_body_util::BodyExt;

let kind = futures::future::ready(kind);
let inner = ParseRequestInner {
kind,
body: CollectBody {
collect: body.collect(),
},
let body = CollectBody {
collect: body.collect(),
};
let inner = ParseRequestInner::new(kind, body);
Self { inner }
}
}
Expand Down Expand Up @@ -187,7 +273,7 @@ mod tests {
use http_body_util::BodyExt;

use super::{CollectBody, ParseRequest};
use crate::{Event, EventKind};
use crate::{Error, ErrorKind, Event, EventKind};

#[test]
fn collect_body() {
Expand All @@ -208,4 +294,13 @@ mod tests {
let event = block_on(fut).unwrap();
assert!(matches!(event, Event::Ping(_)));
}

#[test]
fn parse_event_failed() {
let err: Error = ErrorKind::BotTokenMismatch.into();
let body = String::new();
let fut = ParseRequest::new(Err(err), body);
let err = block_on(fut).unwrap_err();
assert_eq!(err.kind(), ErrorKind::BotTokenMismatch);
}
}

0 comments on commit 5d042d5

Please sign in to comment.