-
Notifications
You must be signed in to change notification settings - Fork 342
Improvements for Authorization #53
Changes from all commits
662bf29
7a19511
cd8cf46
a248587
8837e13
1e87aaf
aca6b20
40d5ec5
6a60bf0
3c5a933
153f6a6
8e7082b
2ab5711
9eeaf82
b603475
d3911a7
66e7b0a
b268be0
0bda794
bd706c7
f50e074
dcfd6b9
248c662
f15f803
e1681cb
7c8b769
3bf41df
b700388
a90e5f4
b79da71
58203c4
9cddcc2
66dbcb4
d15e9ad
989870a
0fe5c5e
330e10c
e762028
3c72ddf
eaf872d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,10 +9,14 @@ A GraphQL websocket server and client to facilitate GraphQL subscriptions. | |
See [GitHunt-API](https://github.com/apollostack/GitHunt-API) and [GitHunt-React](https://github.com/apollostack/GitHunt-React) for an example server and client integration. | ||
|
||
## Client | ||
### `Constructor(url, options)` | ||
### `Constructor(url, options, connectionCallback)` | ||
- `url: string` : url that the client will connect to | ||
- `options?: Object` : optional object to modify default client behavior | ||
* `timeout: number` : how long the client should wait in ms for a subscription to be started (default 5000 ms) | ||
- `options?: Object` : optional, object to modify default client behavior | ||
* `timeout?: number` : how long the client should wait in ms for a subscription to be started (default 5000 ms)how long the client should wait in ms for a subscription to be started (default 5000 ms) | ||
* `connectionParams?: Object` : object that will be available as first argument of `onConnect` (in server side) | ||
* `reconnect?: boolean` : automatic reconnect in case of connection error | ||
* `reconnectionAttempts?: number` : how much reconnect attempts | ||
* `connectionCallback?: (error) => {}` : optional, callback that called after the first init message, with the error (if there is one) | ||
|
||
### Methods | ||
#### `subscribe(options, handler) => id` | ||
|
@@ -26,18 +30,30 @@ See [GitHunt-API](https://github.com/apollostack/GitHunt-API) and [GitHunt-React | |
- `id: string` : the subscription ID of the subscription to unsubscribe from | ||
|
||
## Server | ||
### `Constructor(options, httpServer)` | ||
### `Constructor(options, socketOptions)` | ||
- `options: {ServerOptions}` | ||
* `subscriptionManager: SubscriptionManager` : GraphQL subscription manager | ||
* `onSubscribe?: (message: SubscribeMessage, params: SubscriptionOptions, webSocketRequest: WebSocketRequest)` : optional method to create custom params that will be used when resolving this subscription | ||
* `keepAlive?: number` : optional interval in ms to send `SUBSCRIPTION_KEEPALIVE` messages to all clients | ||
* `onSubscribe?: (message: SubscribeMessage, params: SubscriptionOptions, webSocket: WebSocket)` : optional method to create custom params that will be used when resolving this subscription | ||
* `onUnsubscribe?: (webSocket: WebSocket)` : optional method that called when a client unsubscribe | ||
* `onConnect?: (connectionParams: Object, webSocket: WebSocket)` : optional method that called when a client connects to the socket, called with the `connectionParams` from the client, if the return value is an object, its elements will be added to the context. return `false` or throw an exception to reject the connection. May return a Promise. | ||
* `onDisconnect?: (webSocket: WebSocket)` : optional method that called when a client disconnects | ||
* `keepAlive?: number` : optional interval in ms to send `KEEPALIVE` messages to all clients | ||
- `socketOptions: {WebSocket.IServerOptions}` : options to pass to the WebSocket object (full docs [here](https://github.com/websockets/ws/blob/master/doc/ws.md)) | ||
* `server?: HttpServer` - existing HTTP server to use (use without `host`/`port`) | ||
* `host?: string` - server host | ||
* `port?: number` - server port | ||
* `path?: string` - endpoint path | ||
|
||
### `addGraphQLSubscriptions(networkInterface, Client)` | ||
A Quick way to add the subscribe and unsubscribe functions to the [network interface](http://dev.apollodata.com/core/network.html#createNetworkInterface) | ||
|
||
## Client-server messages | ||
Each message has a type, as well as associated fields depending on the message type. | ||
### Client -> Server | ||
|
||
#### INIT | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So interesting naming wrinkle here. The existing messages all start with SUBSCRIPTION_ and your new ones don't. Abstractly I think it's great to separate the INIT messages from the SUBSCRIPTION messages. But now SUBSCRIPTION_KEEPALIVE is weirdly named. Since we're doing enough backwards-incompatible stuff here maybe rename that one to KEEPALIVE? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renamed to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good but there are a couple remaining references to SUBSCRIPTION_KEEPALIVE There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
Client sends this message after connecting, this triggers `onConnect` on the server. | ||
|
||
#### SUBSCRIPTION_START | ||
Client sends this message to start a subscription for a query. | ||
- `query: GraphQLDocument` : GraphQL subscription | ||
|
@@ -50,6 +66,14 @@ Client sends this message to end a subscription. | |
- `id: string` : subscription ID of the subscription to be terminated | ||
|
||
### Server -> Client | ||
|
||
#### INIT_FAIL | ||
The server sends this message if `onConnect` callback returns `false` or throws an exception. after sending this message, the server disconnects the client. | ||
- `payload: Object`: the server side error | ||
|
||
#### INIT_SUCCESS | ||
The server sends this message if `onConnect` callback returns any other value then `false`. | ||
|
||
#### SUBSCRIPTION_SUCCESS | ||
The server sends this message to confirm that it has validated the subscription query and | ||
is subscribed to the triggers. | ||
|
@@ -66,5 +90,5 @@ GraphQL result sent periodically from server to client according to subscription | |
- `payload: GraphQLResult` : GraphQL result from running the subscription | ||
- `id: string` : subscription ID | ||
|
||
#### SUBSCRIPTION_KEEPALIVE | ||
#### KEEPALIVE | ||
Server message sent periodically to keep the client connection alive. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,28 @@ | ||
import * as websocket from 'websocket'; | ||
const W3CWebSocket = (websocket as { [key: string]: any })['w3cwebsocket']; | ||
import * as Backoff from 'backo2'; | ||
import {EventEmitter, ListenerFn} from 'eventemitter3'; | ||
|
||
declare let window: any; | ||
const _global = typeof global !== 'undefined' ? global : (typeof window !== 'undefined' ? window : {}); | ||
const NativeWebSocket = _global.WebSocket || _global.MozWebSocket; | ||
|
||
import { | ||
SUBSCRIPTION_FAIL, | ||
SUBSCRIPTION_DATA, | ||
SUBSCRIPTION_START, | ||
SUBSCRIPTION_SUCCESS, | ||
SUBSCRIPTION_END, | ||
SUBSCRIPTION_KEEPALIVE, | ||
KEEPALIVE, | ||
INIT, | ||
INIT_FAIL, | ||
INIT_SUCCESS, | ||
} from './messageTypes'; | ||
import { GRAPHQL_SUBSCRIPTIONS } from './protocols'; | ||
|
||
import isString = require('lodash.isstring'); | ||
import isObject = require('lodash.isobject'); | ||
|
||
export * from './helpers'; | ||
|
||
export interface SubscriptionOptions { | ||
query: string; | ||
variables?: Object; | ||
|
@@ -23,28 +31,32 @@ export interface SubscriptionOptions { | |
} | ||
|
||
export interface Subscription { | ||
options: SubscriptionOptions, | ||
handler: (error: Error[], result?: any) => void, | ||
options: SubscriptionOptions; | ||
handler: (error: Error[], result?: any) => void; | ||
} | ||
|
||
export interface Subscriptions { | ||
[id: string]: Subscription; | ||
} | ||
|
||
export type ConnectionParams = {[paramName: string]: any}; | ||
|
||
export interface ClientOptions { | ||
connectionParams?: ConnectionParams; | ||
timeout?: number; | ||
reconnect?: boolean; | ||
reconnectionAttempts?: number; | ||
connectionCallback?: (error: Error[], result?: any) => void; | ||
} | ||
|
||
const DEFAULT_SUBSCRIPTION_TIMEOUT = 5000; | ||
|
||
export default class Client { | ||
|
||
export class SubscriptionClient { | ||
public client: any; | ||
public subscriptions: Subscriptions; | ||
private url: string; | ||
private maxId: number; | ||
private connectionParams: ConnectionParams; | ||
private subscriptionTimeout: number; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a pre-existing issue but one worth considering: if you use reconnect, then subscriptionTimeout (which cannot be disabled) seems broken. Like, any subscribe is going to time out if you call it while you're offline, even though you might later reconnect! Will apps actually respond usefully to failed subscriptions by trying to subscribe again later? Tracing the timeout error looks like it would eventually get passed to an onError handler passed to subscribeToMore, which neither your GS PR nor GitHunt-react passes. Hmm... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, need to handle this with |
||
private waitingSubscriptions: {[id: string]: boolean}; // subscriptions waiting for SUBSCRIPTION_SUCCESS | ||
private unsentMessagesQueue: Array<any>; // queued messages while websocket is opening. | ||
|
@@ -53,14 +65,27 @@ export default class Client { | |
private reconnectionAttempts: number; | ||
private reconnectSubscriptions: Subscriptions; | ||
private backoff: any; | ||
private connectionCallback: any; | ||
private eventEmitter: EventEmitter; | ||
private wsImpl: any; | ||
|
||
constructor(url: string, options?: ClientOptions) { | ||
constructor(url: string, options?: ClientOptions, webSocketImpl?: any) { | ||
const { | ||
connectionCallback = undefined, | ||
connectionParams = {}, | ||
timeout = DEFAULT_SUBSCRIPTION_TIMEOUT, | ||
reconnect = false, | ||
reconnectionAttempts = Infinity, | ||
} = (options || {}); | ||
|
||
this.wsImpl = webSocketImpl || NativeWebSocket; | ||
|
||
if (!this.wsImpl) { | ||
throw new Error('Unable to find native implementation, or alternative implementation for WebSocket!'); | ||
} | ||
|
||
this.connectionParams = connectionParams; | ||
this.connectionCallback = connectionCallback; | ||
this.url = url; | ||
this.subscriptions = {}; | ||
this.maxId = 0; | ||
|
@@ -72,9 +97,19 @@ export default class Client { | |
this.reconnecting = false; | ||
this.reconnectionAttempts = reconnectionAttempts; | ||
this.backoff = new Backoff({ jitter: 0.5 }); | ||
this.eventEmitter = new EventEmitter(); | ||
|
||
this.connect(); | ||
} | ||
|
||
public get status() { | ||
return this.client.readyState; | ||
} | ||
|
||
public close() { | ||
this.client.close(); | ||
} | ||
|
||
public subscribe(options: SubscriptionOptions, handler: (error: Error[], result?: any) => void) { | ||
const { query, variables, operationName, context } = options; | ||
|
||
|
@@ -101,14 +136,34 @@ export default class Client { | |
this.subscriptions[subId] = {options, handler}; | ||
this.waitingSubscriptions[subId] = true; | ||
setTimeout( () => { | ||
if (this.waitingSubscriptions[subId]){ | ||
if (this.waitingSubscriptions[subId]) { | ||
handler([new Error('Subscription timed out - no response from server')]); | ||
this.unsubscribe(subId); | ||
} | ||
}, this.subscriptionTimeout); | ||
return subId; | ||
} | ||
|
||
public on(eventName: string, callback: ListenerFn, context?: any): Function { | ||
const handler = this.eventEmitter.on(eventName, callback, context); | ||
|
||
return () => { | ||
handler.off(eventName, callback, context); | ||
}; | ||
} | ||
|
||
public onConnect(callback: ListenerFn, context?: any): Function { | ||
return this.on('connect', callback, context); | ||
} | ||
|
||
public onDisconnect(callback: ListenerFn, context?: any): Function { | ||
return this.on('disconnect', callback, context); | ||
} | ||
|
||
public onReconnect(callback: ListenerFn, context?: any): Function { | ||
return this.on('reconnect', callback, context); | ||
} | ||
|
||
public unsubscribe(id: number) { | ||
delete this.subscriptions[id]; | ||
delete this.waitingSubscriptions[id]; | ||
|
@@ -168,6 +223,7 @@ export default class Client { | |
if (this.backoff.attempts > this.reconnectionAttempts) { | ||
return; | ||
} | ||
|
||
if (!this.reconnecting) { | ||
this.reconnectSubscriptions = this.subscriptions; | ||
this.subscriptions = {}; | ||
|
@@ -176,46 +232,65 @@ export default class Client { | |
} | ||
const delay = this.backoff.duration(); | ||
setTimeout(() => { | ||
this.connect(); | ||
this.connect(true); | ||
}, delay); | ||
} | ||
|
||
private connect() { | ||
this.client = new W3CWebSocket(this.url, GRAPHQL_SUBSCRIPTIONS); | ||
private connect(isReconnect: boolean = false) { | ||
this.client = new this.wsImpl(this.url, GRAPHQL_SUBSCRIPTIONS); | ||
|
||
this.client.onopen = () => { | ||
this.eventEmitter.emit(isReconnect ? 'reconnect' : 'connect'); | ||
this.reconnecting = false; | ||
this.backoff.reset(); | ||
Object.keys(this.reconnectSubscriptions).forEach((key) => { | ||
const { options, handler } = this.reconnectSubscriptions[key]; | ||
this.subscribe(options, handler); | ||
}) | ||
}); | ||
this.unsentMessagesQueue.forEach((message) => { | ||
this.client.send(JSON.stringify(message)); | ||
}); | ||
this.unsentMessagesQueue = []; | ||
|
||
// Send INIT message, no need to wait for connection to success (reduce roundtrips) | ||
this.sendMessage({type: INIT, payload: this.connectionParams}); | ||
}; | ||
|
||
this.client.onclose = () => { | ||
this.eventEmitter.emit('disconnect'); | ||
|
||
this.tryReconnect(); | ||
}; | ||
|
||
this.client.onmessage = (message: { data: string }) => { | ||
this.client.onerror = () => { | ||
this.tryReconnect(); | ||
}; | ||
|
||
this.client.onmessage = ({ data }: {data: any}) => { | ||
let parsedMessage: any; | ||
try { | ||
parsedMessage = JSON.parse(message.data); | ||
parsedMessage = JSON.parse(data); | ||
} catch (e) { | ||
throw new Error('Message must be JSON-parseable.'); | ||
throw new Error(`Message must be JSON-parseable. Got: ${data}`); | ||
} | ||
const subId = parsedMessage.id; | ||
if (parsedMessage.type !== SUBSCRIPTION_KEEPALIVE && !this.subscriptions[subId]) { | ||
if ([KEEPALIVE, INIT_SUCCESS, INIT_FAIL].indexOf(parsedMessage.type) === -1 && !this.subscriptions[subId]) { | ||
this.unsubscribe(subId); | ||
return; | ||
} | ||
|
||
// console.log('MSG', JSON.stringify(parsedMessage, null, 2)); | ||
switch (parsedMessage.type) { | ||
|
||
case INIT_FAIL: | ||
if (this.connectionCallback) { | ||
this.connectionCallback(parsedMessage.payload.error); | ||
} | ||
break; | ||
case INIT_SUCCESS: | ||
if (this.connectionCallback) { | ||
this.connectionCallback(); | ||
} | ||
break; | ||
case SUBSCRIPTION_SUCCESS: | ||
delete this.waitingSubscriptions[subId]; | ||
|
||
|
@@ -234,12 +309,12 @@ export default class Client { | |
} | ||
break; | ||
|
||
case SUBSCRIPTION_KEEPALIVE: | ||
case KEEPALIVE: | ||
break; | ||
|
||
default: | ||
throw new Error('Invalid message type - must be of type `subscription_start`, `subscription_data` or `subscription_keepalive`.'); | ||
throw new Error('Invalid message type!'); | ||
} | ||
}; | ||
} | ||
}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is doubled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