Skip to content

Commit

Permalink
support subscriptions. (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmac authored Jul 25, 2024
1 parent a58ba20 commit ec8322a
Show file tree
Hide file tree
Showing 24 changed files with 566 additions and 63 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ GraphQL stitching composes a single schema from multiple underlying GraphQL reso
![Stitched graph](./docs/images/stitching.png)

**Supports:**
- All operation types: query, mutation, and subscription.
- Merged object and abstract types.
- Multiple and composite keys per merged type.
- Shared objects, fields, enums, and inputs across locations.
- Multiple and composite type keys.
- Combining local and remote schemas.
- File uploads via [multipart form spec](https://github.com/jaydenseric/graphql-multipart-request-spec).
- Tested with all minor versions of `graphql-ruby`.

**NOT Supported:**
- Computed fields (ie: federation-style `@requires`).
- Subscriptions, defer/stream.
- Defer/stream.

This Ruby implementation is a sibling to [GraphQL Tools](https://the-guild.dev/graphql/stitching) (JS) and [Bramble](https://movio.github.io/bramble/) (Go), and its capabilities fall somewhere in between them. GraphQL stitching is similar in concept to [Apollo Federation](https://www.apollographql.com/docs/federation/), though more generic. The opportunity here is for a Ruby application to stitch its local schemas together or onto remote sources without requiring an additional proxy service running in another language. If your goal is to build a purely high-throughput federated reverse proxy, consider not using Ruby.

Expand Down Expand Up @@ -445,6 +446,7 @@ The [Executor](./docs/executor.md) component builds atop the Ruby fiber-based im
- [Modeling foreign keys for stitching](./docs/mechanics.md##modeling-foreign-keys-for-stitching)
- [Deploying a stitched schema](./docs/mechanics.md#deploying-a-stitched-schema)
- [Schema composition merge patterns](./docs/composer.md#merge-patterns)
- [Subscriptions tutorial](./docs/subscriptions.md)
- [Field selection routing](./docs/mechanics.md#field-selection-routing)
- [Root selection routing](./docs/mechanics.md#root-selection-routing)
- [Stitched errors](./docs/mechanics.md#stitched-errors)
Expand Down
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ Major components include:
Additional topics:

- [Stitching mechanics](./mechanics.md) - more about building for stitching and how it operates.
- [Subscriptions](./subscriptions.md) - explore how to stitch realtime event subscriptions.
- [Federation entities](./federation_entities.md) - more about Apollo Federation compatibility.
206 changes: 206 additions & 0 deletions docs/subscriptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
## Stitching subscriptions

Stitching is an interesting prospect for subscriptions because socket-based interactions can be isolated to their own schema/server with very little implementation beyond resolving entity keys. Then, entity data can be stitched onto subscription payloads from other locations.

### Composing a subscriptions schema

For simplicity, subscription resolvers should exist together in a single schema (multiple schemas with subscriptions probably aren't worth the confusion). This subscriptions schema may provide basic entity types that will merge with other locations. For example, here's a bare-bones subscriptions schema:

```ruby
class SubscriptionSchema < GraphQL::Schema
class Post < GraphQL::Schema::Object
field :id, ID, null: false
end

class Comment < GraphQL::Schema::Object
field :id, ID, null: false
end

class CommentAddedToPost < GraphQL::Schema::Subscription
argument :post_id, ID, required: true
field :post, Post, null: false
field :comment, Comment, null: true

def subscribe(post_id:)
{ post: { id: post_id }, comment: nil }
end

def update(post_id:)
{ post: { id: post_id }, comment: object }
end
end

class SubscriptionType < GraphQL::Schema::Object
field :comment_added_to_post, subscription: CommentAddedToPost
end

use GraphQL::Subscriptions::ActionCableSubscriptions
subscription SubscriptionType
end
```

The above subscriptions schema can compose with other locations, such as the following that provides full entity types:

```ruby
class EntitiesSchema < GraphQL::Schema
class StitchingResolver < GraphQL::Schema::Directive
graphql_name "stitch"
locations FIELD_DEFINITION
argument :key, String, required: true
argument :arguments, String, required: false
repeatable true
end

class Comment < GraphQL::Schema::Object
field :id, ID, null: false
field :message, String, null: false
end

class Post < GraphQL::Schema::Object
field :id, ID, null: false
field :title, String, null: false
field :comments, [Comment, null: false], null: false
end

class QueryType < GraphQL::Schema::Object
field :posts, [Post, null: true] do
directive StitchingResolver, key: "id"
argument :ids, [ID], required: true
end

def posts(ids:)
Post.where(id: ids)
end

field :comments, [Comment, null: true] do
directive StitchingResolver, key: "id"
argument :ids, [ID], required: true
end

def comments(ids:)
Comment.where(id: ids)
end
end

query QueryType
end
```

These schemas can be composed as normal into a stitching client. The subscriptions schema must be locally-executable while other entity schema(s) may be served from anywhere:

```ruby
StitchedSchema = GraphQL::Stitching::Client.new(locations: {
subscriptions: {
schema: SubscriptionSchema, # << locally executable!
},
entities: {
schema: GraphQL::Schema.from_definition(entities_schema_sdl),
executable: GraphQL::Stitching::HttpExecutable.new("http://localhost:3001"),
},
})
```

### Serving stitched subscriptions

Once you've stitched a schema with subscriptions, it gets called as part of three workflows:

1. Controller - handles normal query and mutation requests recieved via HTTP.
2. Channel - handles subscription-create requests recieved through a socket connection.
3. Plugin – handles subscription-update events pushed to the socket connection.

#### Controller

A controller will recieve basic query and mutation requests sent over HTTP, including introspection requests. Fulfill these using the stitched schema client.

```ruby
class GraphqlController < ApplicationController
skip_before_action :verify_authenticity_token
layout false

def execute
result = StitchedSchema.execute(
params[:query],
context: {},
variables: params[:variables],
operation_name: params[:operationName],
)

render json: result
end
end
```

#### Channel

A channel handles subscription requests initiated via websocket connection. This mostly follows the [GraphQL Ruby documentation example](https://graphql-ruby.org/api-doc/2.3.9/GraphQL/Subscriptions/ActionCableSubscriptions), except that `execute` uses the stitched schema client while `unsubscribed` uses the subscriptions subschema directly:

```ruby
class GraphqlChannel < ApplicationCable::Channel
def subscribed
@subscription_ids = []
end

def execute(params)
result = StitchedSchema.execute(
params["query"],
context: { channel: self },
variables: params["variables"],
operation_name: params["operationName"]
)

payload = {
result: result.to_h,
more: result.subscription?,
}

if result.context[:subscription_id]
@subscription_ids << result.context[:subscription_id]
end

transmit(payload)
end

def unsubscribed
@subscription_ids.each { |sid|
# Go directly through the subscriptions subschema
# when managing/triggering subscriptions:
SubscriptionSchema.subscriptions.delete_subscription(sid)
}
end
end
```

What happens behind the scenes here is that stitching filters the `execute` request down to just subscription selections, and passes those through to the subscriptions subschema where they register an event binding. The subscriber response gets stitched while passing back up through the stitching client.

#### Plugin

Lastly, update events trigger with the filtered subscriptions selection, so must get stitched before transmitting. The stitching client adds an update handler into request context for this purpose. A small patch to the subscriptions plugin class can call this handler on update event payloads before transmitting them:

```ruby
class StitchedActionCableSubscriptions < GraphQL::Subscriptions::ActionCableSubscriptions
def execute_update(subscription_id, event, object)
super(subscription_id, event, object).tap do |result|
result.context[:stitch_subscription_update]&.call(result)
end
end
end

class SubscriptionSchema
# switch the plugin on the subscriptions schema to use the patched class...
use StitchedActionCableSubscriptions
end
```

### Triggering subscriptions

Subscription update events are triggered as normal directly through the subscriptions subschema:

```ruby
class Comment < ApplicationRecord
after_create :trigger_subscriptions

def trigger_subscriptions
SubscriptionsSchema.subscriptions.trigger(:comment_added_to_post, { post_id: post_id }, self)
end
end
```
12 changes: 12 additions & 0 deletions lib/graphql/stitching.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@

module GraphQL
module Stitching
# scope name of query operations.
QUERY_OP = "query"

# scope name of mutation operations.
MUTATION_OP = "mutation"

# scope name of subscription operations.
SUBSCRIPTION_OP = "subscription"

# introspection typename field.
TYPENAME = "__typename"

# @api private
EMPTY_OBJECT = {}.freeze

Expand Down
14 changes: 7 additions & 7 deletions lib/graphql/stitching/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,28 @@ def initialize(locations: nil, supergraph: nil, composer: nil)
@on_error = nil
end

def execute(query:, variables: nil, operation_name: nil, context: nil, validate: true)
def execute(raw_query = nil, query: nil, variables: nil, operation_name: nil, context: nil, validate: true)
request = Request.new(
@supergraph,
query,
raw_query || query, # << for parity with GraphQL Ruby Schema.execute
operation_name: operation_name,
variables: variables,
context: context,
)

if validate
validation_errors = request.validate
return error_result(validation_errors) if validation_errors.any?
return error_result(request, validation_errors) if validation_errors.any?
end

request.prepare!
load_plan(request)
request.execute
rescue GraphQL::ParseError, GraphQL::ExecutionError => e
error_result([e])
error_result(request, [e])
rescue StandardError => e
custom_message = @on_error.call(request, e) if @on_error
error_result([{ "message" => custom_message || "An unexpected error occured." }])
error_result(request, [{ "message" => custom_message || "An unexpected error occured." }])
end

def on_cache_read(&block)
Expand Down Expand Up @@ -93,12 +93,12 @@ def load_plan(request)
plan
end

def error_result(errors)
def error_result(request, errors)
public_errors = errors.map! do |e|
e.is_a?(Hash) ? e : e.to_h
end

{ "errors" => public_errors }
GraphQL::Query::Result.new(query: request, values: { "errors" => public_errors })
end
end
end
Expand Down
12 changes: 10 additions & 2 deletions lib/graphql/stitching/composer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class T < GraphQL::Schema::Object
# @return [String] name of the Mutation type in the composed schema.
attr_reader :mutation_name

# @return [String] name of the Subscription type in the composed schema.
attr_reader :subscription_name

# @api private
attr_reader :subgraph_types_by_name_and_location

Expand All @@ -49,6 +52,7 @@ class T < GraphQL::Schema::Object
def initialize(
query_name: "Query",
mutation_name: "Mutation",
subscription_name: "Subscription",
description_merger: nil,
deprecation_merger: nil,
default_value_merger: nil,
Expand All @@ -57,6 +61,7 @@ def initialize(
)
@query_name = query_name
@mutation_name = mutation_name
@subscription_name = subscription_name
@description_merger = description_merger || BASIC_VALUE_MERGER
@deprecation_merger = deprecation_merger || BASIC_VALUE_MERGER
@default_value_merger = default_value_merger || BASIC_VALUE_MERGER
Expand Down Expand Up @@ -97,7 +102,6 @@ def perform(locations_input)
# "Typename" => "location" => subgraph_type
@subgraph_types_by_name_and_location = schemas.each_with_object({}) do |(location, schema), memo|
raise CompositionError, "Location keys must be strings" unless location.is_a?(String)
raise CompositionError, "The subscription operation is not supported." if schema.subscription

introspection_types = schema.introspection_system.types.keys
schema.types.each do |type_name, subgraph_type|
Expand All @@ -107,10 +111,13 @@ def perform(locations_input)
raise CompositionError, "Query name \"#{@query_name}\" is used by non-query type in #{location} schema."
elsif type_name == @mutation_name && subgraph_type != schema.mutation
raise CompositionError, "Mutation name \"#{@mutation_name}\" is used by non-mutation type in #{location} schema."
elsif type_name == @subscription_name && subgraph_type != schema.subscription
raise CompositionError, "Subscription name \"#{@subscription_name}\" is used by non-subscription type in #{location} schema."
end

type_name = @query_name if subgraph_type == schema.query
type_name = @mutation_name if subgraph_type == schema.mutation
type_name = @subscription_name if subgraph_type == schema.subscription
@mapped_type_names[subgraph_type.graphql_name] = type_name if subgraph_type.graphql_name != type_name

memo[type_name] ||= {}
Expand Down Expand Up @@ -154,6 +161,7 @@ def perform(locations_input)
orphan_types(schema_types.values.select { |t| t.respond_to?(:kind) && t.kind.object? })
query schema_types[builder.query_name]
mutation schema_types[builder.mutation_name]
subscription schema_types[builder.subscription_name]
directives builder.schema_directives.values

own_orphan_types.clear
Expand Down Expand Up @@ -593,7 +601,7 @@ def extract_resolvers(type_name, types_by_location)
# @!scope class
# @!visibility private
def select_root_field_locations(schema)
[schema.query, schema.mutation].tap(&:compact!).each do |root_type|
[schema.query, schema.mutation, schema.subscription].tap(&:compact!).each do |root_type|
root_type.fields.each do |root_field_name, root_field|
root_field_locations = @field_map[root_type.graphql_name][root_field_name]
next unless root_field_locations.length > 1
Expand Down
2 changes: 1 addition & 1 deletion lib/graphql/stitching/composer/resolver_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def extract_federation_entities(schema, location)
memo[field_path] << new(
key: key.to_definition,
type_name: entity_type.graphql_name,
arguments: "representations: { #{key_fields.join(", ")}, __typename: $.__typename }",
arguments: "representations: { #{key_fields.join(", ")}, #{TYPENAME}: $.#{TYPENAME} }",
)
end
end
Expand Down
Loading

0 comments on commit ec8322a

Please sign in to comment.