diff --git a/README.md b/README.md
index 5dbf16e..bd6b5f1 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
-
+
@@ -107,11 +107,12 @@ const server = http.createServer(async (req, res) => {
await client.query("BEGIN");
const userId = randomUUID();
- // save/create user with userId
+ // save user with userId
+ await client.query(`INSERT INTO users (id, email) VALUES ($1, $2)`, [userId, req.body.email]);
// save event to `events` table
await client.query(
- `INSERT INTO events (id, type, data, correlation_id) VALUES ( $1, $2, $3, $4 )`,
+ `INSERT INTO events (id, type, data, correlation_id) VALUES ($1, $2, $3, $4)`,
[
randomUUID(),
eventTypes.UserInvited,
diff --git a/package.json b/package.json
index 354743f..cfd3e80 100644
--- a/package.json
+++ b/package.json
@@ -15,7 +15,7 @@
"type": "git",
"url": "git://github.com/dillonstreator/txob.git"
},
- "version": "0.0.19",
+ "version": "0.0.20",
"license": "MIT",
"files": [
"dist",
diff --git a/src/mongodb/client.ts b/src/mongodb/client.ts
index 4e9cfe4..b936ff9 100644
--- a/src/mongodb/client.ts
+++ b/src/mongodb/client.ts
@@ -13,7 +13,7 @@ export const createProcessorClient = (
db: string,
collection: string = "events",
): TxOBProcessorClient => ({
- findReadyToProcessEvents: async (opts) => {
+ getEventsToProcess: async (opts) => {
const events = (await mongo
.db(db)
.collection(collection)
@@ -26,7 +26,7 @@ export const createProcessorClient = (
transaction: async (fn) => {
await mongo.withSession(async (session): Promise => {
await fn({
- findReadyToProcessEventByIdForUpdateSkipLocked: async (
+ getEventByIdForUpdateSkipLocked: async (
eventId,
opts,
) => {
diff --git a/src/pg/client.test.ts b/src/pg/client.test.ts
index 7012397..91c934c 100644
--- a/src/pg/client.test.ts
+++ b/src/pg/client.test.ts
@@ -7,12 +7,12 @@ describe("createProcessorClient", () => {
query: vi.fn(),
};
const client = createProcessorClient(pgClient);
- expect(typeof client.findReadyToProcessEvents).toBe("function");
+ expect(typeof client.getEventsToProcess).toBe("function");
expect(typeof client.transaction).toBe("function");
});
});
-describe("findReadyToProcessEvents", () => {
+describe("getEventsToProcess", () => {
it("should execute the correct query", async () => {
const rows = [1, 2, 3];
const pgClient = {
@@ -26,7 +26,7 @@ describe("findReadyToProcessEvents", () => {
maxErrors: 10,
};
const client = createProcessorClient(pgClient);
- const result = await client.findReadyToProcessEvents(opts);
+ const result = await client.getEventsToProcess(opts);
expect(pgClient.query).toHaveBeenCalledOnce();
expect(pgClient.query).toHaveBeenCalledWith(
"SELECT id, errors FROM events WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1",
@@ -62,7 +62,7 @@ describe("transaction", () => {
expect(pgClient.query).toHaveBeenNthCalledWith(2, "ROLLBACK");
});
- describe("findReadyToProcessEventByIdForUpdateSkipLocked", () => {
+ describe("getEventByIdForUpdateSkipLocked", () => {
it("should execute the correct query", async () => {
const rows = [1, 2, 3];
const pgClient = {
@@ -77,7 +77,7 @@ describe("transaction", () => {
const client = createProcessorClient(pgClient);
let result: any;
await client.transaction(async (txClient) => {
- result = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 });
+ result = await txClient.getEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 });
});
expect(pgClient.query).toHaveBeenCalledTimes(3);
@@ -102,7 +102,7 @@ describe("transaction", () => {
const client = createProcessorClient(pgClient);
let result: any;
await client.transaction(async (txClient) => {
- result = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 });
+ result = await txClient.getEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 });
});
expect(pgClient.query).toHaveBeenCalledTimes(3);
diff --git a/src/pg/client.ts b/src/pg/client.ts
index 41c5792..5e0557a 100644
--- a/src/pg/client.ts
+++ b/src/pg/client.ts
@@ -5,14 +5,14 @@ interface Querier {
query: Client["query"];
}
-// TODO: leverage the signal option that comes in on options for `findReadyToProcessEvents` and `findReadyToProcessEventByIdForUpdateSkipLocked`
+// TODO: leverage the signal option that comes in on options for `getEventsToProcess` and `getEventByIdForUpdateSkipLocked`
// to cancel queries if/when supported by `pg` https://github.com/brianc/node-postgres/issues/2774
export const createProcessorClient = (
querier: Querier,
table: string = "events",
): TxOBProcessorClient => ({
- findReadyToProcessEvents: async (opts) => {
+ getEventsToProcess: async (opts) => {
const events = await querier.query<
Pick, "id" | "errors">
>(
@@ -25,7 +25,7 @@ export const createProcessorClient = (
try {
await querier.query("BEGIN");
await fn({
- findReadyToProcessEventByIdForUpdateSkipLocked: async (eventId, opts) => {
+ getEventByIdForUpdateSkipLocked: async (eventId, opts) => {
const event = await querier.query>(
`SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`,
[eventId, opts.maxErrors],
diff --git a/src/processor.test.ts b/src/processor.test.ts
index 039e7e3..3c26f1d 100644
--- a/src/processor.test.ts
+++ b/src/processor.test.ts
@@ -9,11 +9,11 @@ import {
import { sleep } from "./sleep";
const mockTxClient = {
- findReadyToProcessEventByIdForUpdateSkipLocked: vi.fn(),
+ getEventByIdForUpdateSkipLocked: vi.fn(),
updateEvent: vi.fn(),
};
const mockClient = {
- findReadyToProcessEvents: vi.fn(),
+ getEventsToProcess: vi.fn(),
transaction: vi.fn(async (fn) => fn(mockTxClient)),
};
@@ -37,12 +37,12 @@ describe("processEvents", () => {
backoff: () => now,
};
const handlerMap = {};
- mockClient.findReadyToProcessEvents.mockImplementation(() => []);
+ mockClient.getEventsToProcess.mockImplementation(() => []);
processEvents(mockClient, handlerMap, opts);
- expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
- expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts);
+ expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
+ expect(mockClient.getEventsToProcess).toHaveBeenCalledWith(opts);
expect(mockClient.transaction).not.toHaveBeenCalled();
- expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
+ expect(mockTxClient.getEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
expect(mockTxClient.updateEvent).not.toHaveBeenCalled();
});
@@ -113,8 +113,8 @@ describe("processEvents", () => {
processed_at: now,
};
const events = [evt1, evt2, evt3, evt4];
- mockClient.findReadyToProcessEvents.mockImplementation(() => events);
- mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => {
+ mockClient.getEventsToProcess.mockImplementation(() => events);
+ mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => {
if (id === evt3.id) return null;
return events.find((e) => e.id === id);
@@ -128,8 +128,8 @@ describe("processEvents", () => {
await processEvents(mockClient, handlerMap, opts);
- expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
- expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts);
+ expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
+ expect(mockClient.getEventsToProcess).toHaveBeenCalledWith(opts);
expect(mockClient.transaction).toHaveBeenCalledTimes(3);
@@ -143,7 +143,7 @@ describe("processEvents", () => {
});
expect(handlerMap.evtType1.handler3).not.toHaveBeenCalled();
- expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
+ expect(mockTxClient.getEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
3,
);
@@ -213,8 +213,8 @@ describe("processEvents", () => {
errors: 1,
};
const events = [evt1];
- mockClient.findReadyToProcessEvents.mockImplementation(() => events);
- mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => {
+ mockClient.getEventsToProcess.mockImplementation(() => events);
+ mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => {
return events.find((e) => e.id === id);
});
mockTxClient.updateEvent.mockImplementation(() => {
@@ -223,8 +223,8 @@ describe("processEvents", () => {
await processEvents(mockClient, handlerMap, opts);
- expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
- expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts);
+ expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
+ expect(mockClient.getEventsToProcess).toHaveBeenCalledWith(opts);
expect(mockClient.transaction).toHaveBeenCalledTimes(1);
@@ -232,7 +232,7 @@ describe("processEvents", () => {
expect(handlerMap.evtType1.handler1).toHaveBeenCalledWith(evt1, {
signal: undefined,
});
- expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
+ expect(mockTxClient.getEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
1,
);
@@ -320,15 +320,15 @@ describe("EventProcessor", () => {
backoff: () => now,
};
const handlerMap = {};
- mockClient.findReadyToProcessEvents.mockImplementation(() => []);
+ mockClient.getEventsToProcess.mockImplementation(() => []);
const processor = EventProcessor(mockClient, handlerMap, opts);
processor.start();
await processor.stop();
- expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
+ expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
expect(mockClient.transaction).not.toHaveBeenCalled();
- expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
+ expect(mockTxClient.getEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
expect(mockTxClient.updateEvent).not.toHaveBeenCalled();
});
});
diff --git a/src/processor.ts b/src/processor.ts
index 4951c04..106bf83 100644
--- a/src/processor.ts
+++ b/src/processor.ts
@@ -42,7 +42,7 @@ type TxOBProcessorClientOpts = {
};
export interface TxOBProcessorClient {
- findReadyToProcessEvents(
+ getEventsToProcess(
opts: TxOBProcessorClientOpts,
): Promise, "id" | "errors">[]>;
transaction(
@@ -53,7 +53,7 @@ export interface TxOBProcessorClient {
}
export interface TxOBTransactionProcessorClient {
- findReadyToProcessEventByIdForUpdateSkipLocked(
+ getEventByIdForUpdateSkipLocked(
eventId: TxOBEvent["id"],
opts: TxOBProcessorClientOpts,
): Promise | null>;
@@ -89,7 +89,7 @@ export const processEvents = async (
...opts,
};
- const events = await client.findReadyToProcessEvents(_opts);
+ const events = await client.getEventsToProcess(_opts);
_opts.logger?.debug(`found ${events.length} events to process`);
// TODO: consider concurrently processing events with max concurrency configuration
@@ -99,9 +99,9 @@ export const processEvents = async (
}
if (unlockedEvent.errors >= _opts.maxErrors) {
// Potential issue with client configuration on finding unprocessed events
- // Events with maximum allowed errors should not be returned from `findReadyToProcessEvents`
+ // Events with maximum allowed errors should not be returned from `getEventsToProcess`
_opts.logger?.warn(
- "unexpected event with max errors returned from `findReadyToProcessEvents`",
+ "unexpected event with max errors returned from `getEventsToProcess`",
{
eventId: unlockedEvent.id,
errors: unlockedEvent.errors,
@@ -113,7 +113,7 @@ export const processEvents = async (
try {
await client.transaction(async (txClient) => {
- const lockedEvent = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(
+ const lockedEvent = await txClient.getEventByIdForUpdateSkipLocked(
unlockedEvent.id,
{ signal: _opts.signal, maxErrors: _opts.maxErrors },
);
@@ -125,8 +125,8 @@ export const processEvents = async (
}
// While unlikely, the following two conditions are possible if a concurrent processor finished processing this event or reaching maximum errors between the time
- // that this processor found the event with `findReadyToProcessEvents` and called `findReadyToProcessEventByIdForUpdateSkipLocked`
- // `findReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources
+ // that this processor found the event with `getEventsToProcess` and called `getEventByIdForUpdateSkipLocked`
+ // `getEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources
if (lockedEvent.processed_at) {
_opts.logger?.debug("skipping already processed event", {
eventId: lockedEvent.id,
@@ -235,8 +235,8 @@ export const processEvents = async (
});
// The success of this update is crucial for the processor flow.
- // In the event of a failure, any handlers that have successfully executed
- // during this invokation will be reinvoked in the subsequent call.
+ // In the unlikely scenario of a failure to update the event, any handlers that have succeeded
+ // during this iteration will be reinvoked in the subsequent processor tick.
await retryable(() => txClient.updateEvent(lockedEvent), {
retries: 3,
factor: 2,