Skip to content

Commit

Permalink
Merge pull request #403 from apollostack/basic_alignment
Browse files Browse the repository at this point in the history
Basic polling query alignment
  • Loading branch information
Sashko Stubailo authored Jul 20, 2016
2 parents 83b578f + 961da3a commit 5ab05a7
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Expect active development and potentially significant breaking changes in the `0.x` track. We'll try to be diligent about releasing a `1.0` version in a timely fashion (ideally within 3 to 6 months), to signal the start of a more stable API.

### vNEXT
- - Integrated the scheduler so that polling queries on the same polling interval are batched together. [PR #403](https://github.com/apollostack/apollo-client/pull/403) and [Issue #401](https://github.com/apollostack/apollo-client/issues/401).

- Fixed a bug where fetching a query without an id and then later with an id resulted in an orphaned node within the store. [Issue #344](https://github.com/apollostack/apollo-client/issues/344) and [PR #389](https://github.com/apollostack/apollo-client/pull/389).
- Fix typings for some refactored types, `ObservableQuery` and `WatchQueryOptions`. [PR #428](https://github.com/apollostack/apollo-client/pull/428)
Expand Down
61 changes: 44 additions & 17 deletions src/ObservableQuery.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,49 @@
import assign = require('lodash.assign');
import { WatchQueryOptions } from './watchQueryOptions';

import { Observable, Observer } from './util/Observable';

import { ApolloQueryResult } from './index';
import {
QueryScheduler,
} from './scheduler';

import { WatchQueryOptions } from './watchQueryOptions';
import {
QueryManager,
} from './QueryManager';

import { QueryManager } from './QueryManager';
import {
ApolloQueryResult,
} from './index';

import assign = require('lodash.assign');

export class ObservableQuery extends Observable<ApolloQueryResult> {
public refetch: (variables?: any) => Promise<ApolloQueryResult>;
public stopPolling: () => void;
public startPolling: (p: number) => void;
public options: WatchQueryOptions;
public queryManager: QueryManager;
public queryId: string;
private queryId: string;
private scheduler: QueryScheduler;
private queryManager: QueryManager;

constructor({
queryManager,
scheduler,
options,
shouldSubscribe = true,
}: {
queryManager: QueryManager,
scheduler: QueryScheduler,
options: WatchQueryOptions,
shouldSubscribe?: boolean,
}) {

const queryManager = scheduler.queryManager;
const queryId = queryManager.generateQueryId();
const isPollingQuery = !!options.pollInterval;

const subscriberFunction = (observer: Observer<ApolloQueryResult>) => {
const retQuerySubscription = {
unsubscribe: () => {
if (isPollingQuery) {
scheduler.stopPollingQuery(queryId);
}
queryManager.stopQuery(queryId);
},
};
Expand All @@ -39,15 +53,27 @@ export class ObservableQuery extends Observable<ApolloQueryResult> {
queryManager.addQuerySubscription(queryId, retQuerySubscription);
}

if (isPollingQuery) {
if (options.noFetch) {
throw new Error('noFetch option should not use query polling.');
}

this.scheduler.startPollingQuery(
options,
queryId
);
}
queryManager.startQuery(
queryId,
options,
queryManager.queryListenerForObserver(queryId, options, observer)
);

return retQuerySubscription;
};
super(subscriberFunction);
this.options = options;
this.scheduler = scheduler;
this.queryManager = queryManager;
this.queryId = queryId;

Expand All @@ -65,21 +91,22 @@ export class ObservableQuery extends Observable<ApolloQueryResult> {
};

this.stopPolling = () => {
if (this.queryManager.pollingTimers[this.queryId]) {
clearInterval(this.queryManager.pollingTimers[this.queryId]);
this.queryManager.stopQuery(this.queryId);
if (isPollingQuery) {
this.scheduler.stopPollingQuery(this.queryId);
}
};

this.startPolling = (pollInterval) => {
if (this.options.noFetch) {
throw new Error('noFetch option should not use query polling.');
}
this.queryManager.pollingTimers[this.queryId] = setInterval(() => {
const pollingOptions = assign({}, this.options) as WatchQueryOptions;
// subsequent fetches from polling always reqeust new data
pollingOptions.forceFetch = true;
this.queryManager.fetchQuery(this.queryId, pollingOptions);
}, pollInterval);

if (isPollingQuery) {
this.scheduler.stopPollingQuery(this.queryId);
}
options.pollInterval = pollInterval;
this.scheduler.startPollingQuery(this.options, this.queryId, false);
};
}

Expand Down
32 changes: 8 additions & 24 deletions src/QueryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
} from './networkInterface';

import forOwn = require('lodash.forown');
import assign = require('lodash.assign');
import isEqual = require('lodash.isequal');

import {
Expand Down Expand Up @@ -88,6 +87,8 @@ export type QueryListener = (queryStoreValue: QueryStoreValue) => void;

export class QueryManager {
public pollingTimers: {[queryId: string]: NodeJS.Timer | any}; //oddity in Typescript
public scheduler: QueryScheduler;

private networkInterface: NetworkInterface;
private store: ApolloStore;
private reduxRootKey: string;
Expand All @@ -99,7 +100,6 @@ export class QueryManager {

private idCounter = 0;

private scheduler: QueryScheduler;
private batcher: QueryBatcher;
private batchInterval: number;

Expand Down Expand Up @@ -336,7 +336,7 @@ export class QueryManager {
getQueryDefinition(options.query);

let observableQuery = new ObservableQuery({
queryManager: this,
scheduler: this.scheduler,
options: options,
shouldSubscribe: shouldSubscribe,
});
Expand Down Expand Up @@ -468,18 +468,11 @@ export class QueryManager {

public startQuery(queryId: string, options: WatchQueryOptions, listener: QueryListener) {
this.queryListeners[queryId] = listener;
this.fetchQuery(queryId, options);

if (options.pollInterval) {
if (options.noFetch) {
throw new Error('noFetch option should not use query polling.');
}
this.pollingTimers[queryId] = setInterval(() => {
const pollingOptions = assign({}, options) as WatchQueryOptions;
// subsequent fetches from polling always reqeust new data
pollingOptions.forceFetch = true;
this.fetchQuery(queryId, pollingOptions);
}, options.pollInterval);
// If the pollInterval is present, the scheduler has already taken care of firing the first
// fetch so we don't have to worry about it here.
if (!options.pollInterval) {
this.fetchQuery(queryId, options);
}

return queryId;
Expand All @@ -489,16 +482,7 @@ export class QueryManager {
// XXX in the future if we should cancel the request
// so that it never tries to return data
delete this.queryListeners[queryId];

// if we have a polling interval running, stop it
if (this.pollingTimers[queryId]) {
clearInterval(this.pollingTimers[queryId]);
}

this.store.dispatch({
type: 'APOLLO_QUERY_STOP',
queryId,
});
this.stopQueryInStore(queryId);
}

private collectResultBehaviorsFromUpdateQueries(
Expand Down
121 changes: 88 additions & 33 deletions src/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,20 @@ export class QueryScheduler {
// Map going from queryIds to query options that are in flight.
public inFlightQueries: { [queryId: string]: WatchQueryOptions };

// Map going from query ids to the query options associated with those queries. Contains all of
// the queries, both in flight and not in flight.
public registeredQueries: { [queryId: string]: WatchQueryOptions };

// Map going from polling interval with to the query ids that fire on that interval.
// These query ids are associated with a set of options in the this.registeredQueries.
public intervalQueries: { [interval: number]: string[] };

// We use this instance to actually fire queries (i.e. send them to the batching
// mechanism).
private queryManager: QueryManager;
public queryManager: QueryManager;

// Map going from queryIds to polling timers.
private pollingTimers: { [queryId: string]: NodeJS.Timer | any }; // oddity in Typescript
// Map going from polling interval widths to polling timers.
private pollingTimers: { [interval: number]: NodeJS.Timer | any }; // oddity in Typescript

constructor({
queryManager,
Expand All @@ -38,6 +46,8 @@ export class QueryScheduler {
this.queryManager = queryManager;
this.pollingTimers = {};
this.inFlightQueries = {};
this.registeredQueries = {};
this.intervalQueries = {};
}

public checkInFlight(queryId: string) {
Expand All @@ -57,51 +67,96 @@ export class QueryScheduler {
});
}

public startPollingQuery(options: WatchQueryOptions, listener: QueryListener,
queryId?: string): string {
if (!queryId) {
queryId = this.queryManager.generateQueryId();
// The firstFetch option is used to denote whether we want to fire off a
// "first fetch" before we start polling. If startPollingQuery() is being called
// from an existing ObservableQuery, the first fetch has already been fired which
// means that firstFetch should be false.
public startPollingQuery(
options: WatchQueryOptions,
queryId?: string,
firstFetch: boolean = true,
listener?: QueryListener
): string {
if (!options.pollInterval) {
throw new Error('Attempted to start a polling query without a polling interval.');
}
// Fire an initial fetch before we start the polling query
this.fetchQuery(queryId, options);
this.queryManager.addQueryListener(queryId, listener);

this.pollingTimers[queryId] = setInterval(() => {
const pollingOptions = assign({}, options) as WatchQueryOptions;
pollingOptions.forceFetch = true;
this.registeredQueries[queryId] = options;

// We only fire the query if another instance of this same polling query isn't
// already in flight. See top of this file for the reasoning as to why we do this.
if (!this.checkInFlight(queryId)) {
this.fetchQuery(queryId, pollingOptions);
}
}, options.pollInterval);
// Fire an initial fetch before we start the polling query
if (firstFetch) {
this.fetchQuery(queryId, options);
}

if (listener) {
this.queryManager.addQueryListener(queryId, listener);
}
this.addQueryOnInterval(queryId, options);

return queryId;
}

public stopPollingQuery(queryId: string) {
// TODO should cancel in flight request so that there is no
// further data returned.
this.queryManager.removeQueryListener(queryId);
// Remove the query options from one of the registered queries.
// The polling function will then take care of not firing it anymore.
delete this.registeredQueries[queryId];
}

if (this.pollingTimers[queryId]) {
clearInterval(this.pollingTimers[queryId]);
}
// Fires the all of the queries on a particular interval. Called on a setInterval.
public fetchQueriesOnInterval(interval: number) {
this.intervalQueries[interval] = this.intervalQueries[interval].filter((queryId) => {
// If queryOptions can't be found from registeredQueries, it means that this queryId
// is no longer registered and should be removed from the list of queries firing on this
// interval.
if (!this.registeredQueries.hasOwnProperty(queryId)) {
return false;
}

// Don't fire this instance of the polling query is one of the instances is already in
// flight.
if (this.checkInFlight(queryId)) {
return true;
}

// Fire a APOLLO_STOP_QUERY state change to the underlying store.
this.queryManager.stopQueryInStore(queryId);
const queryOptions = this.registeredQueries[queryId];
const pollingOptions = assign({}, queryOptions) as WatchQueryOptions;
pollingOptions.forceFetch = true;
this.fetchQuery(queryId, pollingOptions);
return true;
});

if (this.intervalQueries[interval].length === 0) {
clearInterval(this.pollingTimers[interval]);
}
}

// Tell the QueryScheduler to schedule the queries fired by a polling query.
public registerPollingQuery(options: WatchQueryOptions): ObservableQuery {
if (!options.pollInterval) {
throw new Error('Tried to register a non-polling query with the scheduler.');
// Adds a query on a particular interval to this.intervalQueries and then fires
// that query with all the other queries executing on that interval. Note that the query id
// and query options must have been added to this.registeredQueries before this function is called.
public addQueryOnInterval(queryId: string, queryOptions: WatchQueryOptions) {
const interval = queryOptions.pollInterval;

// If there are other queries on this interval, this query will just fire with those
// and we don't need to create a new timer.
if (this.intervalQueries.hasOwnProperty(interval.toString())) {
this.intervalQueries[interval].push(queryId);
} else {
this.intervalQueries[interval] = [queryId];
// set up the timer for the function that will handle this interval
this.pollingTimers[interval] = setInterval(() => {
this.fetchQueriesOnInterval(interval);
}, interval);
}
}

// Used only for unit testing.
public registerPollingQuery(queryOptions: WatchQueryOptions): ObservableQuery {
if (!queryOptions.pollInterval) {
throw new Error('Attempted to register a non-polling query with the scheduler.');
}
return new ObservableQuery({
queryManager: this.queryManager,
options: options,
scheduler: this,
options: queryOptions,
});
}

Expand Down
Loading

0 comments on commit 5ab05a7

Please sign in to comment.