Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/feat(serve): WebSockets for GW/Client comm #7766

Merged
merged 8 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/@graphql-mesh_serve-cli-7766-dependencies.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@graphql-mesh/serve-cli": patch
---
dependencies updates:
- Added dependency [`graphql-ws@^5.16.0` ↗︎](https://www.npmjs.com/package/graphql-ws/v/5.16.0) (to `dependencies`)
- Added dependency [`ws@^8.18.0` ↗︎](https://www.npmjs.com/package/ws/v/8.18.0) (to `dependencies`)
7 changes: 7 additions & 0 deletions .changeset/lazy-spies-watch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@graphql-mesh/transport-http-callback': minor
'@graphql-mesh/serve-runtime': minor
'@graphql-mesh/serve-cli': minor
---

Fix/Implement WebSockets support for the Gateway and Client communication
Original file line number Diff line number Diff line change
@@ -1,55 +1,82 @@
import { setTimeout } from 'timers/promises';
import { createClient, type Client } from 'graphql-sse';
import {
createClient as createSSEClient,
type Client as SSEClient,
type ClientOptions as SSEClientOptions,
} from 'graphql-sse';
import {
createClient as createWSClient,
type Client as WSClient,
type ClientOptions as WSClientOptions,
} from 'graphql-ws';
import webSocketImpl from 'ws';
import { getLocalHostName } from '@e2e/opts';
import { createTenv, getAvailablePort } from '@e2e/tenv';
import { fetch } from '@whatwg-node/fetch';
import { TOKEN } from './services/products/server';

const { composeWithApollo, service, serve } = createTenv(__dirname);

let client: Client | null = null;
let client: WSClient | SSEClient | null = null;

afterEach(() => client?.dispose());

it('should subscribe and resolve via websockets', async () => {
const supergraphFile = await composeWithApollo([
await service('products'),
await service('reviews'),
]);
const { port } = await serve({ supergraph: supergraphFile });

client = createClient({
url: `http://localhost:${port}/graphql`,
retryAttempts: 0,
headers: {
Authorization: TOKEN,
},
fetchFn: fetch,
});
const sub = client.iterate({
query: /* GraphQL */ `
subscription OnProductPriceChanged {
productPriceChanged {
# Defined in Products subgraph
name
price
reviews {
# Defined in Reviews subgraph
score
const subscriptionsClientFactories = [
['SSE', createSSEClient],
['WS', createWSClient],
] as [
string,
(opts: Partial<SSEClientOptions> & Partial<WSClientOptions>) => SSEClient | WSClient,
][];

subscriptionsClientFactories.forEach(([protocol, createClient]) => {
if (protocol === 'WS' && process.version.startsWith('v18')) {
return;
}
describe(`with ${protocol}`, () => {
const headers = {
authorization: TOKEN,
};
it('should subscribe and resolve via websockets', async () => {
const supergraphFile = await composeWithApollo([
await service('products'),
await service('reviews'),
]);
const { port } = await serve({ supergraph: supergraphFile });

client = createClient({
url: `http://localhost:${port}/graphql`,
retryAttempts: 0,
headers,
connectionParams: headers,
fetchFn: fetch,
webSocketImpl,
});
const sub = client.iterate({
query: /* GraphQL */ `
subscription OnProductPriceChanged {
productPriceChanged {
# Defined in Products subgraph
name
price
reviews {
# Defined in Reviews subgraph
score
}
}
}
`,
});

const msgs = [];
for await (const msg of sub) {
msgs.push(msg);
if (msgs.length >= 3) {
break;
}
}
`,
});

const msgs = [];
for await (const msg of sub) {
msgs.push(msg);
if (msgs.length >= 3) {
break;
}
}

expect(msgs).toMatchInlineSnapshot(`
expect(msgs).toMatchInlineSnapshot(`
[
{
"data": {
Expand Down Expand Up @@ -95,111 +122,111 @@ it('should subscribe and resolve via websockets', async () => {
},
]
`);
});
});

