Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscription support #433

Merged
merged 343 commits into from
Mar 19, 2020
Merged

Add subscription support #433

merged 343 commits into from
Mar 19, 2020

Conversation

nWacky
Copy link
Contributor

@nWacky nWacky commented Sep 30, 2019

Before merge: Resolve TODO#433 (import crates in subscriptions example from GitHub instead of local)

Related issue: #54

How to implement custom GraphQL subscription

Note: by SubscriptionCoordinator and SubscriptionConnection I mean types implementing these traits

Current execution logic is inspired with this and this idea. In a nutshell:

When a request is received, it gets processed by a SubscriptionCoordinator. This struct decides if incoming request should be resolved into a new stream or if one of already existing streams (that coordinator knows about) is good enough. juniper_subscriptions::Coordinator resolves each request into a new stream.

SubscriptionCoordinator should call juniper::http::resolve_into_stream each time it needs to resolve a GraphQLRequest to Value<Stream<Item = Value<S>>. If subscripiton-related macros are used, Value::Object<Stream<Item = Value<_>> is returned. It keeps each field and corresponding Stream<Item = Value<_>>. Objects can be iterated over and needed GraphQL objects can be assembled from returned Streams. When iterating over an object returned from subscription macros, each object field should be mapped to Value::Scalar<StreamT>, where StreamT is the type user has specified in field resolvers.

#[juniper::graphql_subscription] macro implements GraphQLType + GraphQLSubscriptionType. GraphQLType is used to store which fields this subscription has, which types they have, etc. Its resolvers ( resolve, resolve_field, resolve_into_type) are not used. GraphQLSubscriptionType's resolve_into_stream (called by Executor), resolve_field_into_stream, resolve_type_into_stream (called by resolve_into_stream) are used instead.

Once a Value<Stream> is returned from resolve_into_stream, it is returned to the user (user most probably called juniper::http::resolve_into_stream to get to resolvers and resolve a request into stream).

Once a stream is returned, it could be put inside a SubscriptionConnection (if called by SubscriptionCoordinator). SubscriptionConnection implements Stream<Item = GraphQLResponse>. GraphQLResponse can be deserialized and sent to end user => SubscriptionConnection can be polled by server implementation. SubscriptionConnection could also have functionality for stopping, reconnecting, etc (will be implemented in the follow-up PRs).

SubscriptionCoordinator and SubscriptionConnection are types that manage and control already obtained Streams. GraphQLSubscriptionType manages resolvers on GraphQL objects.

Data flow in `warp_subscriptions` example

  • HTTP server is initialized with RootNode and SubscriptionCoordinator
  1. server gets a request
  2. request is deserialized into GraphQlRequest
  3. graphql_subscriptions_async(ws_connection, coordinator) - example's helper for subscription execution
    1. coordinator.subscribe(request, context) - resolve GraphQlRequest into Stream and return SubscriptionConnection containing that stream
      • juniper::http::resolve_into_stream(...) - get operation name and variables from request
      • crate::resolve_into_stream(...) - parse document, make sure no errors are found
      • executor::resolve_validated_subscription(...) - create executor and resolve subscription query into Value<Stream<Item = Value<S>>>
      • executor.resolve_into_stream(...) - check if self.subscribe has errors and push them to executor
      • self.subscribe(...) - return value.resolve_into_stream(...).
      • value.resolve_into_stream(...) - user logic how to resolve this value into stream
        • by default, each value V returned from stream is treated as a if a query was executed asynchronously and
          V was returned. Ongoing execution logic creates new Value::Object with fields requested by user
          (for example, if user {id name} was requested, each User { id name email } returned from stream will be returned as User { id name } GraphQL object)
    2. Returned SubscriptionConnection can be used as a Stream over GraphQLResponses, which can be deserialized and sent to the end user.
      juniper_subscriptions::Connection waits until all object's fields are resolved and then returns the whole object.
    3. Every time a GraphQlResponse is yielded from a stream, send it to end user

Starting playground server

  • Go to examples/warp_subscriptions and cargo run. New warp server should start on localhost:8080
  • Subscription queries should be able to be executed at this point:
