Support for GraphQL subscriptions in mergeSchema #420

Closed
rohitghatol opened this issue Oct 9, 2017 · 24 comments
Comments

@rohitghatol

Need Documentation of the use case of Merging Subscriptions from multiple GraphQL End Points.

If this is not available now, we can discuss a few scenarios in this thread and then open a specific issue for that.

@stubailo stubailo changed the title Document - How to stitching multiple graphql endpoints and support subscriptions too? Support for GraphQL subscriptions Oct 9, 2017
@stubailo
Contributor

stubailo commented Oct 9, 2017

Yeah, that's currently not supported, but let's start talking about how to make it happen!

@freiksenet
Contributor

Steps to make it happen:

  1. Add support for the subscription type, just like we've added support for query/mutation in schema generation. This should be easy.
  2. The above should make it work with local schemas out of the box, but we need to test that.
  3. Add support for remote schema subscription proxying. I'm not sure how this should work. I feel we should be as implementation-independent as possible, but I'm not sure that will actually work. Maybe we can pass subscriptions through somehow. Alternatively, we can let the user specify how to proxy subscriptions.

@tim-soft

Getting access to the parent schema's pub sub engine seems like the tricky part. I wonder what performance implications/overhead would come from having some kind of proxying passthrough. Option to connect directly to the pub sub?

@freiksenet
Contributor

@tim-soft I'm more and more of the opinion that while we can provide all the schema level subscription parts, we will have to just make users write custom code to handle proxying / pass through.

@freiksenet freiksenet changed the title Support for GraphQL subscriptions Support for GraphQL subscriptions in mergeSchema Oct 16, 2017
@shipiak
Contributor

shipiak commented Nov 1, 2017

Hi guys,

would it be at least possible to add support for subscriptions defined in local schema?

When I use:

mergeSchemas({
  schemas: [localSchema, remoteSchema]
});

root type Subscription which is defined in localSchema is missing. :(
I believe this should work correctly...

Many thanks for your response

@jtmthf

jtmthf commented Dec 4, 2017

I've been going over this issue and it doesn't seem in most cases that the local instance would need access to the remote pubsub engine. If the local instance actually needed access to the remote pubsub, users could handle that in custom code using either the redis or mqtt engines.

What I believe could work is an implementation using the existing makeRemoteExecutableSchema. As makeRemoteExecutableSchema already takes a link, that link could be augmented for subscription support, similarly to how it's done on the client side. subscriptions-transport-ws could be used if ws is passed as its wsImpl parameter, allowing it to work on Node.

Now I'm not familiar with the internals of schema merging, but how I imagine the rest of the implementation would work is as follows.

  • On creation of the local link instance, a websocket connection would be established with the remote server.
  • A websocket server is then created in user code that uses the created schema. It can optionally be passed a pubsub instance if there are local subscriptions, or none if the subscriptions only exist remotely.
  • On a subscription being received, the root field of the subscription is determined. The subscription query, minus the fields not defined on that instance, is then passed along. I assume this can be done using existing stitching internals. For remote instances, the query is passed along using the local instance's websocket client.
  • As pubsub events happen on the backend, the local instance receives the result of its query over its websocket client. The local instance then resolves any remaining fields that are either defined locally or on another remote schema.
  • The final result is then sent back to the client. This part is a little tricky as the local instance will need to determine which connection to send the result back on. I'm not familiar enough with the internals of the implementation to describe how this would work.

Below is a very general example of how the API would work using lots of copy-paste from the docs and pretending top-level await is a thing.

import express from 'express';
import {
  graphqlExpress,
  graphiqlExpress,
} from 'apollo-server-express';
import bodyParser from 'body-parser';
import cors from 'cors';
import { execute, subscribe } from 'graphql';
import { createServer } from 'http';
import { SubscriptionServer } from 'subscriptions-transport-ws';
import { introspectSchema, makeRemoteExecutableSchema } from 'graphql-tools';
import { HttpLink } from 'apollo-link-http';
import fetch from 'node-fetch';
import ws from 'ws';
import { split } from 'apollo-link';
import { WebSocketLink } from 'apollo-link-ws';
import { getMainDefinition } from 'apollo-utilities';

// Create an http link to the remote server:
const httpLink = new HttpLink({
  uri: 'http://localhost:3000/graphql',
  fetch, // pass a fetch implementation explicitly when running on Node
});

// Create a WebSocket link to the remote server:
const wsLink = new WebSocketLink({
  uri: `ws://localhost:5000/`,
  options: {
    reconnect: true
  },
  webSocketImpl: ws, // websocket implementation to use on Node
});

// using the ability to split links, you can send data to each link
// depending on what kind of operation is being sent
const link = split(
  // split based on operation type
  ({ query }) => {
    const { kind, operation } = getMainDefinition(query);
    return kind === 'OperationDefinition' && operation === 'subscription';
  },
  wsLink,
  httpLink,
);

// The introspection query is a regular query, so it goes over the http link
const schema = await introspectSchema(link);

const executableSchema = makeRemoteExecutableSchema({
  schema,
  link,
});

const PORT = 3000;
const server = express();

server.use('*', cors({ origin: `http://localhost:${PORT}` }));

// Serve the executable schema, not the bare introspection result
server.use('/graphql', bodyParser.json(), graphqlExpress({
  schema: executableSchema
}));

server.use('/graphiql', graphiqlExpress({
  endpointURL: '/graphql',
  subscriptionsEndpoint: `ws://localhost:${PORT}/subscriptions`
}));

// Wrap the Express server
const httpServer = createServer(server);
httpServer.listen(PORT, () => {
  console.log(`Apollo Server is now running on http://localhost:${PORT}`);
  // Set up the WebSocket for handling GraphQL subscriptions
  new SubscriptionServer({
    execute,
    subscribe,
    schema: executableSchema
  }, {
    server: httpServer,
    path: '/subscriptions',
  });
});

@dyst5422

dyst5422 commented Dec 6, 2017

How does the intermediate service know what client to push the socket message to? Are you suggesting that we set up a new websocket connection from the intermediate service to the originating service for every client connection? That seems like it would put a fair amount of extra pressure on the intermediate service.

@terion-name

we set up a new websocket connection from the intermediate service to the originating service for every client connection?

Why for every client? A single socket connection should be enough: the originating service pushes all the events over it, and the intermediate service handles subscriptions and pushes to clients.
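
To make the single-connection idea concrete, here is a minimal sketch of the bookkeeping it would need, in plain JavaScript with no real sockets. The class name SubscriptionRouter, the upstreamSend callback, and the 'start'/'stop' message shapes are all hypothetical; they are not part of graphql-tools and only illustrate how one upstream channel could be shared by keying each subscription with an operation id.

```javascript
// Hypothetical bookkeeping for a gateway that shares ONE upstream connection
// among many client subscriptions. No real sockets are involved here.
class SubscriptionRouter {
  constructor(upstreamSend) {
    this.upstreamSend = upstreamSend; // function that writes to the upstream socket
    this.nextId = 1;
    this.sinks = new Map(); // operation id -> callback that pushes to one client
  }

  // Called when a client subscribes; returns an unsubscribe function.
  subscribe(query, pushToClient) {
    const id = String(this.nextId++);
    this.sinks.set(id, pushToClient);
    this.upstreamSend({ type: 'start', id, payload: { query } });
    return () => {
      if (this.sinks.delete(id)) {
        this.upstreamSend({ type: 'stop', id });
      }
    };
  }

  // Called for every message that arrives on the shared upstream socket.
  handleUpstreamMessage(message) {
    const sink = this.sinks.get(message.id);
    if (sink) sink(message.payload);
  }
}

// Usage: two clients, one upstream channel.
const sent = [];
const router = new SubscriptionRouter((msg) => sent.push(msg));
const clientA = [];
const clientB = [];
const stopA = router.subscribe('subscription { bar }', (data) => clientA.push(data));
router.subscribe('subscription { baz }', (data) => clientB.push(data));

router.handleUpstreamMessage({ id: '1', payload: { data: { bar: 'yo' } } });
router.handleUpstreamMessage({ id: '2', payload: { data: { baz: 'hi' } } });
stopA();
// Arrives after client A unsubscribed, so it is dropped.
router.handleUpstreamMessage({ id: '1', payload: { data: { bar: 'late' } } });
```

Each client only sees updates tagged with its own id, and messages for a stopped subscription are silently ignored.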

@mfix22
Contributor

mfix22 commented Jan 11, 2018

@stubailo just merged a subscriptions schema, with another local schema, and a remote schema and everything worked as intended.

@godspeedelbow

@mfix22 So, it should work? Can you explain how you did that?

When I mock subscriptions with a local schema (via makeExecutableSchema), it works as expected. logged:

{ data: { bar: "yo" } }

However, when I wrap the local schema with mergeSchemas, the resolver is no longer called:

{ data: { bar: null } }

Code

import { makeExecutableSchema, mergeSchemas } from 'graphql-tools'
import gql from 'graphql-tag';
import {ApolloClient} from 'apollo-client';
import {SchemaLink} from 'apollo-link-schema';
import {InMemoryCache} from 'apollo-cache-inmemory';

const typeDefs = `
schema {
  query: Query
  subscription: Subscription
}

type Query {
  foo: String
}

type Subscription {
  bar: String,
}
`;

const localSchema = makeExecutableSchema({
  typeDefs,
  resolvers: {
    Subscription: {
      bar: () => 'yo',
    },
  },
});

// this works ...
// const schema = localSchema;

// ... but this doesn't work
const schema = mergeSchemas({
  schemas: [localSchema],
});

const client = new ApolloClient({
  cache: new InMemoryCache(),
  link: new SchemaLink({ schema }),
});

const query = gql`
  subscription {
    bar
  }
`;

client
  .subscribe({ query })
  .subscribe(console.log)

@godspeedelbow

godspeedelbow commented Feb 13, 2018

Found a solution, based on suggestion by @jtmthf

  • in my case there's no need to support subscriptions on the server, so I can use a link split
  • for any operation of type subscription I use subscriptionSchema, for all the other ones I use mergedSchema

Code

    const mergedSchemaLink = new SchemaLink({ schema: mergedSchema });
    const subscriptionLink = new SchemaLink({ schema: subscriptionSchema });

    // using the ability to split links, you can send data to each link
    // depending on what kind of operation is being sent
    const link = split(
      // split based on operation type
      ({ query }) => {
        const { kind, operation } = getMainDefinition(query);
        return kind === 'OperationDefinition' && operation === 'subscription';
      },
      subscriptionLink,
      mergedSchemaLink,
    );

    const apolloClient = new ApolloClient({
      cache: new InMemoryCache(),
      link,
    });

I realise this is actually a bit off topic, but maybe this work around helps others dealing with same issue.
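
For readers who want to see the routing decision in isolation, the following is a rough sketch in plain JavaScript. The mainDefinition function is a simplified stand-in for getMainDefinition, pickTarget is a hypothetical helper, and the documents are hand-written, abbreviated objects in the shape a GraphQL parser produces; none of this is taken from apollo-link itself.

```javascript
// Simplified stand-in for getMainDefinition: returns the first operation
// definition in a parsed document, or the first definition if there is none.
function mainDefinition(document) {
  return (
    document.definitions.find((def) => def.kind === 'OperationDefinition') ||
    document.definitions[0]
  );
}

// Same test as the predicate passed to split() above.
function isSubscription(document) {
  const { kind, operation } = mainDefinition(document);
  return kind === 'OperationDefinition' && operation === 'subscription';
}

// Hypothetical helper: pick a target by operation type, as split() picks a link.
function pickTarget(document, subscriptionTarget, defaultTarget) {
  return isSubscription(document) ? subscriptionTarget : defaultTarget;
}

// Hand-written, abbreviated documents (real ones carry many more fields).
const subscriptionDoc = {
  kind: 'Document',
  definitions: [{ kind: 'OperationDefinition', operation: 'subscription' }],
};
const queryDoc = {
  kind: 'Document',
  definitions: [{ kind: 'OperationDefinition', operation: 'query' }],
};

console.log(pickTarget(subscriptionDoc, 'subscriptionSchema', 'mergedSchema'));
console.log(pickTarget(queryDoc, 'subscriptionSchema', 'mergedSchema'));
```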

@6be709c0

How do you do this if you need to subscribe from the server? @godspeedelbow

@godspeedelbow

@mlescaudron can you explain your use case a bit more? I am slightly confused as to what you are asking me.

@6be709c0

I would like to have another server subscribe to GraphQL, but I can't figure out how that is possible.

@godspeedelbow

Well, that server would effectively be/have a GraphQL client connecting to the GraphQL server. There are no real limitations on whether the client is in the browser or a Node process.

@dyst5422

Still doesn't cover the case of subscription forwarding.

Client1 subscribes to server1, which delegates the subscription to server2 by means of schema merging. Server2 pushes an update, which server1 gets... but then server1 needs to know which client to push that to.

Client1 -> Server1 -> Server2 -> Server1 -> Client1

Seems like attaching some meta to the request with a client ID that gets returned on the push is the simplest solution. Should be easy as the graphql spec has extensions built in.

For the more complicated case of

Client -> Server1 -> Server2 -> ... -> ServerN-1 -> ServerN -> ServerN-1 -> ... Server2 -> Server1 -> Client

It seems a stack of identifiers is probably the more flexible option, with the subscribing server pushing an identifier onto the subscription request stack and popping from the update push stack.
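
As a rough illustration of the identifier-stack idea, here is a sketch in plain JavaScript with in-memory callbacks instead of sockets. Everything in it is hypothetical (makeHop, routeStack, the gateway names, and the way the origin echoes the stack back); it is one possible reading of the proposal, not an existing graphql-tools feature.

```javascript
// Hypothetical hop: on the way out it pushes its own route id onto the
// request's stack; on the way back it pops that id to find the next receiver.
function makeHop(name, forward) {
  const routes = new Map(); // route id -> callback of whoever subscribed through us
  let counter = 0;
  return {
    subscribe(request, onUpdate) {
      const routeId = `${name}:${++counter}`;
      routes.set(routeId, onUpdate);
      forward({ ...request, routeStack: [...request.routeStack, routeId] });
    },
    deliver(update) {
      const routeStack = update.routeStack.slice();
      const routeId = routeStack.pop();
      const onUpdate = routes.get(routeId);
      if (onUpdate) onUpdate({ ...update, routeStack });
    },
  };
}

// The originating server remembers each request, including its stack.
const originSubscriptions = [];
const gateway2 = makeHop('gateway2', (request) => originSubscriptions.push(request));
const gateway1 = makeHop('gateway1', (request) =>
  gateway2.subscribe(request, (update) => gateway1.deliver(update))
);

// Client -> gateway1 -> gateway2 -> origin
const received = [];
gateway1.subscribe({ query: 'subscription { bar }', routeStack: [] }, (update) =>
  received.push(update)
);

// The origin publishes an event and echoes back the stack it was given.
for (const request of originSubscriptions) {
  gateway2.deliver({ data: { bar: 'yo' }, routeStack: request.routeStack });
}

console.log(received);
```

By the time the update reaches the client, every hop has popped its own identifier, so the stack is empty again.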

@coco98

coco98 commented Aug 11, 2018

@godspeedelbow AFAIK, schema-link doesn't support subscriptions: apollographql/apollo-link#374

So I'm not sure how this would work in the snippet you posted:

const subscriptionLink = new SchemaLink({ schema: subscriptionSchema });

@ericlewis
Contributor

I have investigated this, and you can indeed use mergeSchemas with subscriptions. But there is a problem.

Resolvers do not work properly, graphql-tools is doing something strange here:
https://github.com/apollographql/graphql-tools/blob/1332e3cda0107d70d17334dae6ef636c68bdfd70/src/stitching/delegateToSchema.ts#L130

The underlying compiled JavaScript looks like this:
_a[subscriptionKey] = __assign({}, transformedResult),

changing it to:
_a[subscriptionKey] = transformedResult,

allows you to properly receive the data you are expecting, transformed and everything.

So, modifying graphql-tools to remove the object spreading seems to fix the issue. I'm not sure if that is a valid solution however.

@wawhal

wawhal commented Aug 17, 2018

@ericlewis Are you sure it works with remote schemas?
Can you lead me to an example or any docs if possible?

Asking this because whenever I serve a stitched child of two schemas that have subscriptions, I get this kind of an error on trying to make a subscription

{
  "errors": [
    {
      "message": "Expected Iterable, but did not find one for field Subscription.user.",
      "locations": [
        {
          "line": 2,
          "column": 3
        }
      ],
      "path": [
        "user"
      ],
      "extensions": {
        "code": "INTERNAL_SERVER_ERROR"
      }
    }
  ],
  "data": null
}

All I can make out of this error is that the type expected was an array, but it did not get an array from the parent API. But if I directly try to run a subscription over my parent API, it works totally fine.

Here is my code:

import {
  makeRemoteExecutableSchema,
  introspectSchema,
  mergeSchemas,
} from 'graphql-tools';
import { HttpLink } from 'apollo-link-http';
import { WebSocketLink } from 'apollo-link-ws';
import { SubscriptionClient } from 'subscriptions-transport-ws';
import { ApolloServer } from 'apollo-server';
import fetch from 'node-fetch';
import ws from 'ws';
import { split } from 'apollo-link';
import { getMainDefinition } from 'apollo-utilities';

const graphqlEndpoint = 'https://bazookaand.herokuapp.com/v1alpha1/graphql';

const makeWsLink = function (uri) {
  return new WebSocketLink(new SubscriptionClient(
    uri,
    { reconnect: true },
    ws
  ));
};

// create executable schemas from remote GraphQL APIs
const createRemoteExecutableSchemas = async () => {
  const httpLink = new HttpLink({
    uri: graphqlEndpoint,
    fetch
  });
  const wsLink = makeWsLink(graphqlEndpoint);
  const link = split(
    // split based on operation type
    ({ query }) => {
      const { kind, operation } = getMainDefinition(query);
      return kind === 'OperationDefinition' && operation === 'subscription';
    },
    wsLink,
    httpLink,
  );
  const remoteSchema = await introspectSchema(httpLink);
  const remoteExecutableSchema = makeRemoteExecutableSchema({
    schema: remoteSchema,
    link
  });
  return remoteExecutableSchema;
};

const createNewSchema = async () => {
  const schema = await createRemoteExecutableSchemas();
  return mergeSchemas({
    schemas: [schema]
  });
};

const runServer = async () => {
  // Get newly merged schema
  const schema = await createNewSchema();
  // start server with the new schema
  const server = new ApolloServer({
    schema
  });
  server.listen().then(() => console.log('4000'));
};

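// runServer is async, so a surrounding try/catch would not see its rejection;
// handle errors on the returned promise instead.
runServer().catch((err) => {
  console.error(err);
});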

@ericlewis
Contributor

ericlewis commented Aug 21, 2018

@wawhal I think you need to write a resolver for it. We are using it ourselves internally, though only one of our APIs has subscriptions. It definitely does work with remote schemas when using my fix. I also wrote a resolver because mine was spitting out the same error.

@wawhal

wawhal commented Aug 22, 2018

@ericlewis Not really. The problem was that my subscription was returning an array and the delegation resolvers were converting arrays to objects. For example, it was converting:

{
  data: {
    game: [
      {
        id: 4,
        name: 'CS'
      },
      {
        id: 6,
        name: 'CS'
      }
    ]
  }
}

to:

{
  data: {
    game: {
      0: {
        id: 4,
        name: 'CS'
      },
      1: {
        id: 6,
        name: 'CS'
      }
    }
  }
}

Which is why I was getting the error Expected iterable but did not find one for field Subscription.game.

This pull request should fix the problem.

Current workaround is to write a resolver that converts the object back to an array before return.
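
The conversion is easy to reproduce outside graphql-tools. The sketch below, in plain JavaScript, shows that copying an array with object spread (which is what the compiled __assign({}, transformedResult) amounts to) produces an index-keyed object, and then shows one possible shape for the workaround. The helper name toArrayIfIndexed is hypothetical, and how it would be wired into a resolver depends on your schema.

```javascript
// Copying an array with object spread yields a plain object keyed by index,
// not an array.
const games = [
  { id: 4, name: 'CS' },
  { id: 6, name: 'CS' },
];
const spread = { ...games };

console.log(Array.isArray(spread)); // false
console.log(Object.keys(spread));   // [ '0', '1' ]

// Hypothetical helper for a workaround resolver: turn an index-keyed object
// back into an array, and leave real arrays and other values untouched.
function toArrayIfIndexed(value) {
  if (Array.isArray(value) || value === null || typeof value !== 'object') {
    return value;
  }
  const keys = Object.keys(value);
  const indexed = keys.length > 0 && keys.every((key, i) => key === String(i));
  return indexed ? keys.map((key) => value[key]) : value;
}

const restored = toArrayIfIndexed(spread);
console.log(Array.isArray(restored)); // true
```

The check on the keys is deliberately strict, so an ordinary object such as { id: 4, name: 'CS' } passes through unchanged.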

@ubbop42

ubbop42 commented Dec 12, 2018

@wawhal I'm following your implementation, but I get:
{
  "error": "Could not connect to websocket endpoint ws://localhost:3000/graphql. Please check if the endpoint url is correct."
}
It works fine on the remote GraphQL service. Do I need to add anything else, such as a resolver?

@lindesvard

lindesvard commented Jan 8, 2019

Any updates on this? Having similar problem.

I have merged two schemas and one of them has a subscription. The result in the gateway is:

{
  "data": {
    "postCreated": null
  }
}

I tried to console.log the transformedResult (see @ericlewis's post above): first I get the correct object, and then it logs null. Here is the output of the console.log:

{ id: 31366, message: '123', user: { nick: 'CG' } }
null

So it seems the actual data is coming but it isn't able to resolve it I guess?

edit: I managed to fix it for me. I use the RenameTypes and RenameRootFields. So if I don't rename the subscriptions I get all the data!

    return transformSchema(schema, [
      new RenameTypes(name => `${remote.prefix}_${name}`),
      new RenameRootFields((operation, name) => {
        return operation === 'Subscription' ? name : `${remote.prefix}_${name}`
      }),
    ])
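
The decision made inside the RenameRootFields callback can be checked on its own. This is a small sketch in plain JavaScript; the prefix value 'blog' and the field names are made up for illustration, and in the snippet above the prefix comes from remote.prefix.

```javascript
// Hypothetical prefix; in the snippet above it comes from `remote.prefix`.
const prefix = 'blog';

// Same decision as the RenameRootFields callback: prefix Query and Mutation
// root fields, but keep Subscription root fields under their original names.
const renameRootField = (operation, name) =>
  operation === 'Subscription' ? name : `${prefix}_${name}`;

console.log(renameRootField('Query', 'posts'));              // blog_posts
console.log(renameRootField('Mutation', 'createPost'));      // blog_createPost
console.log(renameRootField('Subscription', 'postCreated')); // postCreated
```

The trade-off is that subscription root fields from different remotes then share one namespace, so their names have to be unique across the merged schemas.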

@jjangga0214

jjangga0214 commented Jan 31, 2020

This is a working example of a remote schema with subscriptions over websocket and queries and mutations over http.

Flow

Client request
-> context is created by reading req or connection (the jwt is decoded and a user object is created in the context)
-> remote schema is executed
-> link is called
-> link is split by operation (wsLink for subscriptions, httpLink for queries and mutations)
-> wsLink or httpLink accesses the context created above (=graphqlContext)
-> wsLink or httpLink uses the context to create headers (an authorization header with a signed jwt in this example) for the remote schema.
-> "subscription" or "query or mutation" is forwarded to the remote server.

Note

  1. Currently, ContextLink does not have any effect on WebSocketLink. So, instead of using concat, we should create a raw ApolloLink.
  2. When creating the context, check connection, not only req. The former is available if the request came over a websocket, and it contains the meta information the user sends, like an auth token.
  3. HttpLink expects a global fetch that follows the standard spec. Thus, do not use node-fetch, whose spec is incompatible (especially with TypeScript). Instead, use cross-fetch.

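// Imports assumed by the snippet below (package names are the usual ones
// for these identifiers; adjust to your setup)
import { ApolloLink, split } from 'apollo-link';
import { WebSocketLink } from 'apollo-link-ws';
import { HttpLink } from 'apollo-link-http';
import { setContext } from 'apollo-link-context';
import { getMainDefinition } from 'apollo-utilities';
import { introspectSchema, makeRemoteExecutableSchema, mergeSchemas } from 'graphql-tools';
import { ApolloServer } from 'apollo-server';
import jwt from 'jsonwebtoken';
import ws from 'ws';
import fetch from 'cross-fetch';

const wsLink = new ApolloLink(operation => {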
    // This is your context!
    const context = operation.getContext().graphqlContext
    
    // Create a new websocket link per request
    return new WebSocketLink({
      uri: "<YOUR_URI>",
      options: {
        reconnect: true,
        connectionParams: { // give custom params to your websocket backend (e.g. to handle auth) 
          headers: {
            authorization: jwt.sign(context.user, process.env.SUPER_SECRET),
            foo: 'bar'
          }
        },
      },
      webSocketImpl: ws,
    }).request(operation)
    // Instead of using `forward()` of Apollo link, we directly use websocketLink's request method
  })

const httpLink = setContext((_graphqlRequest, { graphqlContext }) => {
  return {
    headers: {
      authorization: jwt.sign(graphqlContext.user, process.env.SUPER_SECRET),
    },
  }
}).concat(new HttpLink({
  uri,
  fetch,
}))

const link = split(
  operation => {
    const definition = getMainDefinition(operation.query)
    return (
      definition.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    )
  },
  wsLink, // <-- Executed if above function returns true
  httpLink, // <-- Executed if above function returns false
)

const schema = await introspectSchema(link)

const executableSchema = makeRemoteExecutableSchema({
    schema,
    link,
  })

const server = new ApolloServer({
  schema: mergeSchemas({ schemas: [ executableSchema, /* ...anotherschemas */] }),
  context: ({ req, connection }) => {
    let authorization;
    if (req) { // when query or mutation is requested by http
      authorization = req.headers.authorization
    } else if (connection) { // when subscription is requested by websocket
      authorization = connection.context.authorization
    }
    const token = authorization.replace('Bearer ', '')
    return {
      user: getUserFromToken(token),
    }
  },
})
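
A defensive variant of the token lookup in the context function may be useful, since requests without an Authorization value (for example, an unauthenticated introspection query) are common. This is only a sketch in plain JavaScript: extractToken is a hypothetical helper, and it assumes the same req.headers.authorization and connection.context.authorization locations used in the example above.

```javascript
// Hypothetical helper: read the Authorization value from an HTTP request or
// from a websocket connection's context, and return the bare token or null.
function extractToken({ req, connection } = {}) {
  let authorization;
  if (req) {
    // query or mutation requested over http
    authorization = req.headers && req.headers.authorization;
  } else if (connection) {
    // subscription requested over websocket
    authorization = connection.context && connection.context.authorization;
  }
  if (typeof authorization !== 'string') return null;
  return authorization.replace(/^Bearer\s+/i, '');
}

console.log(extractToken({ req: { headers: { authorization: 'Bearer abc' } } }));
console.log(extractToken({ connection: { context: { authorization: 'Bearer xyz' } } }));
console.log(extractToken({ req: { headers: {} } }));
```

The context function could then return { user: null } or reject the request when the helper returns null, depending on how strict the gateway should be.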
