Skip to content

Commit

Permalink
Polish juniper_actix to work
Browse files Browse the repository at this point in the history
  • Loading branch information
tyranron committed Sep 21, 2023
1 parent 0326e05 commit d46c59a
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 45 deletions.
25 changes: 3 additions & 22 deletions examples/actix_subscriptions/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use juniper::{
tests::fixtures::starwars::schema::{Database, Query},
EmptyMutation, FieldError, GraphQLObject, RootNode,
};
use juniper_actix::{
graphiql_handler, graphql_handler, playground_handler,
subscriptions::{graphql_transport_ws_handler, graphql_ws_handler},
};
use juniper_actix::{graphiql_handler, graphql_handler, playground_handler, subscriptions};
use juniper_graphql_ws::ConnectionConfig;

type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;
Expand All @@ -26,7 +23,7 @@ fn schema() -> Schema {
}

async fn playground() -> Result<HttpResponse, Error> {
playground_handler("/graphql", Some("/legacy-subscriptions")).await
playground_handler("/graphql", Some("/subscriptions")).await
}

async fn graphiql() -> Result<HttpResponse, Error> {
Expand Down Expand Up @@ -103,22 +100,7 @@ async fn subscriptions(
// playground has a hard-coded timeout set to 20 secs
let config = config.with_keep_alive_interval(Duration::from_secs(15));

graphql_transport_ws_handler(req, stream, schema, config).await
}

async fn legacy_subscriptions(
req: HttpRequest,
stream: web::Payload,
schema: web::Data<Schema>,
) -> Result<HttpResponse, Error> {
let context = Database::new();
let schema = schema.into_inner();
let config = ConnectionConfig::new(context);
// set the keep alive interval to 15 secs so that it doesn't timeout in playground
// playground has a hard-coded timeout set to 20 secs
let config = config.with_keep_alive_interval(Duration::from_secs(15));

graphql_ws_handler(req, stream, schema, config).await
subscriptions::ws_handler(req, stream, schema, config).await
}

#[actix_web::main]
Expand All @@ -140,7 +122,6 @@ async fn main() -> std::io::Result<()> {
)
.wrap(middleware::Compress::default())
.wrap(middleware::Logger::default())
.service(web::resource("/legacy-subscriptions").route(web::get().to(legacy_subscriptions)))
.service(web::resource("/subscriptions").route(web::get().to(subscriptions)))
.service(
web::resource("/graphql")
Expand Down
9 changes: 9 additions & 0 deletions juniper_actix/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ All user visible changes to `juniper_actix` crate will be documented in this fil
- Switched to 4.0 version of [`actix-web` crate] and its ecosystem. ([#1034])
- Switched to 0.16 version of [`juniper` crate].
- Switched to 0.4 version of [`juniper_graphql_ws` crate].
- Renamed `subscriptions::subscriptions_handler()` as `subscriptions::graphql_ws_handler()` for processing the [legacy `graphql-ws` GraphQL over WebSocket Protocol][graphql-ws]. ([#1191])

### Added

- `subscriptions::graphql_transport_ws_handler()` allowing to process the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][graphql-transport-ws]. ([#1191])
- `subscriptions::ws_handler()` with auto-selection between the [legacy `graphql-ws` GraphQL over WebSocket Protocol][graphql-ws] and the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][graphql-transport-ws], based on the `Sec-Websocket-Protocol` HTTP header value. ([#1191])

### Fixed

Expand All @@ -21,6 +27,7 @@ All user visible changes to `juniper_actix` crate will be documented in this fil
[#1034]: /../../pull/1034
[#1169]: /../../issues/1169
[#1187]: /../../pull/1187
[#1191]: /../../pull/1191



Expand All @@ -36,3 +43,5 @@ See [old CHANGELOG](/../../blob/juniper_actix-v0.4.0/juniper_actix/CHANGELOG.md)
[`juniper` crate]: https://docs.rs/juniper
[`juniper_graphql_ws` crate]: https://docs.rs/juniper_graphql_ws
[Semantic Versioning 2.0.0]: https://semver.org
[graphql-transport-ws]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
[graphql-ws]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md
94 changes: 72 additions & 22 deletions juniper_actix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,8 @@ pub async fn playground_handler(
.body(html))
}

/// `juniper_actix` subscriptions handler implementation.
/// Cannot be merged to `juniper_actix` yet as GraphQL over WS[1]
/// is not fully supported in current implementation.
///
/// *Note: this implementation is in an alpha state.*
///
/// [1]: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
#[cfg(feature = "subscriptions")]
/// `juniper_actix` subscriptions handler implementation.
pub mod subscriptions {
use std::{fmt, sync::Arc};

Expand All @@ -192,17 +186,69 @@ pub mod subscriptions {
use juniper_graphql_transport_ws::{ArcSchema, Init};
use tokio::sync::Mutex;

/// Serves the `graphql-ws` protocol over a WebSocket connection.
/// Serves by auto-selecting between the
/// [legacy `graphql-ws` GraphQL over WebSocket Protocol][old] and the
/// [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], based on the
/// `Sec-Websocket-Protocol` HTTP header value.
///
/// The `schema` argument is your [`juniper`] schema.
///
/// The `init` argument is used to provide the custom [`juniper::Context`] and additional
/// configuration for connections. This can be a
/// [`juniper_graphql_transport_ws::ConnectionConfig`] if the context and configuration are
/// already known, or it can be a closure that gets executed asynchronously whenever a client
/// sends the subscription initialization message. Using a closure allows to perform an
/// authentication based on the parameters provided by a client.
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
/// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md
pub async fn ws_handler<Query, Mutation, Subscription, CtxT, S, I>(
req: HttpRequest,
stream: web::Payload,
schema: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
init: I,
) -> Result<HttpResponse, actix_web::Error>
where
Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Query::TypeInfo: Send + Sync,
Mutation: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Mutation::TypeInfo: Send + Sync,
Subscription: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
Subscription::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Send,
{
if req
.headers()
.get("sec-websocket-protocol")
.map(AsRef::as_ref)
== Some("graphql-ws".as_bytes())
{
graphql_ws_handler(req, stream, schema, init).await
} else {
graphql_transport_ws_handler(req, stream, schema, init).await
}
}

/// Serves the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old].
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a `juniper_graphql_ws::ConnectionConfig` if the context and
/// connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the context and
/// configuration are already known, or it can be a closure that gets executed asynchronously
/// when the client sends the ConnectionInit message. Using a closure allows you to perform
/// authentication based on the parameters provided by the client.
/// when the client sends the `GQL_CONNECTION_INIT` message. Using a closure allows to perform
/// an authentication based on the parameters provided by a client.
///
/// > __WARNING__: This protocol has been deprecated in favor of the
/// [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], which is
/// provided by the [`graphql_transport_ws_handler()`] function.
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
/// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md
pub async fn graphql_ws_handler<Query, Mutation, Subscription, CtxT, S, I>(
req: HttpRequest,
stream: web::Payload,
root_node: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
schema: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
init: I,
) -> Result<HttpResponse, actix_web::Error>
where
Expand All @@ -217,7 +263,7 @@ pub mod subscriptions {
I: Init<S, CtxT> + Send,
{
let (s_tx, s_rx) =
juniper_graphql_ws::Connection::new(ArcSchema(root_node), init).split::<Message>();
juniper_graphql_ws::Connection::new(ArcSchema(schema), init).split::<Message>();

let mut resp = ws::start(
Actor {
Expand All @@ -236,17 +282,19 @@ pub mod subscriptions {
Ok(resp)
}

/// Serves the `graphql-transport`-ws protocol over a WebSocket connection.
/// Serves the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new].
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a `juniper_graphql_ws::ConnectionConfig` if the context and
/// configuration are already known, or it can be a closure that gets executed asynchronously
/// when the client sends the ConnectionInit message. Using a closure allows you to perform
/// authentication based on the parameters provided by the client.
/// connections. This can be a [`juniper_graphql_transport_ws::ConnectionConfig`] if the context
/// and configuration are already known, or it can be a closure that gets executed
/// asynchronously when the client sends the `ConnectionInit` message. Using a closure allows to
/// perform an authentication based on the parameters provided by a client.
///
/// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md
pub async fn graphql_transport_ws_handler<Query, Mutation, Subscription, CtxT, S, I>(
req: HttpRequest,
stream: web::Payload,
root_node: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
schema: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
init: I,
) -> Result<HttpResponse, actix_web::Error>
where
Expand All @@ -260,9 +308,8 @@ pub mod subscriptions {
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Send,
{
let (s_tx, s_rx) =
juniper_graphql_transport_ws::Connection::new(ArcSchema(root_node), init)
.split::<Message>();
let (s_tx, s_rx) = juniper_graphql_transport_ws::Connection::new(ArcSchema(schema), init)
.split::<Message>();

let mut resp = ws::start(
Actor {
Expand Down Expand Up @@ -304,6 +351,9 @@ pub mod subscriptions {
Ok(msg) => {
let tx = self.tx.clone();

// TODO: Somehow this implementation always closes as `1006: Abnormal closure`
// due to excessive polling of `tx` part.
// Needs to be reworked.
async move {
tx.lock()
.await
Expand Down
3 changes: 2 additions & 1 deletion juniper_graphql_transport_ws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct ExecutionParams<S: Schema> {
}

/// Possible inputs received from a client.
#[derive(Debug)]
pub enum Input<S> {
/// Deserialized [`ClientMessage`].
Message(ClientMessage<S>),
Expand Down Expand Up @@ -535,7 +536,7 @@ where
<Self as Sink<T>>::poll_ready(self, cx)
}

fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
fn poll_close(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
self.sink_state = ConnectionSinkState::Closed;
if let Some(waker) = self.stream_waker.take() {
// Wake up the stream so it can close too.
Expand Down

0 comments on commit d46c59a

Please sign in to comment.