subscription {
    users {
        id
        kind
        name
        friends {
            id
            kind
        }
    }
}
  • New users are returned every 5 seconds. Fragments and multiple subscriptions are also supported.
Note: playground doesn't support anonymous subscriptions referencing fragments (graphql/graphql-playground/issues/1067), so the following request can be sent to `localhost:8000/subscriptions` to test it

{"id":"1","type":"start","payload":{"variables":{},"extensions":{},"operationName":null,"query":"subscription { ...myFragment } fragment myFragment on Subscription { users { id name friends { id name } }"}}

@theduke
Copy link
Member

theduke commented Oct 2, 2019

Hey, awesome that you are working on this!

I'll browse the code.

@theduke theduke self-assigned this Oct 2, 2019
@tyranron
Copy link
Member

tyranron commented Oct 2, 2019

@theduke this is still quite a WIP and we will come through multiple iterations until it will be ready for a serious review.

However, your comments and considerations would be vital for us at any stage.

@nWacky
Copy link
Contributor Author

nWacky commented Oct 2, 2019

@theduke,

Great, thanks for code review. It is quite in some state at the moment: it's possible to create a subscription, map it to some user code with SubscriptionHandlerAsync::resolve_field_async (for async subscriptions only) and return an iterator/stream.

Issues I know about that I will try to resolve today/tomorrow:

  • Subscription handler returning all values from the first stream, then all values from the second and so on