it('should recycle websocket connections', async () => {
const supergraphFile = await composeWithApollo([
await service('products'),
await service('reviews'),
]);
const { port } = await serve({ supergraph: supergraphFile });

client = createClient({
url: `http://localhost:${port}/graphql`,
retryAttempts: 0,
headers: {
Authorization: TOKEN,
},
fetchFn: fetch,
});
it('should recycle websocket connections', async () => {
const supergraphFile = await composeWithApollo([
await service('products'),
await service('reviews'),
]);
const { port } = await serve({ supergraph: supergraphFile });

client = createClient({
url: `http://localhost:${port}/graphql`,
retryAttempts: 0,
headers,
connectionParams: headers,
fetchFn: fetch,
webSocketImpl,
});

const query = /* GraphQL */ `
subscription OnProductPriceChanged {
productPriceChanged {
price
}
}
`;
for (let i = 0; i < 5; i++) {
// connect
for await (const msg of client.iterate({ query })) {
expect(msg).toMatchObject({
data: expect.any(Object),
});
break; // complete subscription on first received message
}
// disconnect

const query = /* GraphQL */ `
subscription OnProductPriceChanged {
productPriceChanged {
price
await setTimeout(300); // wait a bit and subscribe again (lazyCloseTimeout is 3 seconds)
}
}
`;
for (let i = 0; i < 5; i++) {
// connect
for await (const msg of client.iterate({ query })) {
expect(msg).toMatchObject({
data: expect.any(Object),
});
break; // complete subscription on first received message
}
// disconnect

await setTimeout(300); // wait a bit and subscribe again (lazyCloseTimeout is 3 seconds)
}
// the "products" service will crash if multiple websockets were connected breaking the loop above with an error
});

// the "products" service will crash if multiple websockets were connected breaking the loop above with an error
});
it('should subscribe and resolve via http callbacks', async () => {
const supergraphFile = await composeWithApollo([
await service('products'),
await service('reviews'),
]);

// Get a random available port
const availablePort = await getAvailablePort();

const publicUrl = `http://${getLocalHostName()}:${availablePort}`;
await serve({
supergraph: supergraphFile,
port: availablePort,
env: {
PUBLIC_URL: publicUrl,
},
});
client = createClient({
url: `${publicUrl}/graphql`,
retryAttempts: 0,
fetchFn: fetch,
webSocketImpl,
});
const sub = client.iterate({
query: /* GraphQL */ `
subscription CountDown {
countdown(from: 4)
}
`,
});

it('should subscribe and resolve via http callbacks', async () => {
if (process.version.startsWith('v18')) {
return;
}
const supergraphFile = await composeWithApollo([
await service('products'),
await service('reviews'),
]);

// Get a random available port
const availablePort = await getAvailablePort();

const publicUrl = `http://localhost:${availablePort}`;
await serve({
supergraph: supergraphFile,
port: availablePort,
env: {
PUBLIC_URL: publicUrl,
},
});
client = createClient({
url: `${publicUrl}/graphql`,
retryAttempts: 0,
fetchFn: fetch,
});
const sub = client.iterate({
query: /* GraphQL */ `
subscription CountDown {
countdown(from: 4)
const msgs = [];
for await (const msg of sub) {
expect(msg).toMatchObject({
data: expect.any(Object),
});
msgs.push(msg);
if (msgs.length >= 4) {
break;
}
}
`,
});

const msgs = [];
for await (const msg of sub) {
expect(msg).toMatchObject({
data: expect.any(Object),
expect(msgs).toMatchObject([
{
data: {
countdown: 4,
},
},
{
data: {
countdown: 3,
},
},
{
data: {
countdown: 2,
},
},
{
data: {
countdown: 1,
},
},
]);
});
msgs.push(msg);
if (msgs.length >= 4) {
break;
}
}

expect(msgs).toMatchObject([
{
data: {
countdown: 4,
},
},
{
data: {
countdown: 3,
},
},
{
data: {
countdown: 2,
},
},
{
data: {
countdown: 1,
},
},
]);
});
});
1 change: 0 additions & 1 deletion e2e/federation-subscriptions-passthrough/gateway.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {

export const gatewayConfig = defineConfig({
webhooks: true,
maskedErrors: false,
transportEntries: {
products: {
options: {
Expand Down
13 changes: 7 additions & 6 deletions e2e/utils/tenv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -833,14 +833,15 @@ export function getAvailablePort(): Promise<number> {
const server = createServer();
return new Promise((resolve, reject) => {
try {
server.once('error', reject);
server.listen(0, () => {
try {
const addressInfo = server.address() as AddressInfo;
const addressInfo = server.address() as AddressInfo;
server.close(err => {
if (err) {
reject(err);
}
resolve(addressInfo.port);
server.close();
} catch (err) {
reject(err);
}
});
});
} catch (err) {
reject(err);
Expand Down
4 changes: 3 additions & 1 deletion packages/serve-cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@
"@graphql-tools/utils": "^10.5.5",
"commander": "^12.0.0",
"dotenv": "^16.3.1",
"parse-duration": "^1.1.0"
"graphql-ws": "^5.16.0",
"parse-duration": "^1.1.0",
"ws": "^8.18.0"
},
"devDependencies": {
"@parcel/watcher": "^2.3.0",
Expand Down
1 change: 1 addition & 0 deletions packages/serve-cli/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@
'Apollo API key to use to authenticate with the managed federation up link',
).env('APOLLO_KEY'),
)
.option('--disable-websockets', 'Disable WebSockets support');
.addOption(

Check failure on line 290 in packages/serve-cli/src/cli.ts

View workflow job for this annotation

GitHub Actions / deployment

Declaration or statement expected.

Check failure on line 290 in packages/serve-cli/src/cli.ts

View workflow job for this annotation

GitHub Actions / typecheck

Declaration or statement expected.

Check failure on line 290 in packages/serve-cli/src/cli.ts

View workflow job for this annotation

GitHub Actions / release / snapshot

Declaration or statement expected.

Check failure on line 290 in packages/serve-cli/src/cli.ts

View workflow job for this annotation

GitHub Actions / integration / node 18

Declaration or statement expected.

Check failure on line 290 in packages/serve-cli/src/cli.ts

View workflow job for this annotation

GitHub Actions / apollo-federation-compatibility

Declaration or statement expected.

Check failure on line 290 in packages/serve-cli/src/cli.ts

View workflow job for this annotation

GitHub Actions / integration / node 20

Declaration or statement expected.

Check failure on line 290 in packages/serve-cli/src/cli.ts

View workflow job for this annotation

GitHub Actions / integration / node 22

Declaration or statement expected.
new Option('--jit', 'Enable Just-In-Time compilation of GraphQL documents')
.env('JIT')
.argParser(value => {
Expand Down
Loading
Loading