From f46bc931c7a563d14fdb83b48f1301d962032ae2 Mon Sep 17 00:00:00 2001 From: dillonstreator Date: Tue, 23 Apr 2024 09:18:56 -0500 Subject: [PATCH] 0.0.20 renames for conciseness and clarity --- README.md | 7 ++++--- package.json | 2 +- src/mongodb/client.ts | 4 ++-- src/pg/client.test.ts | 12 ++++++------ src/pg/client.ts | 6 +++--- src/processor.test.ts | 38 +++++++++++++++++++------------------- src/processor.ts | 20 ++++++++++---------- 7 files changed, 45 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 5dbf16e..bd6b5f1 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ - + @@ -107,11 +107,12 @@ const server = http.createServer(async (req, res) => { await client.query("BEGIN"); const userId = randomUUID(); - // save/create user with userId + // save user with userId + await client.query(`INSERT INTO users (id, email) VALUES ($1, $2)`, [userId, req.body.email]); // save event to `events` table await client.query( - `INSERT INTO events (id, type, data, correlation_id) VALUES ( $1, $2, $3, $4 )`, + `INSERT INTO events (id, type, data, correlation_id) VALUES ($1, $2, $3, $4)`, [ randomUUID(), eventTypes.UserInvited, diff --git a/package.json b/package.json index 354743f..cfd3e80 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "type": "git", "url": "git://github.com/dillonstreator/txob.git" }, - "version": "0.0.19", + "version": "0.0.20", "license": "MIT", "files": [ "dist", diff --git a/src/mongodb/client.ts b/src/mongodb/client.ts index 4e9cfe4..b936ff9 100644 --- a/src/mongodb/client.ts +++ b/src/mongodb/client.ts @@ -13,7 +13,7 @@ export const createProcessorClient = ( db: string, collection: string = "events", ): TxOBProcessorClient => ({ - findReadyToProcessEvents: async (opts) => { + getEventsToProcess: async (opts) => { const events = (await mongo .db(db) .collection(collection) @@ -26,7 +26,7 @@ export const createProcessorClient = ( transaction: async (fn) => { await mongo.withSession(async (session): Promise => { await fn({ - findReadyToProcessEventByIdForUpdateSkipLocked: async ( + getEventByIdForUpdateSkipLocked: async ( eventId, opts, ) => { diff --git a/src/pg/client.test.ts b/src/pg/client.test.ts index 7012397..91c934c 100644 --- a/src/pg/client.test.ts +++ b/src/pg/client.test.ts @@ -7,12 +7,12 @@ describe("createProcessorClient", () => { query: vi.fn(), }; const client = createProcessorClient(pgClient); - expect(typeof client.findReadyToProcessEvents).toBe("function"); + expect(typeof client.getEventsToProcess).toBe("function"); expect(typeof client.transaction).toBe("function"); }); }); -describe("findReadyToProcessEvents", () => { +describe("getEventsToProcess", () => { it("should execute the correct query", async () => { const rows = [1, 2, 3]; const pgClient = { @@ -26,7 +26,7 @@ describe("findReadyToProcessEvents", () => { maxErrors: 10, }; const client = createProcessorClient(pgClient); - const result = await client.findReadyToProcessEvents(opts); + const result = await client.getEventsToProcess(opts); expect(pgClient.query).toHaveBeenCalledOnce(); expect(pgClient.query).toHaveBeenCalledWith( "SELECT id, errors FROM events WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1", @@ -62,7 +62,7 @@ describe("transaction", () => { expect(pgClient.query).toHaveBeenNthCalledWith(2, "ROLLBACK"); }); - describe("findReadyToProcessEventByIdForUpdateSkipLocked", () => { + describe("getEventByIdForUpdateSkipLocked", () => { it("should execute the correct query", async () => { const rows = [1, 2, 3]; const pgClient = { @@ -77,7 +77,7 @@ describe("transaction", () => { const client = createProcessorClient(pgClient); let result: any; await client.transaction(async (txClient) => { - result = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 }); + result = await txClient.getEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 }); }); expect(pgClient.query).toHaveBeenCalledTimes(3); @@ -102,7 +102,7 @@ describe("transaction", () => { const client = createProcessorClient(pgClient); let result: any; await client.transaction(async (txClient) => { - result = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 }); + result = await txClient.getEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 }); }); expect(pgClient.query).toHaveBeenCalledTimes(3); diff --git a/src/pg/client.ts b/src/pg/client.ts index 41c5792..5e0557a 100644 --- a/src/pg/client.ts +++ b/src/pg/client.ts @@ -5,14 +5,14 @@ interface Querier { query: Client["query"]; } -// TODO: leverage the signal option that comes in on options for `findReadyToProcessEvents` and `findReadyToProcessEventByIdForUpdateSkipLocked` +// TODO: leverage the signal option that comes in on options for `getEventsToProcess` and `getEventByIdForUpdateSkipLocked` // to cancel queries if/when supported by `pg` https://github.com/brianc/node-postgres/issues/2774 export const createProcessorClient = ( querier: Querier, table: string = "events", ): TxOBProcessorClient => ({ - findReadyToProcessEvents: async (opts) => { + getEventsToProcess: async (opts) => { const events = await querier.query< Pick, "id" | "errors"> >( @@ -25,7 +25,7 @@ export const createProcessorClient = ( try { await querier.query("BEGIN"); await fn({ - findReadyToProcessEventByIdForUpdateSkipLocked: async (eventId, opts) => { + getEventByIdForUpdateSkipLocked: async (eventId, opts) => { const event = await querier.query>( `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`, [eventId, opts.maxErrors], diff --git a/src/processor.test.ts b/src/processor.test.ts index 039e7e3..3c26f1d 100644 --- a/src/processor.test.ts +++ b/src/processor.test.ts @@ -9,11 +9,11 @@ import { import { sleep } from "./sleep"; const mockTxClient = { - findReadyToProcessEventByIdForUpdateSkipLocked: vi.fn(), + getEventByIdForUpdateSkipLocked: vi.fn(), updateEvent: vi.fn(), }; const mockClient = { - findReadyToProcessEvents: vi.fn(), + getEventsToProcess: vi.fn(), transaction: vi.fn(async (fn) => fn(mockTxClient)), }; @@ -37,12 +37,12 @@ describe("processEvents", () => { backoff: () => now, }; const handlerMap = {}; - mockClient.findReadyToProcessEvents.mockImplementation(() => []); + mockClient.getEventsToProcess.mockImplementation(() => []); processEvents(mockClient, handlerMap, opts); - expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce(); - expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts); + expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce(); + expect(mockClient.getEventsToProcess).toHaveBeenCalledWith(opts); expect(mockClient.transaction).not.toHaveBeenCalled(); - expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); + expect(mockTxClient.getEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); expect(mockTxClient.updateEvent).not.toHaveBeenCalled(); }); @@ -113,8 +113,8 @@ describe("processEvents", () => { processed_at: now, }; const events = [evt1, evt2, evt3, evt4]; - mockClient.findReadyToProcessEvents.mockImplementation(() => events); - mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => { + mockClient.getEventsToProcess.mockImplementation(() => events); + mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => { if (id === evt3.id) return null; return events.find((e) => e.id === id); @@ -128,8 +128,8 @@ describe("processEvents", () => { await processEvents(mockClient, handlerMap, opts); - expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce(); - expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts); + expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce(); + expect(mockClient.getEventsToProcess).toHaveBeenCalledWith(opts); expect(mockClient.transaction).toHaveBeenCalledTimes(3); @@ -143,7 +143,7 @@ describe("processEvents", () => { }); expect(handlerMap.evtType1.handler3).not.toHaveBeenCalled(); - expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( + expect(mockTxClient.getEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( 3, ); @@ -213,8 +213,8 @@ describe("processEvents", () => { errors: 1, }; const events = [evt1]; - mockClient.findReadyToProcessEvents.mockImplementation(() => events); - mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => { + mockClient.getEventsToProcess.mockImplementation(() => events); + mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => { return events.find((e) => e.id === id); }); mockTxClient.updateEvent.mockImplementation(() => { @@ -223,8 +223,8 @@ describe("processEvents", () => { await processEvents(mockClient, handlerMap, opts); - expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce(); - expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts); + expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce(); + expect(mockClient.getEventsToProcess).toHaveBeenCalledWith(opts); expect(mockClient.transaction).toHaveBeenCalledTimes(1); @@ -232,7 +232,7 @@ describe("processEvents", () => { expect(handlerMap.evtType1.handler1).toHaveBeenCalledWith(evt1, { signal: undefined, }); - expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( + expect(mockTxClient.getEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( 1, ); @@ -320,15 +320,15 @@ describe("EventProcessor", () => { backoff: () => now, }; const handlerMap = {}; - mockClient.findReadyToProcessEvents.mockImplementation(() => []); + mockClient.getEventsToProcess.mockImplementation(() => []); const processor = EventProcessor(mockClient, handlerMap, opts); processor.start(); await processor.stop(); - expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce(); + expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce(); expect(mockClient.transaction).not.toHaveBeenCalled(); - expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); + expect(mockTxClient.getEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); expect(mockTxClient.updateEvent).not.toHaveBeenCalled(); }); }); diff --git a/src/processor.ts b/src/processor.ts index 4951c04..106bf83 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -42,7 +42,7 @@ type TxOBProcessorClientOpts = { }; export interface TxOBProcessorClient { - findReadyToProcessEvents( + getEventsToProcess( opts: TxOBProcessorClientOpts, ): Promise, "id" | "errors">[]>; transaction( @@ -53,7 +53,7 @@ export interface TxOBProcessorClient { } export interface TxOBTransactionProcessorClient { - findReadyToProcessEventByIdForUpdateSkipLocked( + getEventByIdForUpdateSkipLocked( eventId: TxOBEvent["id"], opts: TxOBProcessorClientOpts, ): Promise | null>; @@ -89,7 +89,7 @@ export const processEvents = async ( ...opts, }; - const events = await client.findReadyToProcessEvents(_opts); + const events = await client.getEventsToProcess(_opts); _opts.logger?.debug(`found ${events.length} events to process`); // TODO: consider concurrently processing events with max concurrency configuration @@ -99,9 +99,9 @@ export const processEvents = async ( } if (unlockedEvent.errors >= _opts.maxErrors) { // Potential issue with client configuration on finding unprocessed events - // Events with maximum allowed errors should not be returned from `findReadyToProcessEvents` + // Events with maximum allowed errors should not be returned from `getEventsToProcess` _opts.logger?.warn( - "unexpected event with max errors returned from `findReadyToProcessEvents`", + "unexpected event with max errors returned from `getEventsToProcess`", { eventId: unlockedEvent.id, errors: unlockedEvent.errors, @@ -113,7 +113,7 @@ export const processEvents = async ( try { await client.transaction(async (txClient) => { - const lockedEvent = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked( + const lockedEvent = await txClient.getEventByIdForUpdateSkipLocked( unlockedEvent.id, { signal: _opts.signal, maxErrors: _opts.maxErrors }, ); @@ -125,8 +125,8 @@ export const processEvents = async ( } // While unlikely, the following two conditions are possible if a concurrent processor finished processing this event or reaching maximum errors between the time - // that this processor found the event with `findReadyToProcessEvents` and called `findReadyToProcessEventByIdForUpdateSkipLocked` - // `findReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources + // that this processor found the event with `getEventsToProcess` and called `getEventByIdForUpdateSkipLocked` + // `getEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources if (lockedEvent.processed_at) { _opts.logger?.debug("skipping already processed event", { eventId: lockedEvent.id, @@ -235,8 +235,8 @@ export const processEvents = async ( }); // The success of this update is crucial for the processor flow. - // In the event of a failure, any handlers that have successfully executed - // during this invokation will be reinvoked in the subsequent call. + // In the unlikely scenario of a failure to update the event, any handlers that have succeeded + // during this iteration will be reinvoked in the subsequent processor tick. await retryable(() => txClient.updateEvent(lockedEvent), { retries: 3, factor: 2,