diff --git a/package-lock.json b/package-lock.json index 925f9c6..a0caecb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,9 +10,11 @@ "license": "MIT", "dependencies": { "@google-cloud/pubsub": "^3.7.1", + "ajv": "^8.12.0", "cloudevents": "< 8", "date-fns": "^2.30.0", - "env-var": "^7.3.1" + "env-var": "^7.3.1", + "json-schema-to-ts": "^2.9.1" }, "devDependencies": { "@jest/globals": "^29.6.0", @@ -882,6 +884,22 @@ "url": "https://opencollective.com/eslint" } }, + "node_modules/@eslint/eslintrc/node_modules/ajv": { + "version": "6.12.6", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", + "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", + "dev": true, + "dependencies": { + "fast-deep-equal": "^3.1.1", + "fast-json-stable-stringify": "^2.0.0", + "json-schema-traverse": "^0.4.1", + "uri-js": "^4.2.2" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/epoberezkin" + } + }, "node_modules/@eslint/eslintrc/node_modules/globals": { "version": "13.20.0", "resolved": "https://registry.npmjs.org/globals/-/globals-13.20.0.tgz", @@ -897,6 +915,12 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/@eslint/eslintrc/node_modules/json-schema-traverse": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", + "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", + "dev": true + }, "node_modules/@eslint/eslintrc/node_modules/type-fest": { "version": "0.20.2", "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", @@ -4770,8 +4794,7 @@ "node_modules/@types/json-schema": { "version": "7.0.12", "resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.12.tgz", - "integrity": "sha512-Hr5Jfhc9eYOQNPYO5WLDq/n4jqijdHNlDXjuAQkkt+mWdQR+XJToOHrsD4cPaMXpn6KO7y2+wM8AZEs8VpBLVA==", - "dev": true + "integrity": "sha512-Hr5Jfhc9eYOQNPYO5WLDq/n4jqijdHNlDXjuAQkkt+mWdQR+XJToOHrsD4cPaMXpn6KO7y2+wM8AZEs8VpBLVA==" }, "node_modules/@types/json5": { "version": "0.0.29", @@ -5285,14 +5308,13 @@ } }, "node_modules/ajv": { - "version": "6.12.6", - "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", - "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", - "dev": true, + "version": "8.12.0", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.12.0.tgz", + "integrity": "sha512-sRu1kpcO9yLtYxBKvqfTeh9KzZEwO3STyX1HT+4CaDzC6HpTGYhIhPIzj9XuKU7KYDwnaeh5hcOwjy1QuJzBPA==", "dependencies": { "fast-deep-equal": "^3.1.1", - "fast-json-stable-stringify": "^2.0.0", - "json-schema-traverse": "^0.4.1", + "json-schema-traverse": "^1.0.0", + "require-from-string": "^2.0.2", "uri-js": "^4.2.2" }, "funding": { @@ -5316,26 +5338,6 @@ } } }, - "node_modules/ajv-formats/node_modules/ajv": { - "version": "8.12.0", - "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.12.0.tgz", - "integrity": "sha512-sRu1kpcO9yLtYxBKvqfTeh9KzZEwO3STyX1HT+4CaDzC6HpTGYhIhPIzj9XuKU7KYDwnaeh5hcOwjy1QuJzBPA==", - "dependencies": { - "fast-deep-equal": "^3.1.1", - "json-schema-traverse": "^1.0.0", - "require-from-string": "^2.0.2", - "uri-js": "^4.2.2" - }, - "funding": { - "type": "github", - "url": "https://github.com/sponsors/epoberezkin" - } - }, - "node_modules/ajv-formats/node_modules/json-schema-traverse": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", - "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==" - }, "node_modules/align-spaces": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/align-spaces/-/align-spaces-1.0.4.tgz", @@ -6316,26 +6318,6 @@ "node": ">=16 <=20" } }, - "node_modules/cloudevents/node_modules/ajv": { - "version": "8.12.0", - "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.12.0.tgz", - "integrity": "sha512-sRu1kpcO9yLtYxBKvqfTeh9KzZEwO3STyX1HT+4CaDzC6HpTGYhIhPIzj9XuKU7KYDwnaeh5hcOwjy1QuJzBPA==", - "dependencies": { - "fast-deep-equal": "^3.1.1", - "json-schema-traverse": "^1.0.0", - "require-from-string": "^2.0.2", - "uri-js": "^4.2.2" - }, - "funding": { - "type": "github", - "url": "https://github.com/sponsors/epoberezkin" - } - }, - "node_modules/cloudevents/node_modules/json-schema-traverse": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", - "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==" - }, "node_modules/co": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/co/-/co-4.6.0.tgz", @@ -8304,6 +8286,22 @@ "node": ">=10" } }, + "node_modules/eslint/node_modules/ajv": { + "version": "6.12.6", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", + "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", + "dev": true, + "dependencies": { + "fast-deep-equal": "^3.1.1", + "fast-json-stable-stringify": "^2.0.0", + "json-schema-traverse": "^0.4.1", + "uri-js": "^4.2.2" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/epoberezkin" + } + }, "node_modules/eslint/node_modules/ansi-styles": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz", @@ -8438,6 +8436,12 @@ "node": ">=8" } }, + "node_modules/eslint/node_modules/json-schema-traverse": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", + "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", + "dev": true + }, "node_modules/eslint/node_modules/supports-color": { "version": "7.2.0", "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz", @@ -12101,11 +12105,23 @@ "integrity": "sha512-xyFwyhro/JEof6Ghe2iz2NcXoj2sloNsWr/XsERDK/oiPCfaNhl5ONfp+jQdAZRQQ0IJWNzH9zIZF7li91kh2w==", "dev": true }, + "node_modules/json-schema-to-ts": { + "version": "2.9.1", + "resolved": "https://registry.npmjs.org/json-schema-to-ts/-/json-schema-to-ts-2.9.1.tgz", + "integrity": "sha512-8MNpRGERlCUWYeJwsWkMrJ0MWzBz49dfqpG+n9viiIlP4othaahbiaNQZuBzmPxRLUhOv1QJMCzW5WE8nHFGIQ==", + "dependencies": { + "@babel/runtime": "^7.18.3", + "@types/json-schema": "^7.0.9", + "ts-algebra": "^1.2.0" + }, + "engines": { + "node": ">=16" + } + }, "node_modules/json-schema-traverse": { - "version": "0.4.1", - "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", - "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", - "dev": true + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", + "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==" }, "node_modules/json-stable-stringify-without-jsonify": { "version": "1.0.1", @@ -14531,22 +14547,6 @@ "node": ">=16" } }, - "node_modules/putout/node_modules/ajv": { - "version": "8.12.0", - "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.12.0.tgz", - "integrity": "sha512-sRu1kpcO9yLtYxBKvqfTeh9KzZEwO3STyX1HT+4CaDzC6HpTGYhIhPIzj9XuKU7KYDwnaeh5hcOwjy1QuJzBPA==", - "dev": true, - "dependencies": { - "fast-deep-equal": "^3.1.1", - "json-schema-traverse": "^1.0.0", - "require-from-string": "^2.0.2", - "uri-js": "^4.2.2" - }, - "funding": { - "type": "github", - "url": "https://github.com/sponsors/epoberezkin" - } - }, "node_modules/putout/node_modules/ansi-styles": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz", @@ -14621,12 +14621,6 @@ "node": ">=8" } }, - "node_modules/putout/node_modules/json-schema-traverse": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", - "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==", - "dev": true - }, "node_modules/putout/node_modules/locate-path": { "version": "7.2.0", "resolved": "https://registry.npmjs.org/locate-path/-/locate-path-7.2.0.tgz", @@ -16385,28 +16379,6 @@ "node": ">=10.0.0" } }, - "node_modules/table/node_modules/ajv": { - "version": "8.12.0", - "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.12.0.tgz", - "integrity": "sha512-sRu1kpcO9yLtYxBKvqfTeh9KzZEwO3STyX1HT+4CaDzC6HpTGYhIhPIzj9XuKU7KYDwnaeh5hcOwjy1QuJzBPA==", - "dev": true, - "dependencies": { - "fast-deep-equal": "^3.1.1", - "json-schema-traverse": "^1.0.0", - "require-from-string": "^2.0.2", - "uri-js": "^4.2.2" - }, - "funding": { - "type": "github", - "url": "https://github.com/sponsors/epoberezkin" - } - }, - "node_modules/table/node_modules/json-schema-traverse": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", - "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==", - "dev": true - }, "node_modules/tapable": { "version": "2.2.1", "resolved": "https://registry.npmjs.org/tapable/-/tapable-2.2.1.tgz", @@ -16531,6 +16503,11 @@ "node": ">=6" } }, + "node_modules/ts-algebra": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/ts-algebra/-/ts-algebra-1.2.0.tgz", + "integrity": "sha512-kMuJJd8B2N/swCvIvn1hIFcIOrLGbWl9m/J6O3kHx9VRaevh00nvgjPiEGaRee7DRaAczMYR2uwWvXU22VFltw==" + }, "node_modules/ts-jest": { "version": "29.1.1", "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.1.1.tgz", diff --git a/package.json b/package.json index 30775b7..5ee0fd8 100644 --- a/package.json +++ b/package.json @@ -38,9 +38,11 @@ }, "dependencies": { "@google-cloud/pubsub": "^3.7.1", + "ajv": "^8.12.0", "cloudevents": "< 8", "date-fns": "^2.30.0", - "env-var": "^7.3.1" + "env-var": "^7.3.1", + "json-schema-to-ts": "^2.9.1" }, "devDependencies": { "@jest/globals": "^29.6.0", diff --git a/src/lib/googlePubSub.spec.ts b/src/lib/googlePubSub.spec.ts index 80a38c4..bc21b32 100644 --- a/src/lib/googlePubSub.spec.ts +++ b/src/lib/googlePubSub.spec.ts @@ -6,6 +6,7 @@ import envVar from 'env-var'; import { configureMockEnvVars } from '../testUtils/envVars.js'; import { mockSpy } from '../testUtils/jest.js'; import { EVENT } from '../testUtils/stubs.js'; +import { jsonSerialise } from '../testUtils/json.js'; const mockPublishMessage = mockSpy(jest.fn().mockResolvedValue(undefined)); const mockTopic = jest.fn().mockReturnValue({ publishMessage: mockPublishMessage }); @@ -15,7 +16,7 @@ jest.unstable_mockModule('@google-cloud/pubsub', () => ({ topic: mockTopic, }), })); -const { makeGooglePubSubEmitter } = await import('./googlePubSub.js'); +const { makeGooglePubSubEmitter, convertGooglePubSubMessage } = await import('./googlePubSub.js'); const CE_GPUBSUB_TOPIC = 'the topic'; const mockEnvVars = configureMockEnvVars({ CE_GPUBSUB_TOPIC }); @@ -93,12 +94,7 @@ describe('makeGooglePubSubEmitter', () => { expect(mockPublishMessage).toHaveBeenCalledWith( expect.objectContaining({ - attributes: expect.objectContaining({ ceSpecVersion: EVENT.specversion }), - }), - ); - expect(mockPublishMessage).not.toHaveBeenCalledWith( - expect.objectContaining({ - attributes: expect.objectContaining({ specversion: expect.anything() }), + attributes: expect.objectContaining({ specversion: EVENT.specversion }), }), ); }); @@ -110,12 +106,7 @@ describe('makeGooglePubSubEmitter', () => { expect(mockPublishMessage).toHaveBeenCalledWith( expect.objectContaining({ - attributes: expect.objectContaining({ ceDataSchema: event.dataschema }), - }), - ); - expect(mockPublishMessage).not.toHaveBeenCalledWith( - expect.objectContaining({ - attributes: expect.objectContaining({ dataschema: expect.anything() }), + attributes: expect.objectContaining({ dataschema: event.dataschema }), }), ); }); @@ -125,7 +116,7 @@ describe('makeGooglePubSubEmitter', () => { expect(mockPublishMessage).not.toHaveBeenCalledWith( expect.objectContaining({ - attributes: expect.objectContaining({ ceDataSchema: expect.anything() }), + attributes: expect.objectContaining({ dataschema: expect.anything() }), }), ); }); @@ -137,12 +128,7 @@ describe('makeGooglePubSubEmitter', () => { expect(mockPublishMessage).toHaveBeenCalledWith( expect.objectContaining({ - attributes: expect.objectContaining({ ceDataContentType: event.datacontenttype }), - }), - ); - expect(mockPublishMessage).not.toHaveBeenCalledWith( - expect.objectContaining({ - attributes: expect.objectContaining({ datacontenttype: expect.anything() }), + attributes: expect.objectContaining({ datacontenttype: event.datacontenttype }), }), ); }); @@ -175,12 +161,7 @@ describe('makeGooglePubSubEmitter', () => { expect(mockPublishMessage).toHaveBeenCalledWith( expect.objectContaining({ - attributes: expect.objectContaining({ ceSubject: event.subject }), - }), - ); - expect(mockPublishMessage).not.toHaveBeenCalledWith( - expect.objectContaining({ - attributes: expect.objectContaining({ subject: expect.anything() }), + attributes: expect.objectContaining({ subject: event.subject }), }), ); }); @@ -190,7 +171,7 @@ describe('makeGooglePubSubEmitter', () => { expect(mockPublishMessage).not.toHaveBeenCalledWith( expect.objectContaining({ - attributes: expect.objectContaining({ ceSubject: expect.anything() }), + attributes: expect.objectContaining({ subject: expect.anything() }), }), ); }); @@ -200,12 +181,7 @@ describe('makeGooglePubSubEmitter', () => { expect(mockPublishMessage).toHaveBeenCalledWith( expect.objectContaining({ - attributes: expect.objectContaining({ ceSource: EVENT.source }), - }), - ); - expect(mockPublishMessage).not.toHaveBeenCalledWith( - expect.objectContaining({ - attributes: expect.objectContaining({ source: expect.anything() }), + attributes: expect.objectContaining({ source: EVENT.source }), }), ); }); @@ -215,23 +191,19 @@ describe('makeGooglePubSubEmitter', () => { expect(mockPublishMessage).toHaveBeenCalledWith( expect.objectContaining({ - attributes: expect.objectContaining({ ceType: EVENT.type }), - }), - ); - expect(mockPublishMessage).not.toHaveBeenCalledWith( - expect.objectContaining({ - attributes: expect.objectContaining({ type: expect.anything() }), + attributes: expect.objectContaining({ type: EVENT.type }), }), ); }); test('Extension attributes should be stored as custom attributes', async () => { - const event = EVENT.cloneWith({ foo: 'bar' }); + const extensionAttributes = { foo: 'bar' }; + const event = EVENT.cloneWith(extensionAttributes); await makeGooglePubSubEmitter()(event); expect(mockPublishMessage).toHaveBeenCalledWith( - expect.objectContaining({ attributes: expect.objectContaining({ foo: event.foo }) }), + expect.objectContaining({ attributes: expect.objectContaining(extensionAttributes) }), ); }); }); @@ -249,3 +221,219 @@ describe('makeGooglePubSubEmitter', () => { expect(mockTopic).toHaveBeenCalledWith(CE_GPUBSUB_TOPIC); }); }); + +describe('convertGooglePubSubMessage', () => { + const headers = {}; + + const requestBody = { + message: { + data: EVENT.data_base64!, + messageId: EVENT.id, + publishTime: EVENT.time!, + + attributes: { + type: EVENT.type, + source: EVENT.source, + }, + }, + + subscription: 'projects/myproject/subscriptions/mysubscription', + }; + const requestBodySerialised = jsonSerialise(requestBody); + + function copyBodyWithAttribute(attribute: string, value: string | undefined): Buffer { + const body = { + ...requestBody, + + message: { + ...requestBody.message, + attributes: { ...requestBody.message.attributes, [attribute]: value }, + }, + }; + return jsonSerialise(body); + } + + test('Request body should be refused if it is malformed JSON', () => { + expect(() => convertGooglePubSubMessage(headers, Buffer.from('malformed'))).toThrowWithMessage( + Error, + 'Request body is not valid JSON', + ); + }); + + test('Request body should be refused if message field is missing', () => { + const invalidRequestBody = { ...requestBody, message: undefined }; + const invalidRequestBodySerialised = jsonSerialise(invalidRequestBody); + + expect(() => + convertGooglePubSubMessage(headers, invalidRequestBodySerialised), + ).toThrowWithMessage(Error, 'Request body is not a valid PubSub message'); + }); + + describe('Id', () => { + test('Should be taken from messageId', () => { + const event = convertGooglePubSubMessage(headers, requestBodySerialised); + + expect(event.id).toBe(requestBody.message.messageId); + }); + + test('Message should be refused if messageId is missing', () => { + const invalidRequestBody = { + ...requestBody, + message: { ...requestBody.message, messageId: undefined }, + }; + const invalidRequestBodySerialised = jsonSerialise(invalidRequestBody); + + expect(() => + convertGooglePubSubMessage(headers, invalidRequestBodySerialised), + ).toThrowWithMessage(Error, 'Request body is not a valid PubSub message'); + }); + }); + + describe('Time', () => { + test('Should be taken from publishTime', () => { + const event = convertGooglePubSubMessage(headers, requestBodySerialised); + + expect(event.time).toBe(requestBody.message.publishTime); + }); + + test('Message should be refused if publishTime is missing', () => { + const invalidRequestBody = { + ...requestBody, + message: { ...requestBody.message, publishTime: undefined }, + }; + const invalidRequestBodySerialised = jsonSerialise(invalidRequestBody); + + expect(() => + convertGooglePubSubMessage(headers, invalidRequestBodySerialised), + ).toThrowWithMessage(Error, 'Request body is not a valid PubSub message'); + }); + }); + + describe('Data', () => { + test('Should be taken from data', () => { + const event = convertGooglePubSubMessage(headers, requestBodySerialised); + + expect(event.data).toMatchObject(EVENT.data!); + }); + + test('Message should be refused if data is missing', () => { + const invalidRequestBody = { + ...requestBody, + message: { ...requestBody.message, data: undefined }, + }; + const invalidRequestBodySerialised = jsonSerialise(invalidRequestBody); + + expect(() => + convertGooglePubSubMessage(headers, invalidRequestBodySerialised), + ).toThrowWithMessage(Error, 'Request body is not a valid PubSub message'); + }); + }); + + test('Event type should be taken from type attribute', () => { + const event = convertGooglePubSubMessage(headers, requestBodySerialised); + + expect(event.type).toBe(EVENT.type); + }); + + test('Event source should be taken from source attribute', () => { + const event = convertGooglePubSubMessage(headers, requestBodySerialised); + + expect(event.source).toBe(EVENT.source); + }); + + describe('Spec version', () => { + test('Should be taken from specversion attribute', () => { + const specVersion = '0.3'; + const body = copyBodyWithAttribute('specversion', specVersion); + + const event = convertGooglePubSubMessage(headers, body); + + expect(event.specversion).toBe(specVersion); + }); + + test('Should be default to 1.0 if absent', () => { + const body = copyBodyWithAttribute('specversion', undefined); + + const event = convertGooglePubSubMessage(headers, body); + + expect(event.specversion).toBe('1.0'); + }); + }); + + describe('Subject', () => { + test('Should be taken from subject attribute', () => { + const subject = 'my-subject'; + const body = copyBodyWithAttribute('subject', subject); + + const event = convertGooglePubSubMessage(headers, body); + + expect(event.subject).toBe(subject); + }); + + test('Should be undefined if subject attribute is absent', () => { + const body = copyBodyWithAttribute('subject', undefined); + + const event = convertGooglePubSubMessage(headers, body); + + expect(event.subject).toBeUndefined(); + }); + }); + + describe('Data content type', () => { + test('Should be taken from datacontenttype attribute', () => { + const dataContentType = 'application/json'; + const body = copyBodyWithAttribute('datacontenttype', dataContentType); + + const event = convertGooglePubSubMessage(headers, body); + + expect(event.datacontenttype).toBe(dataContentType); + }); + + test('Should be undefined if datacontenttype attribute is absent', () => { + const body = copyBodyWithAttribute('datacontenttype', undefined); + + const event = convertGooglePubSubMessage(headers, body); + + expect(event.datacontenttype).toBeUndefined(); + }); + }); + + describe('Data schema', () => { + test('Should be taken from dataschema attribute', () => { + const dataSchema = 'https://example.com/schema'; + const body = copyBodyWithAttribute('dataschema', dataSchema); + + const event = convertGooglePubSubMessage(headers, body); + + expect(event.dataschema).toBe(dataSchema); + }); + + test('Should be undefined if dataschema attribute is absent', () => { + const body = copyBodyWithAttribute('dataschema', undefined); + + const event = convertGooglePubSubMessage(headers, body); + + expect(event.dataschema).toBeUndefined(); + }); + }); + + test('Extension attributes should be taken from message.attributes', () => { + const body = copyBodyWithAttribute('extension', 'value'); + + const event = convertGooglePubSubMessage(headers, body); + + expect(event.extension).toBe('value'); + }); + + test.each(['source', 'type'])( + 'Message should be refused if attribute %s is missing', + (attributeName) => { + const body = copyBodyWithAttribute(attributeName, undefined); + + expect(() => convertGooglePubSubMessage(headers, body)).toThrowWithMessage( + Error, + 'Request body is not a valid PubSub message', + ); + }, + ); +}); diff --git a/src/lib/googlePubSub.ts b/src/lib/googlePubSub.ts index d16873f..58a1de7 100644 --- a/src/lib/googlePubSub.ts +++ b/src/lib/googlePubSub.ts @@ -1,13 +1,16 @@ -import type { CloudEvent, EmitterFunction } from 'cloudevents'; import { PubSub } from '@google-cloud/pubsub'; import { google } from '@google-cloud/pubsub/build/protos/protos.js'; +import { CloudEvent, type CloudEventV1, type EmitterFunction, type Headers } from 'cloudevents'; import { getUnixTime } from 'date-fns'; import envVar from 'env-var'; +import { compileSchema } from '../utils/ajv.js'; + import IPubsubMessage = google.pubsub.v1.IPubsubMessage; const CLIENT = new PubSub(); +const CE_DATA_ATTRS = ['data', 'data_base64']; const CE_BUILTIN_ATTRS = [ 'specversion', 'type', @@ -17,10 +20,45 @@ const CE_BUILTIN_ATTRS = [ 'time', 'datacontenttype', 'dataschema', - 'data', - 'data_base64', + ...CE_DATA_ATTRS, ]; +const pubSubBody = { + type: 'object', + + properties: { + message: { + type: 'object', + + properties: { + attributes: { + type: 'object', + + properties: { + specversion: { type: 'string' }, + source: { type: 'string' }, + type: { type: 'string' }, + subject: { type: 'string' }, + datacontenttype: { type: 'string' }, + dataschema: { type: 'string' }, + }, + + required: ['source', 'type'], + }, + + data: { type: 'string' }, + messageId: { type: 'string' }, + publishTime: { type: 'string' }, + }, + + required: ['attributes', 'data', 'messageId', 'publishTime'], + }, + }, + + required: ['message'], +} as const; +const isPubSubBody = compileSchema(pubSubBody); + function convertData(event: CloudEvent): Buffer | string { if (event.data instanceof Buffer || typeof event.data === 'string') { return event.data; @@ -44,24 +82,28 @@ function suppressUndefined(obj: { [key: string]: string | undefined }): { [key: .reduce((acc, [key, value]) => ({ ...acc, [key]: value }), {}); } -function convertEventToMessage(event: CloudEvent): IPubsubMessage { - const publishTime = convertEventTimeToPublishTime(event); - const ceAttributes = { - ceSpecVersion: event.specversion, - ceDataContentType: event.datacontenttype, - ceDataSchema: event.dataschema, - ceType: event.type, - ceSubject: event.subject, - ceSource: event.source, - }; - const extensionAttributes = Object.entries(event) +function filterExtensionAttributes(obj: { [key: string]: unknown }) { + return Object.entries(obj) .filter(([key]) => !CE_BUILTIN_ATTRS.includes(key)) .reduce((acc, [key, value]) => ({ ...acc, [key]: value }), {}); +} + +function getMessageAttributesFromEvent(event: CloudEvent) { + const ceAttributes = Object.entries(event) + .filter(([key]) => CE_BUILTIN_ATTRS.includes(key) && !CE_DATA_ATTRS.includes(key)) + .reduce((acc, [key, value]) => ({ ...acc, [key]: value }), {}); + const extensionAttributes = filterExtensionAttributes(event); + return suppressUndefined({ ...ceAttributes, ...extensionAttributes }); +} + +function convertEventToMessage(event: CloudEvent): IPubsubMessage { + const publishTime = convertEventTimeToPublishTime(event); + const attributes = getMessageAttributesFromEvent(event); return { data: convertData(event), messageId: event.id, publishTime, - attributes: suppressUndefined({ ...ceAttributes, ...extensionAttributes }), + attributes, }; } @@ -73,3 +115,32 @@ export function makeGooglePubSubEmitter(): EmitterFunction { await topic.publishMessage(message); }; } + +export function convertGooglePubSubMessage(_headers: Headers, body: Buffer): CloudEventV1 { + const bodyString = body.toString(); + let bodyJson: unknown; + try { + bodyJson = JSON.parse(bodyString); + } catch { + throw new Error('Request body is not valid JSON'); + } + + if (isPubSubBody(bodyJson)) { + const { message } = bodyJson; + const extensionAttributes = filterExtensionAttributes(message.attributes); + return new CloudEvent({ + specversion: message.attributes.specversion, + id: message.messageId, + source: message.attributes.source, + type: message.attributes.type, + subject: message.attributes.subject, + time: message.publishTime, + datacontenttype: message.attributes.datacontenttype, + dataschema: message.attributes.dataschema, + data: Buffer.from(message.data, 'base64'), + ...extensionAttributes, + }); + } + + throw new Error('Request body is not a valid PubSub message'); +} diff --git a/src/lib/receivers.spec.ts b/src/lib/receivers.spec.ts index 8b39e18..919ffe3 100644 --- a/src/lib/receivers.spec.ts +++ b/src/lib/receivers.spec.ts @@ -7,6 +7,15 @@ jest.unstable_mockModule('./ceBinary.js', () => { return { convertCeBinaryMessage: mockCeBinaryConverter }; }); +const mockGooglePubSubConverter = Symbol('mockGooglePubSubConverter'); +let wasGooglePubSubImported = false; +jest.unstable_mockModule('./googlePubSub.ts', () => { + wasGooglePubSubImported = true; + return { + convertGooglePubSubMessage: mockGooglePubSubConverter, + }; +}); + const { makeReceiver } = await import('./receivers.js'); describe('makeReceiver', () => { @@ -15,12 +24,20 @@ describe('makeReceiver', () => { await makeReceiver('ce-http-binary'); expect(wasCeImported).toBeTrue(); + expect(wasGooglePubSubImported).toBeFalse(); + + await makeReceiver('google-pubsub'); + expect(wasGooglePubSubImported).toBeTrue(); }); test('CloudEvents binary receiver should be returned if requested', async () => { await expect(makeReceiver('ce-http-binary')).resolves.toBe(mockCeBinaryConverter); }); + test('Google PubSub receiver should be returned if requested', async () => { + await expect(makeReceiver('google-pubsub')).resolves.toBe(mockGooglePubSubConverter); + }); + test('Unsupported transport should be refused', async () => { await expect(makeReceiver('unsupported')).rejects.toThrowWithMessage( Error, diff --git a/src/lib/receivers.ts b/src/lib/receivers.ts index 5786d46..c95aaa8 100644 --- a/src/lib/receivers.ts +++ b/src/lib/receivers.ts @@ -6,6 +6,9 @@ export async function makeReceiver(transport: string): Promise { if (transport === 'ce-http-binary') { const { convertCeBinaryMessage } = await import('./ceBinary.js'); receiver = convertCeBinaryMessage; + } else if (transport === 'google-pubsub') { + const { convertGooglePubSubMessage } = await import('./googlePubSub.js'); + receiver = convertGooglePubSubMessage; } else { throw new Error(`Unsupported receiver type (${transport})`); } diff --git a/src/testUtils/json.ts b/src/testUtils/json.ts new file mode 100644 index 0000000..c4a78d7 --- /dev/null +++ b/src/testUtils/json.ts @@ -0,0 +1,3 @@ +export function jsonSerialise(obj: any): Buffer { + return Buffer.from(JSON.stringify(obj)); +} diff --git a/src/utils/ajv.ts b/src/utils/ajv.ts new file mode 100644 index 0000000..49d28bb --- /dev/null +++ b/src/utils/ajv.ts @@ -0,0 +1,6 @@ +import Ajv from 'ajv'; +import { type $Compiler, wrapCompilerAsTypeGuard } from 'json-schema-to-ts'; + +const AJV = new Ajv(); +const $compile: $Compiler = (schema) => AJV.compile(schema); +export const compileSchema = wrapCompilerAsTypeGuard($compile);