diff --git a/gateway-js/CHANGELOG.md b/gateway-js/CHANGELOG.md index 879ffba2c..838ad5df2 100644 --- a/gateway-js/CHANGELOG.md +++ b/gateway-js/CHANGELOG.md @@ -4,6 +4,7 @@ > The changes noted within this `vNEXT` section have not been released yet. New PRs and commits which introduce changes should include an entry in this `vNEXT` section as part of their development. When a release is being prepared, a new header will be (manually) created below and the appropriate changes within that release will be moved into the new section. +- **If you are on v2.18 or v2.19 of Apollo Server, you should upgrade to Apollo Server v2.20 before upgrading to this version**, or your Node process may not shut down properly after stopping your Apollo Server.. Code that calls `ApolloGateway.load` is now expected to call `ApolloGateway.stop`. If you don't do that and you're using managed federation or `experimental_pollInterval`, the background polling interval will now keep a Node process alive rather than allowing it to exit if it's the only remaining event loop handler. Generally, `ApolloServer` is what calls `ApolloGateway.load`, and if you use at least v2.20.0 of Apollo Server, `ApolloServer.stop()` will invoke `ApolloGateway.stop()`. There's a bit of a hack where ApolloGateway does the old behavior if it believes that it is being called by a version of Apollo Server older than v2.18.0. So if you are manually calling `ApolloGateway.load` from your code, make sure to call `ApolloGateway.stop` when you're done, and don't use this version with Apollo Server v2.18 or v2.19. [PR #452](https://github.com/apollographql/federation/pull/452) [apollo-server Issue #4428](https://github.com/apollographql/apollo-server/issues/4428) - Simplify startup code paths. This is technically only intended to be an internal restructure, but it's substantial enough to warrant a changelog entry for observability in case of any unexpected behavioral changes. [PR #440](https://github.com/apollographql/federation/pull/440) ## v0.22.0 diff --git a/gateway-js/src/__tests__/gateway/lifecycle-hooks.test.ts b/gateway-js/src/__tests__/gateway/lifecycle-hooks.test.ts index 881915525..ab5add7f2 100644 --- a/gateway-js/src/__tests__/gateway/lifecycle-hooks.test.ts +++ b/gateway-js/src/__tests__/gateway/lifecycle-hooks.test.ts @@ -64,6 +64,7 @@ describe('lifecycle hooks', () => { expect(experimental_updateServiceDefinitions).toBeCalled(); expect(gateway.schema!.getType('Furniture')).toBeDefined(); + await gateway.stop(); }); it('calls experimental_didFailComposition with a bad config', async () => { @@ -181,6 +182,8 @@ describe('lifecycle hooks', () => { // second call should have previous info in the second arg expect(secondCall[1]!.schema).toBeDefined(); expect(secondCall[1]!.compositionMetadata!.schemaHash).toEqual('hash1'); + + await gateway.stop(); }); it('uses default service definition updater', async () => { @@ -196,6 +199,8 @@ describe('lifecycle hooks', () => { // updater, it has to use the default. If there's a valid schema, then // the loader had to have been called. expect(schema.getType('User')).toBeDefined(); + + await gateway.stop(); }); it('warns when polling on the default fetcher', async () => { @@ -229,11 +234,12 @@ describe('lifecycle hooks', () => { const schemaChangeCallback = jest.fn(() => resolve()); gateway.onSchemaChange(schemaChangeCallback); - gateway.load(); + await gateway.load(); await schemaChangeBlocker; expect(schemaChangeCallback).toBeCalledTimes(1); + await gateway.stop(); }); it('calls experimental_didResolveQueryPlan when executor is called', async () => { @@ -261,5 +267,6 @@ describe('lifecycle hooks', () => { }); expect(experimental_didResolveQueryPlan).toBeCalled(); + await gateway.stop(); }); }); diff --git a/gateway-js/src/__tests__/integration/configuration.test.ts b/gateway-js/src/__tests__/integration/configuration.test.ts index 974ca96e2..016efb1ec 100644 --- a/gateway-js/src/__tests__/integration/configuration.test.ts +++ b/gateway-js/src/__tests__/integration/configuration.test.ts @@ -50,8 +50,15 @@ beforeEach(() => { }); describe('gateway configuration warnings', () => { + let gateway: ApolloGateway | null = null; + afterEach(async () => { + if (gateway) { + await gateway.stop(); + gateway = null; + } + }); it('warns when both csdl and studio configuration are provided', async () => { - const gateway = new ApolloGateway({ + gateway = new ApolloGateway({ csdl: getTestingCsdl(), logger, }); @@ -69,7 +76,7 @@ describe('gateway configuration warnings', () => { it('conflicting configurations are warned about when present', async () => { mockSDLQuerySuccess(service); - const gateway = new ApolloGateway({ + gateway = new ApolloGateway({ serviceList: [{ name: 'accounts', url: service.url }], logger, }); @@ -92,7 +99,7 @@ describe('gateway configuration warnings', () => { mockImplementingServicesSuccess(service); mockRawPartialSchemaSuccess(service); - const gateway = new ApolloGateway({ + gateway = new ApolloGateway({ logger, }); diff --git a/gateway-js/src/__tests__/integration/networkRequests.test.ts b/gateway-js/src/__tests__/integration/networkRequests.test.ts index a0b04500a..472bc5ae2 100644 --- a/gateway-js/src/__tests__/integration/networkRequests.test.ts +++ b/gateway-js/src/__tests__/integration/networkRequests.test.ts @@ -67,6 +67,7 @@ const updatedService: MockService = { let fetcher: typeof fetch; let logger: Logger; +let gateway: ApolloGateway | null = null; beforeEach(() => { if (!nock.isActive()) nock.activate(); @@ -92,16 +93,20 @@ beforeEach(() => { }; }); -afterEach(() => { +afterEach(async () => { expect(nock.isDone()).toBeTruthy(); nock.cleanAll(); nock.restore(); + if (gateway) { + await gateway.stop(); + gateway = null; + } }); it('Queries remote endpoints for their SDLs', async () => { mockSDLQuerySuccess(service); - const gateway = new ApolloGateway({ + gateway = new ApolloGateway({ serviceList: [{ name: 'accounts', url: service.url }], logger, }); @@ -116,7 +121,7 @@ it('Extracts service definitions from remote storage', async () => { mockImplementingServicesSuccess(service); mockRawPartialSchemaSuccess(service); - const gateway = new ApolloGateway({ logger }); + gateway = new ApolloGateway({ logger }); await gateway.load({ apollo: { keyHash: apiKeyHash, graphId, graphVariant: 'current' }, @@ -163,7 +168,7 @@ it.skip('Rollsback to a previous schema when triggered', async () => { .mockImplementationOnce(() => secondResolve()) .mockImplementationOnce(() => thirdResolve()); - const gateway = new ApolloGateway({ logger }); + gateway = new ApolloGateway({ logger }); // @ts-ignore for testing purposes, a short pollInterval is ideal so we'll override here gateway.experimental_pollInterval = 100; @@ -204,7 +209,7 @@ it(`Retries GCS (up to ${GCS_RETRY_COUNT} times) on failure for each request and failNTimes(GCS_RETRY_COUNT, () => mockRawPartialSchema(service)); mockRawPartialSchemaSuccess(service); - const gateway = new ApolloGateway({ fetcher, logger }); + gateway = new ApolloGateway({ fetcher, logger }); await gateway.load({ apollo: { keyHash: apiKeyHash, graphId, graphVariant: 'current' }, @@ -255,7 +260,7 @@ describe('Downstream service health checks', () => { mockSDLQuerySuccess(service); mockServiceHealthCheckSuccess(service); - const gateway = new ApolloGateway({ + gateway = new ApolloGateway({ logger, serviceList: [{ name: 'accounts', url: service.url }], serviceHealthCheck: true, @@ -293,7 +298,7 @@ describe('Downstream service health checks', () => { mockServiceHealthCheckSuccess(service); - const gateway = new ApolloGateway({ serviceHealthCheck: true, logger }); + gateway = new ApolloGateway({ serviceHealthCheck: true, logger }); await gateway.load({ apollo: { keyHash: apiKeyHash, graphId, graphVariant: 'current' }, @@ -352,7 +357,7 @@ describe('Downstream service health checks', () => { .mockImplementationOnce(() => resolve1()) .mockImplementationOnce(() => resolve2()); - const gateway = new ApolloGateway({ + gateway = new ApolloGateway({ serviceHealthCheck: true, logger, }); @@ -396,7 +401,7 @@ describe('Downstream service health checks', () => { let resolve: () => void; const schemaChangeBlocker = new Promise((res) => (resolve = res)); - const gateway = new ApolloGateway({ serviceHealthCheck: true, logger }); + gateway = new ApolloGateway({ serviceHealthCheck: true, logger }); // @ts-ignore for testing purposes, a short pollInterval is ideal so we'll override here gateway.experimental_pollInterval = 100; diff --git a/gateway-js/src/index.ts b/gateway-js/src/index.ts index 95ea03388..4595c07b0 100644 --- a/gateway-js/src/index.ts +++ b/gateway-js/src/index.ts @@ -42,7 +42,12 @@ import { CompositionMetadata, } from './loadServicesFromStorage'; -import { serializeQueryPlan, QueryPlan, OperationContext, WasmPointer } from './QueryPlan'; +import { + serializeQueryPlan, + QueryPlan, + OperationContext, + WasmPointer, +} from './QueryPlan'; import { GraphQLDataSource } from './datasources/types'; import { RemoteGraphQLDataSource } from './datasources/RemoteGraphQLDataSource'; import { getVariableValues } from 'graphql/execution/values'; @@ -112,6 +117,17 @@ export const HEALTH_CHECK_QUERY = export const SERVICE_DEFINITION_QUERY = 'query __ApolloGetServiceDefinition__ { _service { sdl } }'; +type GatewayState = + | { phase: 'initialized' } + | { phase: 'loaded' } + | { phase: 'stopping'; stoppingDonePromise: Promise } + | { phase: 'stopped' } + | { + phase: 'waiting to poll'; + pollWaitTimer: NodeJS.Timer; + doneWaiting: () => void; + } + | { phase: 'polling'; pollingDonePromise: Promise }; export class ApolloGateway implements GraphQLService { public schema?: GraphQLSchema; protected serviceMap: DataSourceMap = Object.create(null); @@ -119,7 +135,6 @@ export class ApolloGateway implements GraphQLService { private logger: Logger; protected queryPlanStore: InMemoryLRUCache; private apolloConfig?: ApolloConfig; - private pollingTimer?: NodeJS.Timer; private onSchemaChangeListeners = new Set(); private serviceDefinitions: ServiceDefinition[] = []; private compositionMetadata?: CompositionMetadata; @@ -129,6 +144,8 @@ export class ApolloGateway implements GraphQLService { private parsedCsdl?: DocumentNode; private fetcher: typeof fetch; + private state: GatewayState; + // Observe query plan, service info, and operation info prior to execution. // The information made available here will give insight into the resulting // query plan and the inputs that generated it. @@ -179,6 +196,8 @@ export class ApolloGateway implements GraphQLService { if (isDynamicConfig(this.config)) { this.issueDynamicWarningsIfApplicable(); } + + this.state = { phase: 'initialized' }; } private initLogger() { @@ -243,6 +262,11 @@ export class ApolloGateway implements GraphQLService { apollo?: ApolloConfig; engine?: GraphQLServiceEngineConfig; }) { + if (this.state.phase !== 'initialized') { + throw Error( + 'ApolloGateway.load called in surprising state ${this.state}', + ); + } if (options?.apollo) { this.apolloConfig = options.apollo; } else if (options?.engine) { @@ -254,12 +278,40 @@ export class ApolloGateway implements GraphQLService { }; } + // Before @apollo/gateway v0.23, ApolloGateway didn't expect stop() to be + // called after it started. The only thing that stop() did at that point was + // cancel the poll timer, and so to prevent that timer from keeping an + // otherwise-finished Node process alive, ApolloGateway unconditionally + // called unref() on that timeout. As part of making the ApolloGateway + // lifecycle more predictable and concrete (and to allow for a future where + // there are other reasons to make sure to explicitly stop your gateway), + // v0.23 tries to avoid calling unref(). + // + // Apollo Server v2.20 and newer calls gateway.stop() from its stop() + // method, so as long as you're using v2.20, ApolloGateway won't keep + // running after you stop your server, and your Node process can shut down. + // To make this change a bit less backwards-incompatible, we detect if it + // looks like you're using an older version of Apollo Server; if so, we + // still call unref(). Specifically: Apollo Server has always passed an + // options object to load(), and before v2.18 it did not pass the `apollo` + // key on it. So if we detect that particular pattern, we assume we're with + // pre-v2.18 Apollo Server and we still call unref(). So this will be a + // behavior change only for: + // - non-Apollo-Server uses of ApolloGateway (where you can add your own + // call to gateway.stop()) + // - Apollo Server v2.18 and v2.19 (where you can either do the small + // compatible upgrade or add your own call to gateway.stop()) + // - if you don't call stop() on your ApolloServer (but in that case other + // things like usage reporting will also stop shutdown, so you should fix + // that) + const unrefTimer = !!options && !options.apollo; + this.maybeWarnOnConflictingConfig(); // Handles initial assignment of `this.schema`, `this.queryPlannerPointer` isStaticConfig(this.config) ? this.loadStatic(this.config) - : await this.loadDynamic(); + : await this.loadDynamic(unrefTimer); const mode = isManagedConfig(this.config) ? 'managed' : 'unmanaged'; this.logger.info( @@ -272,7 +324,7 @@ export class ApolloGateway implements GraphQLService { return { schema: this.schema!, - executor: this.executor + executor: this.executor, }; } @@ -288,22 +340,21 @@ export class ApolloGateway implements GraphQLService { this.schema = schema; this.parsedCsdl = parse(composedSdl); this.queryPlannerPointer = getQueryPlanner(composedSdl); + this.state = { phase: 'loaded' }; } // Asynchronously load a dynamically configured schema. `this.updateComposition` // is responsible for updating the class instance's schema and query planner. - private async loadDynamic() { + private async loadDynamic(unrefTimer: boolean) { await this.updateComposition(); + this.state = { phase: 'loaded' }; if (this.shouldBeginPolling()) { - this.pollServices(); + this.pollServices(unrefTimer); } } private shouldBeginPolling() { - return ( - (isManagedConfig(this.config) || this.experimental_pollInterval) && - !this.pollingTimer - ); + return isManagedConfig(this.config) || this.experimental_pollInterval; } protected async updateComposition(): Promise { @@ -473,7 +524,7 @@ export class ApolloGateway implements GraphQLService { } throw Error( "A valid schema couldn't be composed. The following composition errors were found:\n" + - errors.map(e => '\t' + e.message).join('\n'), + errors.map((e) => '\t' + e.message).join('\n'), ); } else { const { composedSdl } = compositionResult; @@ -545,28 +596,81 @@ export class ApolloGateway implements GraphQLService { }; } - private async pollServices() { - if (this.pollingTimer) clearTimeout(this.pollingTimer); + // This function waits an appropriate amount, updates composition, and calls itself + // again. Note that it is an async function whose Promise is not actually awaited; + // it should never throw itself other than due to a bug in its state machine. + private async pollServices(unrefTimer: boolean) { + switch (this.state.phase) { + case 'stopping': + case 'stopped': + return; + case 'initialized': + throw Error('pollServices should not be called before load!'); + case 'polling': + throw Error( + 'pollServices should not be called while in the middle of polling!', + ); + case 'waiting to poll': + throw Error( + 'pollServices should not be called while already waiting to poll!', + ); + case 'loaded': + // This is the normal case. + break; + default: + throw new UnreachableCaseError(this.state); + } - // Sleep for the specified pollInterval before kicking off another round of polling - await new Promise((res) => { - this.pollingTimer = setTimeout( - () => res(), - this.experimental_pollInterval || 10000, - ); - // Prevent the Node.js event loop from remaining active (and preventing, - // e.g. process shutdown) by calling `unref` on the `Timeout`. For more - // information, see https://nodejs.org/api/timers.html#timers_timeout_unref. - this.pollingTimer?.unref(); + // Transition into 'waiting to poll' and set a timer. The timer resolves the + // Promise we're awaiting here; note that calling stop() also can resolve + // that Promise. + await new Promise((doneWaiting) => { + this.state = { + phase: 'waiting to poll', + doneWaiting, + pollWaitTimer: setTimeout(() => { + // Note that we might be in 'stopped', in which case we just do + // nothing. + if (this.state.phase == 'waiting to poll') { + this.state.doneWaiting(); + } + }, this.experimental_pollInterval || 10000), + }; + if (unrefTimer) { + this.state.pollWaitTimer.unref(); + } }); + // If we've been stopped, then we're done. The cast here is because TS + // doesn't understand that this.state can change during the await + // (https://github.com/microsoft/TypeScript/issues/9998). + if ((this.state as GatewayState).phase !== 'waiting to poll') { + return; + } + + let pollingDone: () => void; + this.state = { + phase: 'polling', + pollingDonePromise: new Promise((res) => { + pollingDone = res; + }), + }; + try { await this.updateComposition(); } catch (err) { this.logger.error((err && err.message) || err); } - this.pollServices(); + if (this.state.phase === 'polling') { + // If we weren't stopped, we should transition back to the initial 'loading' state and trigger + // another call to itself. (Do that in a setImmediate to avoid unbounded stack sizes.) + this.state = { phase: 'loaded' }; + setImmediate(() => this.pollServices(unrefTimer)); + } + + // Whether we were stopped or not, let any concurrent stop() call finish. + pollingDone!(); } private createAndCacheDataSource( @@ -632,9 +736,9 @@ export class ApolloGateway implements GraphQLService { if (!canUseManagedConfig) { throw new Error( 'When a manual configuration is not provided, gateway requires an Apollo ' + - 'configuration. See https://www.apollographql.com/docs/apollo-server/federation/managed-federation/ ' + - 'for more information. Manual configuration options include: ' + - '`serviceList`, `csdl`, and `experimental_updateServiceDefinitions`.', + 'configuration. See https://www.apollographql.com/docs/apollo-server/federation/managed-federation/ ' + + 'for more information. Manual configuration options include: ' + + '`serviceList`, `csdl`, and `experimental_updateServiceDefinitions`.', ); } @@ -813,10 +917,62 @@ export class ApolloGateway implements GraphQLService { return errors || []; } + // Stops all processes involved with the gateway (for now, just background + // schema polling). Can be called multiple times safely. Once it (async) + // returns, all gateway background activity will be finished. public async stop() { - if (this.pollingTimer) { - clearTimeout(this.pollingTimer); - this.pollingTimer = undefined; + switch (this.state.phase) { + case 'initialized': + throw Error( + 'ApolloGateway.stop does not need to be called before ApolloGateway.load', + ); + case 'stopped': + // Calls to stop() are idempotent. + return; + case 'stopping': + await this.state.stoppingDonePromise; + // The cast here is because TS doesn't understand that this.state can + // change during the await + // (https://github.com/microsoft/TypeScript/issues/9998). + if ((this.state as GatewayState).phase !== 'stopped') { + throw Error( + `Expected to be stopped when done stopping, but instead ${this.state.phase}`, + ); + } + return; + case 'loaded': + this.state = { phase: 'stopped' }; // nothing to do (we're not polling) + return; + case 'waiting to poll': { + // If we're waiting to poll, we can synchronously transition to fully stopped. + // We will terminate the current pollServices call and it will succeed quickly. + const doneWaiting = this.state.doneWaiting; + clearTimeout(this.state.pollWaitTimer); + this.state = { phase: 'stopped' }; + doneWaiting(); + return; + } + case 'polling': { + // We're in the middle of running updateComposition. We need to go into 'stopping' + // mode and let this run complete. First we set things up so that any concurrent + // calls to stop() will wait until we let them finish. (Those concurrent calls shouldn't + // just wait on pollingDonePromise themselves because we want to make sure we fully + // transition to state='stopped' before the other call returns.) + const pollingDonePromise = this.state.pollingDonePromise; + let stoppingDone: () => void; + this.state = { + phase: 'stopping', + stoppingDonePromise: new Promise((res) => { + stoppingDone = res; + }), + }; + await pollingDonePromise; + this.state = { phase: 'stopped' }; + stoppingDone!(); + return; + } + default: + throw new UnreachableCaseError(this.state); } } } @@ -829,16 +985,14 @@ function approximateObjectSize(obj: T): number { // planning would be lost. Instead we set a resolver for each field // in order to counteract GraphQLExtensions preventing a defaultFieldResolver // from doing the same job -function wrapSchemaWithAliasResolver( - schema: GraphQLSchema, -): GraphQLSchema { +function wrapSchemaWithAliasResolver(schema: GraphQLSchema): GraphQLSchema { const typeMap = schema.getTypeMap(); - Object.keys(typeMap).forEach(typeName => { + Object.keys(typeMap).forEach((typeName) => { const type = typeMap[typeName]; if (isObjectType(type) && !isIntrospectionType(type)) { const fields = type.getFields(); - Object.keys(fields).forEach(fieldName => { + Object.keys(fields).forEach((fieldName) => { const field = fields[fieldName]; field.resolve = defaultFieldResolverWithAliasSupport; }); @@ -847,6 +1001,15 @@ function wrapSchemaWithAliasResolver( return schema; } +// Throw this in places that should be unreachable (because all other cases have +// been handled, reducing the type of the argument to `never`). TypeScript will +// complain if in fact there is a valid type for the argument. +class UnreachableCaseError extends Error { + constructor(val: never) { + super(`Unreachable case: ${val}`); + } +} + export { buildQueryPlan, executeQueryPlan,