OperationType::Subscription => root_node
.schema
.subscription_type()
.expect("No mutation type found"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: s/mutation/subscription/

@nWacky
Copy link
Contributor Author

nWacky commented Oct 4, 2019

This branch is still WIP, but slowly getting there.
Here's what's new:

  • With using Value<ValuesStream<S>> for synchronous subscriptions and Object<ValuesIterator<S>> for asynchronous subscriptions (I will eventually figure out which type it's better to return later) user is now able to get field names and streams/iterators
  • Updated futures version to 0.3.0-alpha.19

@LegNeato
Copy link
Member

Ok, I landed @tyranron's PR and rebased the async branch onto master.

@@ -10,7 +10,7 @@ edition = "2018"
log = "0.4.8"
env_logger = "0.6.2"
warp = "0.1.19"
futures-preview = { version = "0.3.0-alpha.19", features = ["nightly", "async-await", "compat"] }
futures-preview = { version = "0.3.0-alpha.19", features = ["async-await", "compat"] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nWacky async-await is unnecessary with 0.3.0-alpha.19

@nWacky
Copy link
Contributor Author

nWacky commented Oct 11, 2019

Still WIP.

This week I added functionality for filtering out response fields (for synchronous subscriptions only).

For now it works this way: after subscription fields are resolved and some iterator over user's type is returned, each returned object is treated as a separate query. (for example, got Iterator<Human> where Human is user's struct. After getting each object executor.replaced_context(ctx).resolve_with_ctx(&(), &r) is called, which creates Value::Object with fields which were specified in the request).

The issue I had was with Executor not living long enough because it stores a lot of references to variables, which are defined in execute_validated_subcription (and I am mapping Executor to returned iterator to resolve each variable when it is returned):

...
request is deserialized --->
request.subscribe(root_node, context) --->
    ...
    executor::execute_validated_subcription(...) 
     
        // Parse definitions and set default variables
        let errors = RwLock::new(Vec::new()); // create a variable to store errors in
        // Create `Executor` (&errors and other references are passed there)
        executor.resolve_into_iterator(...) // next function for execution process
        Ok((value, errors)) //returns execution result and errors
      --->
    ...
    value.resolve_into_stream(..., &executor) ---> 
     
        let iter: Iterator<Obj> = (|| {
            // User creates `Iterator<Obj>`
        })();
        
        // At this point it's necessary to add user's object resolver
        iter.map(|x| executor.resolve_with_ctx(x)) 
        
     --->
HTTP server got iterator / stream --->
rather return it as a bunch of objects or one by one;

So that Executor lives no longer than execute_validated_subcription().

When we discussed it we decided to create new OwnedExecutor, which is going to store all variables. However, Executor's fragments is a reference to a hash map pointing to a reference to local Fragment<S> vector.

So (at least for now) I just moved all these variables to a function where subscribe is called

let mut executor_variables = juniper::OwnedExecutor::new(); // this is where Executor variables are stored (and it can return Executor)
let mut fragments = vec![]; // this is here to escape `OwnedExecutor` self referencing 
let mut executor = juniper::OptionalExecutor::new(); // this is `Option<Executor>` to keep because executor is still declared in `execute_validated_subcription`
let (res, err) = request.subscribe(
    root_node,
    context,
    &mut executor_variables,
    &mut fragments,
    &mut executor
).0.unwrap();

@theduke
Copy link
Member

theduke commented Oct 11, 2019

Sorry for the delay, I'll finally be able to carve out some time this weekend.

@nWacky
Copy link
Contributor Author

nWacky commented Oct 11, 2019

@theduke

It's still in progress, so you didn't miss anything important 🙃
But any comments or suggestions are welcome

@Type1J
Copy link

Type1J commented Oct 15, 2019

We're pretty happy about this PR. Keep up the good work!

@nWacky
Copy link
Contributor Author

nWacky commented Oct 16, 2019

Still WIP, but this push can be considered somewhat like early preview.

  • Added #[juniper::subscription] macro
  • Resolved some todos
  • Updated schema in juniper_benchmarks (tests still do not compile)

What's left:

  • Fragment spread and inline fragment support
  • Update tests

@nWacky
Copy link
Contributor Author

nWacky commented Mar 15, 2020

Cleaned up code, updated docs, formatted and tested it. Updated description.

Everything should be ready for a pre-review now

A little side note: benchmarks and server integration crates do not support subscriptions for now and there is a PR for making clippy lints pass. I believe it could be reviewed and merged after this PR is merged (not to make the diff too big) and then further subscription functionality can be added

Copy link
Member

@LegNeato LegNeato left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WOW, amazing work! Thank you sooooo much 🎉 🍺 🚀 .

I did a review sweep. It looks like all the commits are from you, even the ones already on master. Is that just a weird display thing in github or was there a git mistake? We'll squash this at merge time so maybe it doesn't matter?

I'll wait on doing committing anything on master until this lands to prevent more rebase pain...sorry for that 😥!

juniper/src/http/mod.rs Outdated Show resolved Hide resolved
juniper/src/http/playground.rs Outdated Show resolved Hide resolved
juniper/src/macros/tests/impl_subscription.rs Outdated Show resolved Hide resolved
juniper/src/parser/document.rs Outdated Show resolved Hide resolved
juniper/src/schema/model.rs Outdated Show resolved Hide resolved
juniper_warp/src/lib.rs Outdated Show resolved Hide resolved
juniper_warp/src/lib.rs Outdated Show resolved Hide resolved
juniper_warp/src/lib.rs Outdated Show resolved Hide resolved
juniper_warp/src/lib.rs Show resolved Hide resolved
Cargo.toml Outdated Show resolved Hide resolved
@LegNeato LegNeato changed the title WIP: Add subscription support to async-await branch Add subscription support to async-await branch Mar 16, 2020
@LegNeato LegNeato changed the title Add subscription support to async-await branch Add subscription support Mar 16, 2020
Co-Authored-By: Christian Legnitto <LegNeato@users.noreply.github.com>
@nWacky nWacky changed the base branch from async-await to master March 16, 2020 18:28
@LegNeato
Copy link
Member

To fix the release stuff you probably need to add juniper_subscriptions to https://github.com/instrumentisto/juniper/blob/67ba411e4436d1bd327bab5b7765497244511477/juniper/release.toml#L7

@LegNeato
Copy link
Member

LegNeato commented Mar 19, 2020

I'm going to land this and we can iterate on master, including TODO#433.

🍻 😍 🍾 🎉 👍 💯 🥇 ❤️
THANK YOU SO MUCH FOR THIS PR @nWacky and @tyranron
🍻 😍 🍾 🎉 👍 💯 🥇 ❤️

It was a ton of work and will be moving the ecosystem forward.

@LegNeato LegNeato merged commit eb941e5 into graphql-rust:master Mar 19, 2020
@geropl geropl mentioned this pull request Mar 20, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.