Skip to content

Commit

Permalink
* Moved filter functionality to the SubscriptionsManager.
Browse files Browse the repository at this point in the history
* Added pubsub argument to SubscriptionManager to allow for different kinds of PubSub Engines
* Updated typescript dependencies to 2.0.0 to allow for the current graphql typings to work. see microsoft/TypeScript#7279
  • Loading branch information
davidyaha committed Sep 2, 2016
1 parent a80b84c commit 17d4cf4
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 37 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"mocha": "^3.0.0",
"remap-istanbul": "^0.6.4",
"tslint": "^3.13.0",
"typescript": "^1.8.10",
"typescript": "^2.0.0",
"typings": "^1.3.2"
},
"typings": "dist/index.d.ts",
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export { FilteredPubSub, SubscriptionManager } from './pubsub';
export { PubSub, SubscriptionManager } from './pubsub';
46 changes: 29 additions & 17 deletions src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// This is basically just event emitters wrapped with a function that filters messages.
//
import { EventEmitter } from 'events';
import graphql, {
import graphql, {
GraphQLSchema,
GraphQLError,
validate,
Expand All @@ -21,7 +21,13 @@ import {
subscriptionHasSingleRootField
} from './validation';

export class FilteredPubSub {
export interface PubSubEngine {
publish(triggerName: string, payload: any): boolean
subscribe(triggerName: string, onMessage: Function): number
unsubscribe(subId: number)
}

export class PubSub implements PubSubEngine {
private ee: EventEmitter;
private subscriptions: {[key: string]: [string, Function]};
private subIdCounter: number;
Expand All @@ -32,27 +38,28 @@ export class FilteredPubSub {
this.subIdCounter = 0;
}

public publish(triggerName: string, payload: any){
public publish(triggerName: string, payload: any): boolean {
this.ee.emit(triggerName, payload);
// Not using the value returned from emit method because it gives
// irrelevant false when there are no listeners.
return true;
}

public subscribe(triggerName: string, filterFunc: Function, handler: Function): number{
// notify handler only if filterFunc returns true
const onMessage = (data) => filterFunc(data) ? handler(data) : null
public subscribe(triggerName: string, onMessage: Function): number {
this.ee.addListener(triggerName, onMessage);
this.subIdCounter = this.subIdCounter + 1;
this.subscriptions[this.subIdCounter] = [triggerName, onMessage];
return this.subIdCounter;
return this.subIdCounter;
}

public unsubscribe(subId: number): void {
public unsubscribe(subId: number) {
const [triggerName, onMessage] = this.subscriptions[subId];
delete this.subscriptions[subId];
this.ee.removeListener(triggerName, onMessage);
}
}

export class ValidationError extends Error{
export class ValidationError extends Error {
errors: Array<GraphQLError>;
message: string;

Expand All @@ -75,21 +82,23 @@ export interface SubscriptionOptions {

// This manages actual GraphQL subscriptions.
export class SubscriptionManager {
private pubsub: FilteredPubSub;
private pubsub: PubSubEngine;
private schema: GraphQLSchema;
private setupFunctions: { [subscriptionName: string]: Function };
private subscriptions: { [externalId: number]: Array<number>};
private maxSubscriptionId: number;

constructor(options: { schema: GraphQLSchema, setupFunctions: {[subscriptionName: string]: Function} }){
this.pubsub = new FilteredPubSub();
constructor(options: { schema: GraphQLSchema,
setupFunctions: {[subscriptionName: string]: Function},
pubsub: PubSubEngine }){
this.pubsub = options.pubsub;
this.schema = options.schema;
this.setupFunctions = options.setupFunctions || {};
this.subscriptions = {};
this.maxSubscriptionId = 0;
}

public publish(triggerName: string, payload: any){
public publish(triggerName: string, payload: any) {
this.pubsub.publish(triggerName, payload);
}

Expand Down Expand Up @@ -126,7 +135,7 @@ export class SubscriptionManager {
const fields = this.schema.getSubscriptionType().getFields();
rootField.arguments.forEach( arg => {
// we have to get the one arg's definition from the schema
const argDefinition = fields[subscriptionName].args.filter(
const argDefinition = fields[subscriptionName].args.filter(
argDef => argDef.name === arg.name.value
)[0];
args[argDefinition.name] = valueFromAST(arg.value, argDefinition.type, options.variables);
Expand All @@ -146,7 +155,7 @@ export class SubscriptionManager {
Object.keys(triggerMap).forEach( triggerName => {
// 2. generate the filter function and the handler function
const onMessage = rootValue => {
// rootValue is the payload sent by the event emitter / trigger
// rootValue is the payload sent by the event emitter / trigger
// by convention this is the value returned from the mutation resolver

try {
Expand All @@ -166,9 +175,12 @@ export class SubscriptionManager {
}
}

const isTriggering: Function = triggerMap[triggerName];

// 3. subscribe and return the subscription id
this.subscriptions[externalSubscriptionId].push(
this.pubsub.subscribe(triggerName, triggerMap[triggerName], onMessage)
// Will run the onMessage function only if the message passes the filter function.
this.pubsub.subscribe(triggerName, (data) => isTriggering(data) && onMessage(data))
);
});
return externalSubscriptionId;
Expand All @@ -180,4 +192,4 @@ export class SubscriptionManager {
this.pubsub.unsubscribe(internalId);
});
}
}
}
35 changes: 17 additions & 18 deletions src/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,32 @@ import {
} from 'graphql';

import {
FilteredPubSub,
PubSub,
SubscriptionManager,
} from '../pubsub';

import { subscriptionHasSingleRootField } from '../validation';

describe('FilteredPubSub', function() {
describe('PubSub', function() {
it('can subscribe and is called when events happen', function(done) {
const ps = new FilteredPubSub();
ps.subscribe('a', () => true, payload => {
const ps = new PubSub();
ps.subscribe('a', payload => {
expect(payload).to.equals('test');
done();
});
ps.publish('a', 'test');
});

it('can filter events that get sent to subscribers', function(done) {
const ps = new FilteredPubSub();
ps.subscribe('a', payload => payload !== 'bad', payload => {
expect(payload).to.equals('good');
done();
});
ps.publish('a', 'bad');
ps.publish('a', 'good');
const succeed = ps.publish('a', 'test');
expect(succeed).to.be.true;
});

it('can unsubscribe', function(done) {
const ps = new FilteredPubSub();
const subId = ps.subscribe('a', () => true, payload => {
const ps = new PubSub();
const subId = ps.subscribe('a', payload => {
assert(false);
});
ps.unsubscribe(subId);
ps.publish('a', 'test');
const succeed = ps.publish('a', 'test');
expect(succeed).to.be.true; // True because publish success is not
// indicated by trigger having subscriptions
done(); // works because pubsub is synchronous
});
});
Expand Down Expand Up @@ -112,6 +105,7 @@ describe('SubscriptionManager', function() {
};
},
},
pubsub: new PubSub(),
});
it('throws an error if query is not valid', function() {
const query = 'query a{ testInt }';
Expand Down Expand Up @@ -234,6 +228,11 @@ describe('SubscriptionManager', function() {
setTimeout(done, 30);
});

it('throws an error when trying to unsubscribe from unknown id', function () {
expect(() => subManager.unsubscribe(123))
.to.throw('undefined');
});

it('calls the error callback if there is an execution error', function(done) {
const query = `subscription X($uga: Boolean!){
testSubscription @skip(if: $uga)
Expand Down

0 comments on commit 17d4cf4

Please sign in to comment.