diff --git a/src/async-helper.ts b/src/async-helper.ts new file mode 100644 index 000000000..5ffe87284 --- /dev/null +++ b/src/async-helper.ts @@ -0,0 +1,103 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {Message} from './subscriber'; + +/** + * Represents an async function that can process a message and return + * a Promise for the function's completion. + */ +export interface AsyncMessageHandler { + (message: Message): Promise; +} + +/** + * A handler for sub.on('message', x) that can be passed to .on() to do + * the async processing in this class. + */ +export interface StreamMessageHandler { + (message: Message): void; +} + +/** + * When executing an async function, the Node runtime is really getting + * a Promise; these are guaranteed not to complete until another cycle of + * the event loop (at least the micro-loop). This can be problematic for + * ordered queue receipts, since the library doesn't take special pains to + * deliver the messages one at a time to async functions (they all push + * through in one go, which just results in a bunch of outstanding Promises). + * + * This helper acts as a funnel for the subscriber so that it can do the + * normal "push all the messages" as before, but each message will be + * allowed to process fully before the next message is delivered to user code. + * + * This should not be used for non-async handlers. It's not being built + * into the library itself, because it's difficult for us to second-guess + * what users might want in a given situation. This lets you decide explicitly. + * (Also, event handlers for on() are not something we have direct access + * to, so guessing whether it's a Promise and waiting on it would be difficult.) + * + * @example + * ``` + * const {PubSub, AsyncHelper} = require('@google-cloud/pubsub'); + * const pubsub = new PubSub(); + * + * const sub = pubsub.subscription('my-sub'); + * const helper = new AsyncHelper(async (m) => console.log(m)); + * sub.on('message', helper.handler); + * ``` + */ +export class AsyncHelper { + // The queue of messages we need to process in order. + queue: Message[] = []; + + // The "tail" Promise, i.e. the previous processing step (or resolve()). + tailPromise: Promise = Promise.resolve(); + + // The user's handler that will be called to take a message and get back a Promise. + userHandler: AsyncMessageHandler; + + /** + * @param userHandler The async function we'll call for each message. + */ + constructor(userHandler: AsyncMessageHandler) { + this.userHandler = userHandler; + } + + /** + * A handler function that you can pass to .on('message'). + */ + get handler(): StreamMessageHandler { + return this.streamHandler.bind(this); + } + + // Pushes new messages on the queue and starts (or chains) a + // processing step. + private streamHandler(message: Message): void { + this.queue.push(message); + + // This should be either Promise.resolve() (instant callback) + // or the previous work item the user's function returned. + this.tailPromise.then(() => { + const message = this.queue.shift(); + if (!message) { + // No message -> go back to resolve() to signal ready. + this.tailPromise = Promise.resolve(); + } else { + // Message -> chain to the previous tail and replace it. + this.tailPromise = this.userHandler(message); + } + }); + } +} diff --git a/src/index.ts b/src/index.ts index 15b5a1d32..e46f400f5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -175,6 +175,11 @@ export { } from './topic'; export {Duration, TotalOfUnit, DurationLike} from './temporal'; export {DebugMessage} from './debug'; +export { + AsyncHelper, + AsyncMessageHandler, + StreamMessageHandler, +} from './async-helper'; if (process.env.DEBUG_GRPC) { console.info('gRPC logging set to verbose'); diff --git a/test/async-helper.ts b/test/async-helper.ts new file mode 100644 index 000000000..49ceabe2e --- /dev/null +++ b/test/async-helper.ts @@ -0,0 +1,62 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {describe, it} from 'mocha'; +import * as assert from 'assert'; +import * as ah from '../src/async-helper'; +import {Message} from '../src/subscriber'; + +class FakeMessage { + constructor(public id: string) {} +} + +function fakeMessage(id: string) { + return new FakeMessage(id) as unknown as Message; +} + +describe('async-helper', () => { + it('processes new messages', async () => { + const helper = new ah.AsyncHelper(async (m: Message) => { + assert.strictEqual(m.id, '1'); + }); + const handler = helper.handler; + const msg = fakeMessage('1'); + handler(msg); + }); + + it('processes multiple messages in order', async () => { + const items = ['1', '2', '3']; + const helper = new ah.AsyncHelper(async (m: Message) => { + assert.strictEqual(m.id, items.shift()); + }); + const handler = helper.handler; + handler(fakeMessage('1')); + handler(fakeMessage('2')); + handler(fakeMessage('3')); + }); + + it('processes unevenly timed messages in order', async () => { + const items = ['1', '2', '3']; + const helper = new ah.AsyncHelper(async (m: Message) => { + if (m.id === '2') { + await new Promise(r => setTimeout(r, 100)); + } + assert.strictEqual(m.id, items.shift()); + }); + const handler = helper.handler; + handler(fakeMessage('1')); + handler(fakeMessage('2')); + handler(fakeMessage('3')); + }); +});