diff --git a/CHANGELOG.md b/CHANGELOG.md index d2b4f931c..92db48dba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ ### vNEXT +- Added `addGraphQLSubscriptions` - use it to extend your network interface to work with `SubscriptionsClient` instance. +- Client now uses native WebSocket by default, and has optional field to provide another implementation (for NodeJS clients) +- Client now support INIT with custom object, so you can use if for authorization, or any other init params. +- Server and client are now separated with `browser` and `main` fields of `package.json` +- Client exposes workflow events for connect, disconnect and reconnect. +- Server exposes new events: `onUnsubscribe`, `onSubscribe`, `onConnect` and `onDisconnect`. +- Use `ws` package on server side, and expose it's options from server constructor. - ... ### v0.4.0 diff --git a/README.md b/README.md index 04269321c..6f91a5a93 100644 --- a/README.md +++ b/README.md @@ -9,10 +9,14 @@ A GraphQL websocket server and client to facilitate GraphQL subscriptions. See [GitHunt-API](https://github.com/apollostack/GitHunt-API) and [GitHunt-React](https://github.com/apollostack/GitHunt-React) for an example server and client integration. ## Client -### `Constructor(url, options)` +### `Constructor(url, options, connectionCallback)` - `url: string` : url that the client will connect to -- `options?: Object` : optional object to modify default client behavior - * `timeout: number` : how long the client should wait in ms for a subscription to be started (default 5000 ms) +- `options?: Object` : optional, object to modify default client behavior + * `timeout?: number` : how long the client should wait in ms for a subscription to be started (default 5000 ms)how long the client should wait in ms for a subscription to be started (default 5000 ms) + * `connectionParams?: Object` : object that will be available as first argument of `onConnect` (in server side) + * `reconnect?: boolean` : automatic reconnect in case of connection error + * `reconnectionAttempts?: number` : how much reconnect attempts + * `connectionCallback?: (error) => {}` : optional, callback that called after the first init message, with the error (if there is one) ### Methods #### `subscribe(options, handler) => id` @@ -26,11 +30,19 @@ See [GitHunt-API](https://github.com/apollostack/GitHunt-API) and [GitHunt-React - `id: string` : the subscription ID of the subscription to unsubscribe from ## Server -### `Constructor(options, httpServer)` +### `Constructor(options, socketOptions)` - `options: {ServerOptions}` * `subscriptionManager: SubscriptionManager` : GraphQL subscription manager - * `onSubscribe?: (message: SubscribeMessage, params: SubscriptionOptions, webSocketRequest: WebSocketRequest)` : optional method to create custom params that will be used when resolving this subscription - * `keepAlive?: number` : optional interval in ms to send `SUBSCRIPTION_KEEPALIVE` messages to all clients + * `onSubscribe?: (message: SubscribeMessage, params: SubscriptionOptions, webSocket: WebSocket)` : optional method to create custom params that will be used when resolving this subscription + * `onUnsubscribe?: (webSocket: WebSocket)` : optional method that called when a client unsubscribe + * `onConnect?: (connectionParams: Object, webSocket: WebSocket)` : optional method that called when a client connects to the socket, called with the `connectionParams` from the client, if the return value is an object, its elements will be added to the context. return `false` or throw an exception to reject the connection. May return a Promise. + * `onDisconnect?: (webSocket: WebSocket)` : optional method that called when a client disconnects + * `keepAlive?: number` : optional interval in ms to send `KEEPALIVE` messages to all clients +- `socketOptions: {WebSocket.IServerOptions}` : options to pass to the WebSocket object (full docs [here](https://github.com/websockets/ws/blob/master/doc/ws.md)) + * `server?: HttpServer` - existing HTTP server to use (use without `host`/`port`) + * `host?: string` - server host + * `port?: number` - server port + * `path?: string` - endpoint path ### `addGraphQLSubscriptions(networkInterface, Client)` A Quick way to add the subscribe and unsubscribe functions to the [network interface](http://dev.apollodata.com/core/network.html#createNetworkInterface) @@ -38,6 +50,10 @@ A Quick way to add the subscribe and unsubscribe functions to the [network inter ## Client-server messages Each message has a type, as well as associated fields depending on the message type. ### Client -> Server + +#### INIT +Client sends this message after connecting, this triggers `onConnect` on the server. + #### SUBSCRIPTION_START Client sends this message to start a subscription for a query. - `query: GraphQLDocument` : GraphQL subscription @@ -50,6 +66,14 @@ Client sends this message to end a subscription. - `id: string` : subscription ID of the subscription to be terminated ### Server -> Client + +#### INIT_FAIL +The server sends this message if `onConnect` callback returns `false` or throws an exception. after sending this message, the server disconnects the client. +- `payload: Object`: the server side error + +#### INIT_SUCCESS +The server sends this message if `onConnect` callback returns any other value then `false`. + #### SUBSCRIPTION_SUCCESS The server sends this message to confirm that it has validated the subscription query and is subscribed to the triggers. @@ -66,5 +90,5 @@ GraphQL result sent periodically from server to client according to subscription - `payload: GraphQLResult` : GraphQL result from running the subscription - `id: string` : subscription ID -#### SUBSCRIPTION_KEEPALIVE +#### KEEPALIVE Server message sent periodically to keep the client connection alive. diff --git a/package.json b/package.json index 233d582e5..ca41912cd 100644 --- a/package.json +++ b/package.json @@ -2,18 +2,21 @@ "name": "subscriptions-transport-ws", "version": "0.4.1", "description": "A websocket transport for GraphQL subscriptions", - "main": "dist/index.js", + "main": "dist/server.js", + "browser": "dist/client.js", "repository": { "type": "git", "url": "git+https://github.com/apollostack/subscriptions-transport-ws.git" }, "dependencies": { + "@types/ws": "0.0.37", "backo2": "^1.0.2", + "eventemitter3": "^2.0.2", "graphql-subscriptions": "^0.2.0", "graphql-tag": "^1.2.3", "lodash.isobject": "^3.0.2", "lodash.isstring": "^4.0.1", - "websocket": "^1.0.23" + "ws": "^1.1.1" }, "scripts": { "compile": "tsc", @@ -34,7 +37,6 @@ "@types/mocha": "^2.2.31", "@types/node": "^6.0.38", "@types/sinon": "^1.16.31", - "@types/websocket": "^0.0.32", "chai": "^3.5.0", "graphql": "^0.7.0", "istanbul": "^1.0.0-alpha.2", diff --git a/src/client.ts b/src/client.ts index 98e361a9f..d9591405c 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,6 +1,9 @@ -import * as websocket from 'websocket'; -const W3CWebSocket = (websocket as { [key: string]: any })['w3cwebsocket']; import * as Backoff from 'backo2'; +import {EventEmitter, ListenerFn} from 'eventemitter3'; + +declare let window: any; +const _global = typeof global !== 'undefined' ? global : (typeof window !== 'undefined' ? window : {}); +const NativeWebSocket = _global.WebSocket || _global.MozWebSocket; import { SUBSCRIPTION_FAIL, @@ -8,13 +11,18 @@ import { SUBSCRIPTION_START, SUBSCRIPTION_SUCCESS, SUBSCRIPTION_END, - SUBSCRIPTION_KEEPALIVE, + KEEPALIVE, + INIT, + INIT_FAIL, + INIT_SUCCESS, } from './messageTypes'; import { GRAPHQL_SUBSCRIPTIONS } from './protocols'; import isString = require('lodash.isstring'); import isObject = require('lodash.isobject'); +export * from './helpers'; + export interface SubscriptionOptions { query: string; variables?: Object; @@ -23,28 +31,32 @@ export interface SubscriptionOptions { } export interface Subscription { - options: SubscriptionOptions, - handler: (error: Error[], result?: any) => void, + options: SubscriptionOptions; + handler: (error: Error[], result?: any) => void; } export interface Subscriptions { [id: string]: Subscription; } +export type ConnectionParams = {[paramName: string]: any}; + export interface ClientOptions { + connectionParams?: ConnectionParams; timeout?: number; reconnect?: boolean; reconnectionAttempts?: number; + connectionCallback?: (error: Error[], result?: any) => void; } const DEFAULT_SUBSCRIPTION_TIMEOUT = 5000; -export default class Client { - +export class SubscriptionClient { public client: any; public subscriptions: Subscriptions; private url: string; private maxId: number; + private connectionParams: ConnectionParams; private subscriptionTimeout: number; private waitingSubscriptions: {[id: string]: boolean}; // subscriptions waiting for SUBSCRIPTION_SUCCESS private unsentMessagesQueue: Array; // queued messages while websocket is opening. @@ -53,14 +65,27 @@ export default class Client { private reconnectionAttempts: number; private reconnectSubscriptions: Subscriptions; private backoff: any; + private connectionCallback: any; + private eventEmitter: EventEmitter; + private wsImpl: any; - constructor(url: string, options?: ClientOptions) { + constructor(url: string, options?: ClientOptions, webSocketImpl?: any) { const { + connectionCallback = undefined, + connectionParams = {}, timeout = DEFAULT_SUBSCRIPTION_TIMEOUT, reconnect = false, reconnectionAttempts = Infinity, } = (options || {}); + this.wsImpl = webSocketImpl || NativeWebSocket; + + if (!this.wsImpl) { + throw new Error('Unable to find native implementation, or alternative implementation for WebSocket!'); + } + + this.connectionParams = connectionParams; + this.connectionCallback = connectionCallback; this.url = url; this.subscriptions = {}; this.maxId = 0; @@ -72,9 +97,19 @@ export default class Client { this.reconnecting = false; this.reconnectionAttempts = reconnectionAttempts; this.backoff = new Backoff({ jitter: 0.5 }); + this.eventEmitter = new EventEmitter(); + this.connect(); } + public get status() { + return this.client.readyState; + } + + public close() { + this.client.close(); + } + public subscribe(options: SubscriptionOptions, handler: (error: Error[], result?: any) => void) { const { query, variables, operationName, context } = options; @@ -101,7 +136,7 @@ export default class Client { this.subscriptions[subId] = {options, handler}; this.waitingSubscriptions[subId] = true; setTimeout( () => { - if (this.waitingSubscriptions[subId]){ + if (this.waitingSubscriptions[subId]) { handler([new Error('Subscription timed out - no response from server')]); this.unsubscribe(subId); } @@ -109,6 +144,26 @@ export default class Client { return subId; } + public on(eventName: string, callback: ListenerFn, context?: any): Function { + const handler = this.eventEmitter.on(eventName, callback, context); + + return () => { + handler.off(eventName, callback, context); + }; + } + + public onConnect(callback: ListenerFn, context?: any): Function { + return this.on('connect', callback, context); + } + + public onDisconnect(callback: ListenerFn, context?: any): Function { + return this.on('disconnect', callback, context); + } + + public onReconnect(callback: ListenerFn, context?: any): Function { + return this.on('reconnect', callback, context); + } + public unsubscribe(id: number) { delete this.subscriptions[id]; delete this.waitingSubscriptions[id]; @@ -168,6 +223,7 @@ export default class Client { if (this.backoff.attempts > this.reconnectionAttempts) { return; } + if (!this.reconnecting) { this.reconnectSubscriptions = this.subscriptions; this.subscriptions = {}; @@ -176,46 +232,65 @@ export default class Client { } const delay = this.backoff.duration(); setTimeout(() => { - this.connect(); + this.connect(true); }, delay); } - private connect() { - this.client = new W3CWebSocket(this.url, GRAPHQL_SUBSCRIPTIONS); + private connect(isReconnect: boolean = false) { + this.client = new this.wsImpl(this.url, GRAPHQL_SUBSCRIPTIONS); this.client.onopen = () => { + this.eventEmitter.emit(isReconnect ? 'reconnect' : 'connect'); this.reconnecting = false; this.backoff.reset(); Object.keys(this.reconnectSubscriptions).forEach((key) => { const { options, handler } = this.reconnectSubscriptions[key]; this.subscribe(options, handler); - }) + }); this.unsentMessagesQueue.forEach((message) => { this.client.send(JSON.stringify(message)); }); this.unsentMessagesQueue = []; + + // Send INIT message, no need to wait for connection to success (reduce roundtrips) + this.sendMessage({type: INIT, payload: this.connectionParams}); }; this.client.onclose = () => { + this.eventEmitter.emit('disconnect'); + this.tryReconnect(); }; - this.client.onmessage = (message: { data: string }) => { + this.client.onerror = () => { + this.tryReconnect(); + }; + + this.client.onmessage = ({ data }: {data: any}) => { let parsedMessage: any; try { - parsedMessage = JSON.parse(message.data); + parsedMessage = JSON.parse(data); } catch (e) { - throw new Error('Message must be JSON-parseable.'); + throw new Error(`Message must be JSON-parseable. Got: ${data}`); } const subId = parsedMessage.id; - if (parsedMessage.type !== SUBSCRIPTION_KEEPALIVE && !this.subscriptions[subId]) { + if ([KEEPALIVE, INIT_SUCCESS, INIT_FAIL].indexOf(parsedMessage.type) === -1 && !this.subscriptions[subId]) { this.unsubscribe(subId); return; } // console.log('MSG', JSON.stringify(parsedMessage, null, 2)); switch (parsedMessage.type) { - + case INIT_FAIL: + if (this.connectionCallback) { + this.connectionCallback(parsedMessage.payload.error); + } + break; + case INIT_SUCCESS: + if (this.connectionCallback) { + this.connectionCallback(); + } + break; case SUBSCRIPTION_SUCCESS: delete this.waitingSubscriptions[subId]; @@ -234,12 +309,12 @@ export default class Client { } break; - case SUBSCRIPTION_KEEPALIVE: + case KEEPALIVE: break; default: - throw new Error('Invalid message type - must be of type `subscription_start`, `subscription_data` or `subscription_keepalive`.'); + throw new Error('Invalid message type!'); } }; } -}; +} diff --git a/src/helpers.ts b/src/helpers.ts index c131d52dc..a9d5686ed 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -1,8 +1,8 @@ -import Client from './client'; +import { SubscriptionClient } from './client'; import { print } from 'graphql-tag/printer'; -// quick way to add the subscribe and unsubscribe functions to the network interface -function addGraphQLSubscriptions(networkInterface: any, wsClient: Client): any { +// Quick way to add the subscribe and unsubscribe functions to the network interface +export function addGraphQLSubscriptions(networkInterface: any, wsClient: SubscriptionClient): any { return Object.assign(networkInterface, { subscribe(request: any, handler: any): number { return wsClient.subscribe({ @@ -12,8 +12,6 @@ function addGraphQLSubscriptions(networkInterface: any, wsClient: Client): any { }, unsubscribe(id: number): void { wsClient.unsubscribe(id); - } + }, }); } - -export { addGraphQLSubscriptions }; \ No newline at end of file diff --git a/src/index.ts b/src/index.ts deleted file mode 100644 index fed66d453..000000000 --- a/src/index.ts +++ /dev/null @@ -1,4 +0,0 @@ -import Client from './client'; -import SubscriptionServer from './server'; -import { addGraphQLSubscriptions } from './helpers'; -export { addGraphQLSubscriptions, SubscriptionServer, Client }; diff --git a/src/messageTypes.ts b/src/messageTypes.ts index 2a0fa6224..c1835e5b3 100644 --- a/src/messageTypes.ts +++ b/src/messageTypes.ts @@ -3,12 +3,18 @@ const SUBSCRIPTION_END = 'subscription_end'; const SUBSCRIPTION_DATA = 'subscription_data'; const SUBSCRIPTION_START = 'subscription_start'; const SUBSCRIPTION_SUCCESS = 'subscription_success'; -const SUBSCRIPTION_KEEPALIVE = 'subscription_keepalive'; +const KEEPALIVE = 'keepalive'; +const INIT = 'init'; +const INIT_SUCCESS = 'init_success'; +const INIT_FAIL = 'init_fail'; export { SUBSCRIPTION_DATA, SUBSCRIPTION_END, SUBSCRIPTION_FAIL, SUBSCRIPTION_START, SUBSCRIPTION_SUCCESS, - SUBSCRIPTION_KEEPALIVE, + KEEPALIVE, + INIT, + INIT_FAIL, + INIT_SUCCESS, }; diff --git a/src/server.ts b/src/server.ts index dd440b19c..9ab0000e9 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,9 +1,4 @@ -import { - server as WebSocketServer, // these are NOT the correct typings! - connection as Connection, - request as WebSocketRequest, - IMessage, -} from 'websocket'; +import * as WebSocket from 'ws'; import { SUBSCRIPTION_FAIL, @@ -11,34 +6,37 @@ import { SUBSCRIPTION_START, SUBSCRIPTION_END, SUBSCRIPTION_SUCCESS, - SUBSCRIPTION_KEEPALIVE, + KEEPALIVE, + INIT, + INIT_FAIL, + INIT_SUCCESS, } from './messageTypes'; -import { GRAPHQL_SUBSCRIPTIONS } from './protocols'; +import {GRAPHQL_SUBSCRIPTIONS} from './protocols'; +import {SubscriptionManager} from 'graphql-subscriptions'; +import isObject = require('lodash.isobject'); -import { SubscriptionManager } from 'graphql-subscriptions'; -import { SubscriptionOptions } from 'graphql-subscriptions/dist/pubsub'; -import { Server as HttpServer} from 'http'; +type ConnectionSubscriptions = {[subId: string]: number}; -type ConnectionSubscriptions = { [subId: string]: number }; +type ConnectionContext = { + initPromise?: Promise +}; export interface SubscribeMessage { - [key: string]: any, // any extention that will come with the message. + [key: string]: any; // any extension that will come with the message. + payload: string; query?: string; - variables?: { [key: string]: any }; + variables?: {[key: string]: any}; operationName?: string; id: string; type: string; -}; - -interface SubscriptionData { - query: string; - variables?: { [key: string]: any }; - operationName: string; } export interface ServerOptions { subscriptionManager: SubscriptionManager; onSubscribe?: Function; + onUnsubscribe?: Function; + onConnect?: Function; + onDisconnect?: Function; keepAlive?: number; // contextValue?: any; // rootValue?: any; @@ -47,19 +45,16 @@ export interface ServerOptions { // triggerGenerator?: (name: string, args: Object, context?: Object) => Array<{name: string, filter: Function}>; } -interface TriggerAction { - name: string; - rootValue: any; - contextValue?: any; -} - -class Server { +export class SubscriptionServer { private onSubscribe: Function; - private wsServer: WebSocketServer; + private onUnsubscribe: Function; + private onConnect: Function; + private onDisconnect: Function; + private wsServer: WebSocket.Server; private subscriptionManager: SubscriptionManager; - constructor(options: ServerOptions, httpServer: HttpServer) { - const { subscriptionManager, onSubscribe, keepAlive } = options; + constructor(options: ServerOptions, socketOptions: WebSocket.IServerOptions) { + const {subscriptionManager, onSubscribe, onUnsubscribe, onConnect, onDisconnect, keepAlive} = options; if (!subscriptionManager) { throw new Error('Must provide `subscriptionManager` to websocket server constructor.'); @@ -67,28 +62,29 @@ class Server { this.subscriptionManager = subscriptionManager; this.onSubscribe = onSubscribe; + this.onUnsubscribe = onUnsubscribe; + this.onConnect = onConnect; + this.onDisconnect = onDisconnect; // init and connect websocket server to http - this.wsServer = new WebSocketServer({ - httpServer: httpServer, - autoAcceptConnections: false, - // TODO: origin filter - }); + this.wsServer = new WebSocket.Server(socketOptions || {}); + + this.wsServer.on('connection', (request: WebSocket) => { + if (request.protocol === undefined || request.protocol.indexOf(GRAPHQL_SUBSCRIPTIONS) === -1) { + // Close the connection with an error code, and + // then terminates the actual network connection (sends FIN packet) + // 1002: protocol error + request.close(1002); + request.terminate(); - this.wsServer.on('request', (request) => { - if (request.requestedProtocols.indexOf(GRAPHQL_SUBSCRIPTIONS) === -1) { - request.reject(400, 'Unsupported protocol.'); return; } - // accept connection - const connection: Connection = request.accept(GRAPHQL_SUBSCRIPTIONS, request.origin); - // Regular keep alive messages if keepAlive is set if (keepAlive) { const keepAliveTimer = setInterval(() => { - if (connection && connection.state === 'open') { - this.sendKeepAlive(connection); + if (request.readyState === WebSocket.OPEN) { + this.sendKeepAlive(request); } else { clearInterval(keepAliveTimer); } @@ -96,138 +92,213 @@ class Server { } const connectionSubscriptions: ConnectionSubscriptions = Object.create(null); - connection.on('message', this.onMessage(connection, connectionSubscriptions, request)); - connection.on('close', this.onClose(connection, connectionSubscriptions)); + const connectionContext: ConnectionContext = Object.create(null); + + request.on('message', this.onMessage(request, connectionSubscriptions, connectionContext)); + request.on('close', () => { + this.onClose(request, connectionSubscriptions)(); + + if (this.onDisconnect) { + this.onDisconnect(request); + } + }); }); } - // TODO test that this actually works - private onClose(connection: Connection, connectionSubscriptions: ConnectionSubscriptions) { + private unsubscribe(connection: WebSocket, handleId: number) { + this.subscriptionManager.unsubscribe(handleId); + + if (this.onUnsubscribe) { + this.onUnsubscribe(connection); + } + } + + private onClose(connection: WebSocket, connectionSubscriptions: ConnectionSubscriptions) { return () => { - Object.keys(connectionSubscriptions).forEach( (subId) => { - this.subscriptionManager.unsubscribe(connectionSubscriptions[subId]); + Object.keys(connectionSubscriptions).forEach((subId) => { + this.unsubscribe(connection, connectionSubscriptions[subId]); delete connectionSubscriptions[subId]; }); - } + }; } - private onMessage(connection: Connection, connectionSubscriptions: ConnectionSubscriptions, webSocketRequest: WebSocketRequest) { - return (message: IMessage) => { + private onMessage(connection: WebSocket, connectionSubscriptions: ConnectionSubscriptions, connectionContext: ConnectionContext) { + let onInitResolve: any = null, onInitReject: any = null; + + connectionContext.initPromise = new Promise((resolve, reject) => { + onInitResolve = resolve; + onInitReject = reject; + }); + + return (message: any) => { let parsedMessage: SubscribeMessage; try { - parsedMessage = JSON.parse(message.utf8Data); + parsedMessage = JSON.parse(message); } catch (e) { - this.sendSubscriptionFail(connection, null, { errors: [{ message: e.message }] }); + this.sendSubscriptionFail(connection, null, {errors: [{message: e.message}]}); return; } const subId = parsedMessage.id; switch (parsedMessage.type) { + case INIT: + let onConnectPromise = Promise.resolve(true); + if (this.onConnect) { + onConnectPromise = Promise.resolve(this.onConnect(parsedMessage.payload, connection)); + } + + onInitResolve(onConnectPromise); + + connectionContext.initPromise.then((result) => { + if (result === false) { + throw new Error('Prohibited connection!'); + } + + return { + type: INIT_SUCCESS, + }; + }).catch((error: Error) => { + return { + type: INIT_FAIL, + error: error.message, + }; + }).then((resultMessage: any) => { + this.sendInitResult(connection, resultMessage); + }); + + break; case SUBSCRIPTION_START: - const baseParams: SubscriptionOptions = { - query: parsedMessage.query, - variables: parsedMessage.variables, - operationName: parsedMessage.operationName, - context: {}, - formatResponse: undefined, - formatError: undefined, - callback: undefined, - }; - let promisedParams = Promise.resolve(baseParams); - - if (this.onSubscribe){ - promisedParams = Promise.resolve(this.onSubscribe(parsedMessage, baseParams, webSocketRequest)); - } + connectionContext.initPromise.then((initResult) => { + const baseParams = { + query: parsedMessage.query, + variables: parsedMessage.variables, + operationName: parsedMessage.operationName, + context: Object.assign({}, isObject(initResult) ? initResult : {}), + formatResponse: undefined, + formatError: undefined, + callback: undefined, + }; + let promisedParams = Promise.resolve(baseParams); - // if we already have a subscription with this id, unsubscribe from it first - // TODO: test that this actually works - if (connectionSubscriptions[subId]) { - this.subscriptionManager.unsubscribe(connectionSubscriptions[subId]); - delete connectionSubscriptions[subId]; - } + if (this.onSubscribe) { + promisedParams = Promise.resolve(this.onSubscribe(parsedMessage, baseParams, connection)); + } + + // if we already have a subscription with this id, unsubscribe from it first + // TODO: test that this actually works + if (connectionSubscriptions[subId]) { + this.unsubscribe(connection, connectionSubscriptions[subId]); + delete connectionSubscriptions[subId]; + } + + promisedParams.then(params => { + if (typeof params !== 'object') { + const error = `Invalid params returned from onSubscribe! return values must be an object!`; + this.sendSubscriptionFail(connection, subId, { + errors: [{ + message: error, + }], + }); - promisedParams.then( params => { - // create a callback - // error could be a runtime exception or an object with errors - // result is a GraphQL ExecutionResult, which has an optional errors property - params.callback = (error: any, result: any) => { - if (!error) { - this.sendSubscriptionData(connection, subId, result); - } else if (error.errors) { - this.sendSubscriptionData(connection, subId, { errors: error.errors }); + throw new Error(error); + } + + // create a callback + // error could be a runtime exception or an object with errors + // result is a GraphQL ExecutionResult, which has an optional errors property + params.callback = (error: any, result: any) => { + if (!error) { + this.sendSubscriptionData(connection, subId, result); + } else if (error.errors) { + this.sendSubscriptionData(connection, subId, {errors: error.errors}); + } else { + this.sendSubscriptionData(connection, subId, {errors: [{message: error.message}]}); + } + }; + + return this.subscriptionManager.subscribe(params); + }).then((graphqlSubId: number) => { + connectionSubscriptions[subId] = graphqlSubId; + this.sendSubscriptionSuccess(connection, subId); + }).catch(e => { + if (e.errors) { + this.sendSubscriptionFail(connection, subId, {errors: e.errors}); } else { - this.sendSubscriptionData(connection, subId, { errors: [{ message: error.message }] }); + this.sendSubscriptionFail(connection, subId, {errors: [{message: e.message}]}); } - }; - return this.subscriptionManager.subscribe( params ); - }).then((graphqlSubId: number) => { - connectionSubscriptions[subId] = graphqlSubId; - this.sendSubscriptionSuccess(connection, subId); - }).catch( e => { - if (e.errors) { - this.sendSubscriptionFail(connection, subId, { errors: e.errors }); - } else { - this.sendSubscriptionFail(connection, subId, { errors: [{ message: e.message }] }); - } - return; + return; + }); }); break; case SUBSCRIPTION_END: - // find subscription id. Call unsubscribe. - // TODO untested. catch errors, etc. - if (typeof connectionSubscriptions[subId] !== 'undefined') { - this.subscriptionManager.unsubscribe(connectionSubscriptions[subId]); - delete connectionSubscriptions[subId]; - } + connectionContext.initPromise.then(() => { + // find subscription id. Call unsubscribe. + // TODO untested. catch errors, etc. + if (typeof connectionSubscriptions[subId] !== 'undefined') { + this.unsubscribe(connection, connectionSubscriptions[subId]); + delete connectionSubscriptions[subId]; + } + }); break; default: this.sendSubscriptionFail(connection, subId, { errors: [{ - message: 'Invalid message type. Message type must be `subscription_start` or `subscription_end`.' - }] + message: 'Invalid message type!', + }], }); } }; } - private sendSubscriptionData(connection: Connection, subId: string, payload: any): void { + private sendSubscriptionData(connection: WebSocket, subId: string, payload: any): void { let message = { type: SUBSCRIPTION_DATA, id: subId, payload, }; - connection.sendUTF(JSON.stringify(message)); + connection.send(JSON.stringify(message)); } - private sendSubscriptionFail(connection: Connection, subId: string, payload: any): void { + private sendSubscriptionFail(connection: WebSocket, subId: string, payload: any): void { let message = { type: SUBSCRIPTION_FAIL, id: subId, payload, }; - connection.sendUTF(JSON.stringify(message)); + connection.send(JSON.stringify(message)); } - private sendSubscriptionSuccess(connection: Connection, subId: string): void { + private sendSubscriptionSuccess(connection: WebSocket, subId: string): void { let message = { type: SUBSCRIPTION_SUCCESS, id: subId, }; - connection.sendUTF(JSON.stringify(message)); + connection.send(JSON.stringify(message)); + } + + private sendInitResult(connection: WebSocket, result: any): void { + connection.send(JSON.stringify(result), () => { + if (result.type === INIT_FAIL) { + // Close the connection with an error code, and + // then terminates the actual network connection (sends FIN packet) + // 1011: an unexpected condition prevented the request from being fulfilled + connection.close(1011); + connection.terminate(); + } + }); } - private sendKeepAlive(connection: Connection): void { + private sendKeepAlive(connection: WebSocket): void { let message = { - type: SUBSCRIPTION_KEEPALIVE, + type: KEEPALIVE, }; - connection.sendUTF(JSON.stringify(message)); + connection.send(JSON.stringify(message)); } -} -export default Server; +} \ No newline at end of file diff --git a/src/test/tests.ts b/src/test/tests.ts index 8a7d55379..69ad9e20e 100644 --- a/src/test/tests.ts +++ b/src/test/tests.ts @@ -4,6 +4,11 @@ import { expect, } from 'chai'; import * as sinon from 'sinon'; +import * as WebSocket from 'ws'; + +Object.assign(global, { + WebSocket: WebSocket, +}); import { GraphQLObjectType, @@ -11,13 +16,13 @@ import { GraphQLString, } from 'graphql'; -import { PubSub, SubscriptionManager } from 'graphql-subscriptions'; +import {PubSub, SubscriptionManager} from 'graphql-subscriptions'; import { SUBSCRIPTION_START, SUBSCRIPTION_FAIL, SUBSCRIPTION_DATA, - SUBSCRIPTION_KEEPALIVE, + KEEPALIVE, SUBSCRIPTION_END, } from '../messageTypes'; @@ -25,27 +30,19 @@ import { GRAPHQL_SUBSCRIPTIONS, } from '../protocols'; -import { createServer, IncomingMessage, ServerResponse } from 'http'; -import SubscriptionServer from '../server'; -import Client from '../client'; - -import { SubscribeMessage } from '../server'; -import { SubscriptionOptions } from 'graphql-subscriptions/dist/pubsub'; - -import { - server as WebSocketServer, - connection as Connection, - request as WebSocketRequest, -} from 'websocket'; -import * as websocket from 'websocket'; -const W3CWebSocket = (websocket as { [key: string]: any })['w3cwebsocket']; +import {createServer, IncomingMessage, ServerResponse} from 'http'; +import {SubscriptionServer} from '../server'; +import {SubscriptionClient} from '../client'; +import {SubscribeMessage} from '../server'; +import {SubscriptionOptions} from 'graphql-subscriptions/dist/pubsub'; const TEST_PORT = 4953; const KEEP_ALIVE_TEST_PORT = TEST_PORT + 1; const DELAYED_TEST_PORT = TEST_PORT + 2; const RAW_TEST_PORT = TEST_PORT + 4; +const EVENTS_TEST_PORT = TEST_PORT + 5; -const data: { [key: string]: { [key: string]: string } } = { +const data: {[key: string]: {[key: string]: string}} = { '1': { 'id': '1', 'name': 'Dan', @@ -63,8 +60,8 @@ const data: { [key: string]: { [key: string]: string } } = { const userType = new GraphQLObjectType({ name: 'User', fields: { - id: { type: GraphQLString }, - name: { type: GraphQLString }, + id: {type: GraphQLString}, + name: {type: GraphQLString}, }, }); @@ -72,7 +69,7 @@ const schema = new GraphQLSchema({ query: new GraphQLObjectType({ name: 'Query', fields: { - testString: { type: GraphQLString }, + testString: {type: GraphQLString}, }, }), subscription: new GraphQLObjectType({ @@ -82,7 +79,7 @@ const schema = new GraphQLSchema({ type: userType, // `args` describes the arguments that the `user` query accepts args: { - id: { type: GraphQLString }, + id: {type: GraphQLString}, }, // The resolve function describes how to 'resolve' or fulfill // the incoming query. @@ -95,7 +92,7 @@ const schema = new GraphQLSchema({ userFiltered: { type: userType, args: { - id: { type: GraphQLString }, + id: {type: GraphQLString}, }, resolve: function (_, {id}) { return data[id]; @@ -107,7 +104,11 @@ const schema = new GraphQLSchema({ return ctx; }, }, - error: { type: GraphQLString, resolve: () => { throw new Error('E1'); } }, + error: { + type: GraphQLString, resolve: () => { + throw new Error('E1'); + }, + }, }, }), }); @@ -116,7 +117,7 @@ const subscriptionManager = new SubscriptionManager({ schema, pubsub: new PubSub(), setupFunctions: { - 'userFiltered': (options: SubscriptionOptions, args: { [key: string]: any }) => ({ + 'userFiltered': (options: SubscriptionOptions, args: {[key: string]: any}) => ({ 'userFiltered': { filter: (user: any) => { return !args['id'] || user.id === args['id']; @@ -128,18 +129,30 @@ const subscriptionManager = new SubscriptionManager({ // indirect call to support spying const handlers = { - onSubscribe: (msg: SubscribeMessage, params: SubscriptionOptions, webSocketRequest: WebSocketRequest) => { - return Promise.resolve(Object.assign({}, params, { context: msg['context'] })); + onSubscribe: (msg: SubscribeMessage, params: SubscriptionOptions, webSocketRequest: WebSocket) => { + return Promise.resolve(Object.assign({}, params, {context: msg['context']})); }, }; const options = { subscriptionManager, - onSubscribe: (msg: SubscribeMessage, params: SubscriptionOptions, webSocketRequest: WebSocketRequest) => { + onSubscribe: (msg: SubscribeMessage, params: SubscriptionOptions, webSocketRequest: WebSocket) => { return handlers.onSubscribe(msg, params, webSocketRequest); }, }; +const eventsOptions = { + subscriptionManager, + onSubscribe: sinon.spy((msg: SubscribeMessage, params: SubscriptionOptions, webSocketRequest: WebSocket) => { + return Promise.resolve(Object.assign({}, params, {context: msg['context']})); + }), + onUnsubscribe: sinon.spy(), + onConnect: sinon.spy(() => { + return {test: 'test context'}; + }), + onDisconnect: sinon.spy(), +}; + function notFoundRequestListener(request: IncomingMessage, response: ServerResponse) { response.writeHead(404); response.end(); @@ -147,11 +160,15 @@ function notFoundRequestListener(request: IncomingMessage, response: ServerRespo const httpServer = createServer(notFoundRequestListener); httpServer.listen(TEST_PORT); -new SubscriptionServer(options, httpServer); +new SubscriptionServer(options, {server: httpServer}); const httpServerWithKA = createServer(notFoundRequestListener); httpServerWithKA.listen(KEEP_ALIVE_TEST_PORT); -new SubscriptionServer(Object.assign({}, options, {keepAlive: 10}), httpServerWithKA); +new SubscriptionServer(Object.assign({}, options, {keepAlive: 10}), {server: httpServerWithKA}); + +const httpServerWithEvents = createServer(notFoundRequestListener); +httpServerWithEvents.listen(EVENTS_TEST_PORT); +const eventsServer = new SubscriptionServer(eventsOptions, {server: httpServerWithEvents}); const httpServerWithDelay = createServer(notFoundRequestListener); httpServerWithDelay.listen(DELAYED_TEST_PORT); @@ -159,49 +176,214 @@ new SubscriptionServer(Object.assign({}, options, { onSubscribe: (msg: SubscribeMessage, params: SubscriptionOptions) => { return new Promise((resolve, reject) => { setTimeout(() => { - resolve(Object.assign({}, params, { context: msg['context'] })); + resolve(Object.assign({}, params, {context: msg['context']})); }, 100); }); }, -}), httpServerWithDelay); +}), {server: httpServerWithDelay}); const httpServerRaw = createServer(notFoundRequestListener); httpServerRaw.listen(RAW_TEST_PORT); -describe('Client', function() { +describe('Client', function () { - let wsServer: WebSocketServer; + let wsServer: WebSocket.Server; beforeEach(() => { - wsServer = new WebSocketServer({ - httpServer: httpServerRaw, - autoAcceptConnections: true, + wsServer = new WebSocket.Server({ + server: httpServerRaw, }); }); afterEach(() => { if (wsServer) { - wsServer.shutDown(); + wsServer.close(); } }); - it('removes subscription when it unsubscribes from it', function() { - const client = new Client(`ws://localhost:${TEST_PORT}/`); + it('should send INIT message when creating the connection', (done) => { + wsServer.on('connection', (connection: any) => { + connection.on('message', (message: any) => { + const parsedMessage = JSON.parse(message); + expect(parsedMessage.type).to.equals('init'); + done(); + }); + }); + + new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`); + }); + + it('should emit connect event for client side when socket is open', (done) => { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); + + const unregister = client.onConnect(() => { + unregister(); + done(); + }); + }); + + it('should emit disconnect event for client side when socket closed', (done) => { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`, { + connectionCallback: () => { + client.client.close(); + }, + }); + + const unregister = client.onDisconnect(() => { + unregister(); + done(); + }); + }); + + it('should emit reconnect event for client side when socket closed', (done) => { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`, { + reconnect: true, + reconnectionAttempts: 1, + connectionCallback: () => { + client.client.close(); + }, + }); - setTimeout( () => { + const unregister = client.onReconnect(() => { + unregister(); + done(); + }); + }); + + it('should throw an exception when query is not provided', () => { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); + + expect(() => { + client.subscribe({ + query: undefined, + operationName: 'useInfo', + variables: { + id: 3, + }, + }, function (error: any, result: any) { + //do nothing + } + ); + }).to.throw(); + }); + + it('should throw an exception when query is not valid', () => { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); + + expect(() => { + client.subscribe({ + query: {}, + operationName: 'useInfo', + variables: { + id: 3, + }, + }, function (error: any, result: any) { + //do nothing + } + ); + }).to.throw(); + }); + + it('should throw an exception when handler is not provided', () => { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); + + expect(() => { + client.subscribe({ + query: `subscription useInfo($id: String) { + user(id: $id) { + id + name + } + }`, + }, + undefined + ); + }).to.throw(); + }); + + it('should send connectionParams along with init message', (done) => { + const connectionParams: any = { + test: true, + }; + wsServer.on('connection', (connection: any) => { + connection.on('message', (message: any) => { + const parsedMessage = JSON.parse(message); + expect(JSON.stringify(parsedMessage.payload)).to.equal(JSON.stringify(connectionParams)); + done(); + }); + }); + + new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, { + connectionParams: connectionParams, + }); + }); + + it('should handle correctly init_fail message', (done) => { + wsServer.on('connection', (connection: any) => { + connection.on('message', (message: any) => { + connection.send(JSON.stringify({type: 'init_fail', payload: {error: 'test error'}})); + }); + }); + + new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, { + connectionCallback: (error: any) => { + expect(error).to.equals('test error'); + done(); + }, + }); + }); + + it('should handle init_fail message and handle server that closes connection', (done) => { + let client: any = null; + + wsServer.on('connection', (connection: any) => { + connection.on('message', (message: any) => { + connection.send(JSON.stringify({type: 'init_fail', payload: {error: 'test error'}}), () => { + connection.close(); + connection.terminate(); + + setTimeout(() => { + expect(client.client.readyState).to.equals(WebSocket.CLOSED); + done(); + }, 500); + }); + }); + }); + + client = new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`); + }); + + it('should handle correctly init_success message', (done) => { + wsServer.on('connection', (connection: any) => { + connection.on('message', (message: any) => { + connection.send(JSON.stringify({type: 'init_success'})); + }); + }); + + new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, { + connectionCallback: (error: any) => { + expect(error).to.equals(undefined); + done(); + }, + }); + }); + + it('removes subscription when it unsubscribes from it', function () { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); + + setTimeout(() => { let subId = client.subscribe({ - query: - `subscription useInfo($id: String) { + query: `subscription useInfo($id: String) { user(id: $id) { id name } }`, - operationName: 'useInfo', - variables: { - id: 3, - }, - }, function(error, result) { + operationName: 'useInfo', + variables: { + id: 3, + }, + }, function (error: any, result: any) { //do nothing } ); @@ -210,22 +392,21 @@ describe('Client', function() { }, 100); }); - it('queues messages while websocket is still connecting', function() { - const client = new Client(`ws://localhost:${TEST_PORT}/`); + it('queues messages while websocket is still connecting', function () { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); let subId = client.subscribe({ - query: - `subscription useInfo($id: String) { + query: `subscription useInfo($id: String) { user(id: $id) { id name } }`, - operationName: 'useInfo', - variables: { - id: 3, - }, - }, function(error, result) { + operationName: 'useInfo', + variables: { + id: 3, + }, + }, function (error: any, result: any) { //do nothing } ); @@ -237,17 +418,16 @@ describe('Client', function() { }, 100); }); - it('should call error handler when graphql result has errors', function(done) { - const client = new Client(`ws://localhost:${TEST_PORT}/`); + it('should call error handler when graphql result has errors', function (done) { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); - setTimeout( () => { - client.subscribe({ - query: - `subscription useInfo{ + setTimeout(() => { + client.subscribe({ + query: `subscription useInfo{ error }`, - variables: {}, - }, function(error, result) { + variables: {}, + }, function (error: any, result: any) { if (error) { client.unsubscribeAll(); done(); @@ -259,22 +439,21 @@ describe('Client', function() { } ); }, 100); - setTimeout( () => { + setTimeout(() => { subscriptionManager.publish('error', {}); }, 200); }); - it('should call error handler when graphql query is not valid', function(done) { - const client = new Client(`ws://localhost:${TEST_PORT}/`); + it('should call error handler when graphql query is not valid', function (done) { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); - setTimeout( () => { - client.subscribe({ - query: - `subscription useInfo{ + setTimeout(() => { + client.subscribe({ + query: `subscription useInfo{ invalid }`, - variables: {}, - }, function(error: Error[], result: any) { + variables: {}, + }, function (error: Error[], result: any) { if (error) { expect(error[0].message).to.equals('Cannot query field "invalid" on type "Subscription".'); done(); @@ -287,11 +466,11 @@ describe('Client', function() { }); function testBadServer(payload: any, errorMessage: string, done: Function) { - wsServer.on('connect', (connection: Connection) => { - connection.on('message', (message) => { - const parsedMessage = JSON.parse(message.utf8Data); + wsServer.on('connection', (connection: WebSocket) => { + connection.on('message', (message: any) => { + const parsedMessage = JSON.parse(message); if (parsedMessage.type === SUBSCRIPTION_START) { - connection.sendUTF(JSON.stringify({ + connection.send(JSON.stringify({ type: SUBSCRIPTION_FAIL, id: parsedMessage.id, payload, @@ -300,7 +479,7 @@ describe('Client', function() { }); }); - const client = new Client(`ws://localhost:${RAW_TEST_PORT}/`); + const client = new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`); client.subscribe({ query: ` subscription useInfo{ @@ -308,7 +487,7 @@ describe('Client', function() { } `, variables: {}, - }, function(errors: Error[], result: any) { + }, function (errors: Error[], result: any) { if (errors) { expect(errors[0].message).to.equals(errorMessage); } else { @@ -318,33 +497,32 @@ describe('Client', function() { }); } - it('should handle missing errors', function(done) { + it('should handle missing errors', function (done) { const errorMessage = 'Unknown error'; const payload = {}; testBadServer(payload, errorMessage, done); }); - it('should handle errors that are not an array', function(done) { + it('should handle errors that are not an array', function (done) { const errorMessage = 'Just an error'; const payload = { - errors: { message: errorMessage }, + errors: {message: errorMessage}, }; testBadServer(payload, errorMessage, done); }); - it('should throw an error when the susbcription times out', function(done) { + it('should throw an error when the susbcription times out', function (done) { // hopefully 1ms is fast enough to time out before the server responds - const client = new Client(`ws://localhost:${DELAYED_TEST_PORT}/`, { timeout: 1 }); + const client = new SubscriptionClient(`ws://localhost:${DELAYED_TEST_PORT}/`, {timeout: 1}); - setTimeout( () => { + setTimeout(() => { client.subscribe({ - query: - `subscription useInfo{ + query: `subscription useInfo{ error }`, operationName: 'useInfo', variables: {}, - }, function(error, result) { + }, function (error: any, result: any) { if (error) { expect(error[0].message).to.equals('Subscription timed out - no response from server'); done(); @@ -356,39 +534,40 @@ describe('Client', function() { }, 100); }); - it('should reconnect to the server', function(done) { + it('should reconnect to the server', function (done) { let connections = 0; - let client: Client; + let client: SubscriptionClient; let originalClient: any; - wsServer.on('connect', (connection: Connection) => { + wsServer.on('connection', (connection: WebSocket) => { connections += 1; if (connections === 1) { - wsServer.closeAllConnections(); + originalClient.close(); } else { expect(client.client).to.not.be.equal(originalClient); done(); } }); - client = new Client(`ws://localhost:${RAW_TEST_PORT}/`, { reconnect: true }); + client = new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, {reconnect: true}); originalClient = client.client; }); - it('should resubscribe after reconnect', function(done) { + it('should resubscribe after reconnect', function (done) { let connections = 0; - wsServer.on('connect', (connection: Connection) => { + let client: SubscriptionClient = null; + wsServer.on('connection', (connection: WebSocket) => { connections += 1; - connection.on('message', (message) => { - const parsedMessage = JSON.parse(message.utf8Data); + connection.on('message', (message: any) => { + const parsedMessage = JSON.parse(message); if (parsedMessage.type === SUBSCRIPTION_START) { if (connections === 1) { - wsServer.closeAllConnections(); + client.client.close(); } else { done(); } } }); }); - const client = new Client(`ws://localhost:${RAW_TEST_PORT}/`, { reconnect: true }); + client = new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, {reconnect: true}); client.subscribe({ query: ` subscription useInfo{ @@ -396,23 +575,69 @@ describe('Client', function() { } `, variables: {}, - }, function(errors: Error[], result: any) { + }, function (errors: Error[], result: any) { assert(false); }); }); - it('should stop trying to reconnect to the server', function(done) { + it('should throw an exception when trying to subscribe when socket is closed', function (done) { + let client: SubscriptionClient = null; + + client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`, {reconnect: true}); + + setTimeout(() => { + client.client.close(); + }, 500); + + setTimeout(() => { + expect(() => { + client.subscribe({ + query: ` + subscription useInfo{ + invalid + } + `, + variables: {}, + }, function (errors: Error[], result: any) { + // nothing + }); + + done(); + }).to.throw(); + }, 1000); + }); + + it('should throw an exception when the sent message is not a valid json', function (done) { + + + setTimeout(() => { + expect(() => { + let client: SubscriptionClient = null; + + wsServer.on('connection', (connection: any) => { + connection.on('message', (message: any) => { + connection.send('invalid json'); + }); + }); + + client = new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`); + done(); + }).to.throw(); + }, 1000); + }); + + it('should stop trying to reconnect to the server', function (done) { let connections = 0; - wsServer.on('connect', (connection: Connection) => { + wsServer.on('connection', (connection: WebSocket) => { connections += 1; if (connections === 1) { - wsServer.unmount(); - wsServer.closeAllConnections(); + wsServer.close(); } else { assert(false); } }); - const client = new Client(`ws://localhost:${RAW_TEST_PORT}/`, { + + const client = new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, { timeout: 100, reconnect: true, reconnectionAttempts: 1, @@ -424,8 +649,7 @@ describe('Client', function() { }); }); -describe('Server', function() { - +describe('Server', function () { let onSubscribeSpy: sinon.SinonSpy; beforeEach(() => { @@ -433,19 +657,147 @@ describe('Server', function() { }); afterEach(() => { - onSubscribeSpy.restore(); + if (onSubscribeSpy) { + onSubscribeSpy.restore(); + } + + if (eventsOptions) { + eventsOptions.onConnect.reset(); + eventsOptions.onDisconnect.reset(); + eventsOptions.onSubscribe.reset(); + eventsOptions.onUnsubscribe.reset(); + } + }); + + it('should throw an exception when creating a server without subscriptionManager', () => { + expect(() => { + new SubscriptionServer({subscriptionManager: undefined}, {server: httpServer}); + }).to.throw(); + }); + + it('should trigger onConnect when client connects and validated', (done) => { + new SubscriptionClient(`ws://localhost:${EVENTS_TEST_PORT}/`); + + setTimeout(() => { + assert(eventsOptions.onConnect.calledOnce); + done(); + }, 200); + }); + + it('should trigger onConnect with the correct connectionParams', (done) => { + const connectionParams: any = { + test: true, + }; + + new SubscriptionClient(`ws://localhost:${EVENTS_TEST_PORT}/`, { + connectionParams: connectionParams, + }); + + setTimeout(() => { + assert(eventsOptions.onConnect.calledOnce); + expect(JSON.stringify(eventsOptions.onConnect.getCall(0).args[0])).to.equal(JSON.stringify(connectionParams)); + done(); + }, 200); + }); + + it('should trigger onDisconnect when client disconnects', (done) => { + const client = new SubscriptionClient(`ws://localhost:${EVENTS_TEST_PORT}/`); + client.client.close(); + + setTimeout(() => { + assert(eventsOptions.onDisconnect.calledOnce); + done(); + }, 200); + }); + + it('should call unsubscribe when client closes the connection', (done) => { + const client = new SubscriptionClient(`ws://localhost:${EVENTS_TEST_PORT}/`); + const spy = sinon.spy(eventsServer, 'unsubscribe'); + + client.subscribe({ + query: `subscription useInfo($id: String) { + user(id: $id) { + id + name + } + }`, + operationName: 'useInfo', + variables: { + id: 3, + }, + }, function (error: any, result: any) { + // nothing + } + ); + + setTimeout(() => { + client.client.close(); + }, 500); + + setTimeout(() => { + assert(spy.calledOnce); + done(); + }, 1000); + }); + + it('should trigger onSubscribe when client subscribes', (done) => { + const client = new SubscriptionClient(`ws://localhost:${EVENTS_TEST_PORT}/`); + client.subscribe({ + query: `subscription useInfo($id: String) { + user(id: $id) { + id + name + } + }`, + operationName: 'useInfo', + variables: { + id: 3, + }, + }, (error: any, result: any) => { + if (error) { + assert(false); + } + }); + + setTimeout(() => { + assert(eventsOptions.onSubscribe.calledOnce); + done(); + }, 200); }); - it('should send correct results to multiple clients with subscriptions', function(done) { + it('should trigger onUnsubscribe when client unsubscribes', (done) => { + const client = new SubscriptionClient(`ws://localhost:${EVENTS_TEST_PORT}/`); + const subId = client.subscribe({ + query: `subscription useInfo($id: String) { + user(id: $id) { + id + name + } + }`, + operationName: 'useInfo', + variables: { + id: 3, + }, + }, function (error: any, result: any) { + //do nothing + }); - const client = new Client(`ws://localhost:${TEST_PORT}/`); - let client1 = new Client(`ws://localhost:${TEST_PORT}/`); + client.unsubscribe(subId); + + setTimeout(() => { + assert(eventsOptions.onUnsubscribe.calledOnce); + done(); + }, 200); + }); + + it('should send correct results to multiple clients with subscriptions', function (done) { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); + let client1 = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); let numResults = 0; - setTimeout( () => { + setTimeout(() => { client.subscribe({ - query: - `subscription useInfo($id: String) { + query: `subscription useInfo($id: String) { user(id: $id) { id name @@ -456,7 +808,7 @@ describe('Server', function() { id: 3, }, - }, function(error, result) { + }, function (error: any, result: any) { if (error) { assert(false); } @@ -472,12 +824,11 @@ describe('Server', function() { }); }, 100); - const client11 = new Client(`ws://localhost:${TEST_PORT}/`); + const client11 = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); let numResults1 = 0; - setTimeout(function() { + setTimeout(function () { client11.subscribe({ - query: - `subscription useInfo($id: String) { + query: `subscription useInfo($id: String) { user(id: $id) { id name @@ -488,18 +839,17 @@ describe('Server', function() { id: 2, }, - }, function(error, result) { - - if (error) { - assert(false); - } - if (result) { - assert.property(result, 'user'); - assert.equal(result.user.id, '2'); - assert.equal(result.user.name, 'Marie'); - numResults1++; - } - // if both error and result are null, this was a SUBSCRIPTION_SUCCESS message. + }, function (error: any, result: any) { + if (error) { + assert(false); + } + if (result) { + assert.property(result, 'user'); + assert.equal(result.user.id, '2'); + assert.equal(result.user.name, 'Marie'); + numResults1++; + } + // if both error and result are null, this was a SUBSCRIPTION_SUCCESS message. }); }, 100); @@ -517,9 +867,9 @@ describe('Server', function() { }); - it('should send a subscription_fail message to client with invalid query', function(done) { - const client1 = new Client(`ws://localhost:${TEST_PORT}/`); - setTimeout(function() { + it('should send a subscription_fail message to client with invalid query', function (done) { + const client1 = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); + setTimeout(function () { client1.client.onmessage = (message: any) => { let messageData = JSON.parse(message.data); assert.equal(messageData.type, SUBSCRIPTION_FAIL); @@ -527,18 +877,17 @@ describe('Server', function() { done(); }; client1.subscribe({ - query: - `subscription useInfo($id: String) { + query: `subscription useInfo($id: String) { user(id: $id) { id birthday } }`, - operationName: 'useInfo', - variables: { - id: 3, - }, - }, function(error, result) { + operationName: 'useInfo', + variables: { + id: 3, + }, + }, function (error: any, result: any) { //do nothing } ); @@ -546,14 +895,13 @@ describe('Server', function() { }); - it('should set up the proper filters when subscribing', function(done) { + it('should set up the proper filters when subscribing', function (done) { let numTriggers = 0; - const client3 = new Client(`ws://localhost:${TEST_PORT}/`); - const client4 = new Client(`ws://localhost:${TEST_PORT}/`); + const client3 = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); + const client4 = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); setTimeout(() => { client3.subscribe({ - query: - `subscription userInfoFilter1($id: String) { + query: `subscription userInfoFilter1($id: String) { userFiltered(id: $id) { id name @@ -563,7 +911,7 @@ describe('Server', function() { variables: { id: 3, }, - }, (error, result) => { + }, (error: any, result: any) => { if (error) { assert(false); } @@ -577,8 +925,7 @@ describe('Server', function() { } ); client4.subscribe({ - query: - `subscription userInfoFilter1($id: String) { + query: `subscription userInfoFilter1($id: String) { userFiltered(id: $id) { id name @@ -588,7 +935,7 @@ describe('Server', function() { variables: { id: 1, }, - }, (error, result) => { + }, (error: any, result: any) => { if (result) { numTriggers += 1; assert.property(result, 'userFiltered'); @@ -603,9 +950,9 @@ describe('Server', function() { ); }, 100); setTimeout(() => { - subscriptionManager.publish('userFiltered', { id: 1 }); - subscriptionManager.publish('userFiltered', { id: 2 }); - subscriptionManager.publish('userFiltered', { id: 3 }); + subscriptionManager.publish('userFiltered', {id: 1}); + subscriptionManager.publish('userFiltered', {id: 2}); + subscriptionManager.publish('userFiltered', {id: 3}); }, 200); setTimeout(() => { assert.equal(numTriggers, 2); @@ -613,17 +960,16 @@ describe('Server', function() { }, 300); }); - it('correctly sets the context in onSubscribe', function(done) { + it('correctly sets the context in onSubscribe', function (done) { const CTX = 'testContext'; - const client3 = new Client(`ws://localhost:${TEST_PORT}/`); + const client3 = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); client3.subscribe({ - query: - `subscription context { + query: `subscription context { context }`, - variables: { }, + variables: {}, context: CTX, - }, (error, result) => { + }, (error: any, result: any) => { client3.unsubscribeAll(); if (error) { assert(false); @@ -640,17 +986,19 @@ describe('Server', function() { }, 100); }); - it('passes through webSocketRequest to onSubscribe', function(done) { - const client = new Client(`ws://localhost:${TEST_PORT}/`); + it('passes through webSocketRequest to onSubscribe', function (done) { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); client.subscribe({ query: ` subscription context { context } `, - variables: { }, - }, (error, result) => { - assert(false); + variables: {}, + }, (error: any, result: any) => { + if (error) { + assert(false); + } }); setTimeout(() => { assert(onSubscribeSpy.calledOnce); @@ -659,22 +1007,21 @@ describe('Server', function() { }, 100); }); - it('does not send more subscription data after client unsubscribes', function() { - const client4 = new Client(`ws://localhost:${TEST_PORT}/`); + it('does not send more subscription data after client unsubscribes', function () { + const client4 = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); setTimeout(() => { let subId = client4.subscribe({ - query: - `subscription useInfo($id: String) { + query: `subscription useInfo($id: String) { user(id: $id) { id name } }`, - operationName: 'useInfo', - variables: { - id: 3, - }, - }, function(error, result) { + operationName: 'useInfo', + variables: { + id: 3, + }, + }, function (error: any, result: any) { //do nothing } ); @@ -691,15 +1038,20 @@ describe('Server', function() { }; }); - it('rejects a client that does not specify a supported protocol', function(done) { - const client = new W3CWebSocket(`ws://localhost:${TEST_PORT}/`); - client.onerror = (message: any) => { - done(); - }; + it('rejects a client that does not specify a supported protocol', function (done) { + const client = new WebSocket(`ws://localhost:${TEST_PORT}/`); + + client.on('close', (code) => { + if (code === 1002) { + done(); + } else { + assert(false); + } + }); }); - it('rejects unparsable message', function(done) { - const client = new W3CWebSocket(`ws://localhost:${TEST_PORT}/`, GRAPHQL_SUBSCRIPTIONS); + it('rejects unparsable message', function (done) { + const client = new WebSocket(`ws://localhost:${TEST_PORT}/`, GRAPHQL_SUBSCRIPTIONS); client.onmessage = (message: any) => { let messageData = JSON.parse(message.data); assert.equal(messageData.type, SUBSCRIPTION_FAIL); @@ -712,8 +1064,8 @@ describe('Server', function() { }; }); - it('rejects nonsense message', function(done) { - const client = new W3CWebSocket(`ws://localhost:${TEST_PORT}/`, GRAPHQL_SUBSCRIPTIONS); + it('rejects nonsense message', function (done) { + const client = new WebSocket(`ws://localhost:${TEST_PORT}/`, GRAPHQL_SUBSCRIPTIONS); client.onmessage = (message: any) => { let messageData = JSON.parse(message.data); assert.equal(messageData.type, SUBSCRIPTION_FAIL); @@ -728,8 +1080,8 @@ describe('Server', function() { it('does not crash on unsub for Object.prototype member', function(done) { // Use websocket because Client.unsubscribe will only take a number. - const client = new W3CWebSocket(`ws://localhost:${TEST_PORT}/`, - GRAPHQL_SUBSCRIPTIONS); + const client = new WebSocket(`ws://localhost:${TEST_PORT}/`, GRAPHQL_SUBSCRIPTIONS); + client.onopen = () => { client.send(JSON.stringify({type: SUBSCRIPTION_END, id: 'toString'})); // Strangely we don't send any acknowledgement for unsubbing from an @@ -739,28 +1091,31 @@ describe('Server', function() { }; }); - it('sends back any type of error', function(done) { - const client = new Client(`ws://localhost:${TEST_PORT}/`); + it('sends back any type of error', function (done) { + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); client.subscribe({ - query: - `invalid useInfo{ + query: `invalid useInfo{ error }`, variables: {}, - }, function(errors, result) { + }, function (errors: any, result: any) { client.unsubscribeAll(); assert.isAbove(errors.length, 0, 'Number of errors is greater than 0.'); done(); }); }); - it('handles errors prior to graphql execution', function(done) { + it('handles errors prior to graphql execution', function (done) { // replace the onSubscribeSpy with a custom handler, the spy will restore // the original method - handlers.onSubscribe = (msg: SubscribeMessage, params: SubscriptionOptions, webSocketRequest: WebSocketRequest) => { - return Promise.resolve(Object.assign({}, params, { context: () => { throw new Error('bad'); } })); + handlers.onSubscribe = (msg: SubscribeMessage, params: SubscriptionOptions, webSocketRequest: WebSocket) => { + return Promise.resolve(Object.assign({}, params, { + context: () => { + throw new Error('bad'); + }, + })); }; - const client = new Client(`ws://localhost:${TEST_PORT}/`); + const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`); client.subscribe({ query: ` subscription context { @@ -784,12 +1139,12 @@ describe('Server', function() { }, 100); }); - it('sends a keep alive signal in the socket', function(done) { - let client = new W3CWebSocket(`ws://localhost:${KEEP_ALIVE_TEST_PORT}/`, GRAPHQL_SUBSCRIPTIONS); + it('sends a keep alive signal in the socket', function (done) { + let client = new WebSocket(`ws://localhost:${KEEP_ALIVE_TEST_PORT}/`, GRAPHQL_SUBSCRIPTIONS); let yieldCount = 0; client.onmessage = (message: any) => { const parsedMessage = JSON.parse(message.data); - if (parsedMessage.type === SUBSCRIPTION_KEEPALIVE) { + if (parsedMessage.type === KEEPALIVE) { yieldCount += 1; if (yieldCount > 1) { client.close(); diff --git a/tsconfig.json b/tsconfig.json index cddbab249..710b454e9 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -16,7 +16,9 @@ "node_modules/@types" ], "types": [ - "@types/node" + "@types/node", + "@types/ws", + "eventemitter3" ] }, "exclude": [ diff --git a/tslint.json b/tslint.json index 4eb3cd80f..a2816e8b2 100644 --- a/tslint.json +++ b/tslint.json @@ -63,7 +63,7 @@ "no-unused-variable": true, "no-use-before-declare": true, "no-var-keyword": true, - "no-var-requires": true, + "no-var-requires": false, "object-literal-sort-keys": false, "one-line": [ true,