Skip to content

Commit

Permalink
0.0.20 renames for conciseness and clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
dillonstreator committed Apr 23, 2024
1 parent 455d170 commit f46bc93
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 44 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<img src="https://codecov.io/gh/dillonstreator/txob/graph/badge.svg?token=E9M7G67VLL"/>
</a>
<a aria-label="NPM version" href="https://www.npmjs.com/package/txob">
<img alt="" src="https://badgen.net/npm/v/txob?v=0.0.19">
<img alt="" src="https://badgen.net/npm/v/txob?v=0.0.20">
</a>
<a aria-label="License" href="https://github.com/dillonstreator/txob/blob/main/LICENSE">
<img alt="" src="https://badgen.net/npm/license/txob">
Expand Down Expand Up @@ -107,11 +107,12 @@ const server = http.createServer(async (req, res) => {
await client.query("BEGIN");

const userId = randomUUID();
// save/create user with userId
// save user with userId
await client.query(`INSERT INTO users (id, email) VALUES ($1, $2)`, [userId, req.body.email]);

// save event to `events` table
await client.query(
`INSERT INTO events (id, type, data, correlation_id) VALUES ( $1, $2, $3, $4 )`,
`INSERT INTO events (id, type, data, correlation_id) VALUES ($1, $2, $3, $4)`,
[
randomUUID(),
eventTypes.UserInvited,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"type": "git",
"url": "git://github.com/dillonstreator/txob.git"
},
"version": "0.0.19",
"version": "0.0.20",
"license": "MIT",
"files": [
"dist",
Expand Down
4 changes: 2 additions & 2 deletions src/mongodb/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export const createProcessorClient = <EventType extends string>(
db: string,
collection: string = "events",
): TxOBProcessorClient<EventType> => ({
findReadyToProcessEvents: async (opts) => {
getEventsToProcess: async (opts) => {
const events = (await mongo
.db(db)
.collection(collection)
Expand All @@ -26,7 +26,7 @@ export const createProcessorClient = <EventType extends string>(
transaction: async (fn) => {
await mongo.withSession(async (session): Promise<void> => {
await fn({
findReadyToProcessEventByIdForUpdateSkipLocked: async (
getEventByIdForUpdateSkipLocked: async (
eventId,
opts,
) => {
Expand Down
12 changes: 6 additions & 6 deletions src/pg/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ describe("createProcessorClient", () => {
query: vi.fn(),
};
const client = createProcessorClient(pgClient);
expect(typeof client.findReadyToProcessEvents).toBe("function");
expect(typeof client.getEventsToProcess).toBe("function");
expect(typeof client.transaction).toBe("function");
});
});

describe("findReadyToProcessEvents", () => {
describe("getEventsToProcess", () => {
it("should execute the correct query", async () => {
const rows = [1, 2, 3];
const pgClient = {
Expand All @@ -26,7 +26,7 @@ describe("findReadyToProcessEvents", () => {
maxErrors: 10,
};
const client = createProcessorClient(pgClient);
const result = await client.findReadyToProcessEvents(opts);
const result = await client.getEventsToProcess(opts);
expect(pgClient.query).toHaveBeenCalledOnce();
expect(pgClient.query).toHaveBeenCalledWith(
"SELECT id, errors FROM events WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1",
Expand Down Expand Up @@ -62,7 +62,7 @@ describe("transaction", () => {
expect(pgClient.query).toHaveBeenNthCalledWith(2, "ROLLBACK");
});

describe("findReadyToProcessEventByIdForUpdateSkipLocked", () => {
describe("getEventByIdForUpdateSkipLocked", () => {
it("should execute the correct query", async () => {
const rows = [1, 2, 3];
const pgClient = {
Expand All @@ -77,7 +77,7 @@ describe("transaction", () => {
const client = createProcessorClient(pgClient);
let result: any;
await client.transaction(async (txClient) => {
result = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 });
result = await txClient.getEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 });
});

expect(pgClient.query).toHaveBeenCalledTimes(3);
Expand All @@ -102,7 +102,7 @@ describe("transaction", () => {
const client = createProcessorClient(pgClient);
let result: any;
await client.transaction(async (txClient) => {
result = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 });
result = await txClient.getEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 });
});

expect(pgClient.query).toHaveBeenCalledTimes(3);
Expand Down
6 changes: 3 additions & 3 deletions src/pg/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ interface Querier {
query: Client["query"];
}

// TODO: leverage the signal option that comes in on options for `findReadyToProcessEvents` and `findReadyToProcessEventByIdForUpdateSkipLocked`
// TODO: leverage the signal option that comes in on options for `getEventsToProcess` and `getEventByIdForUpdateSkipLocked`
// to cancel queries if/when supported by `pg` https://github.com/brianc/node-postgres/issues/2774

export const createProcessorClient = <EventType extends string>(
querier: Querier,
table: string = "events",
): TxOBProcessorClient<EventType> => ({
findReadyToProcessEvents: async (opts) => {
getEventsToProcess: async (opts) => {
const events = await querier.query<
Pick<TxOBEvent<EventType>, "id" | "errors">
>(
Expand All @@ -25,7 +25,7 @@ export const createProcessorClient = <EventType extends string>(
try {
await querier.query("BEGIN");
await fn({
findReadyToProcessEventByIdForUpdateSkipLocked: async (eventId, opts) => {
getEventByIdForUpdateSkipLocked: async (eventId, opts) => {
const event = await querier.query<TxOBEvent<EventType>>(
`SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`,
[eventId, opts.maxErrors],
Expand Down
38 changes: 19 additions & 19 deletions src/processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import {
import { sleep } from "./sleep";

const mockTxClient = {
findReadyToProcessEventByIdForUpdateSkipLocked: vi.fn(),
getEventByIdForUpdateSkipLocked: vi.fn(),
updateEvent: vi.fn(),
};
const mockClient = {
findReadyToProcessEvents: vi.fn(),
getEventsToProcess: vi.fn(),
transaction: vi.fn(async (fn) => fn(mockTxClient)),
};

Expand All @@ -37,12 +37,12 @@ describe("processEvents", () => {
backoff: () => now,
};
const handlerMap = {};
mockClient.findReadyToProcessEvents.mockImplementation(() => []);
mockClient.getEventsToProcess.mockImplementation(() => []);
processEvents(mockClient, handlerMap, opts);
expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts);
expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
expect(mockClient.getEventsToProcess).toHaveBeenCalledWith(opts);
expect(mockClient.transaction).not.toHaveBeenCalled();
expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
expect(mockTxClient.getEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
expect(mockTxClient.updateEvent).not.toHaveBeenCalled();
});

Expand Down Expand Up @@ -113,8 +113,8 @@ describe("processEvents", () => {
processed_at: now,
};
const events = [evt1, evt2, evt3, evt4];
mockClient.findReadyToProcessEvents.mockImplementation(() => events);
mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => {
mockClient.getEventsToProcess.mockImplementation(() => events);
mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => {
if (id === evt3.id) return null;

return events.find((e) => e.id === id);
Expand All @@ -128,8 +128,8 @@ describe("processEvents", () => {

await processEvents(mockClient, handlerMap, opts);

expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts);
expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
expect(mockClient.getEventsToProcess).toHaveBeenCalledWith(opts);

expect(mockClient.transaction).toHaveBeenCalledTimes(3);

Expand All @@ -143,7 +143,7 @@ describe("processEvents", () => {
});
expect(handlerMap.evtType1.handler3).not.toHaveBeenCalled();

expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
expect(mockTxClient.getEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
3,
);

Expand Down Expand Up @@ -213,8 +213,8 @@ describe("processEvents", () => {
errors: 1,
};
const events = [evt1];
mockClient.findReadyToProcessEvents.mockImplementation(() => events);
mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => {
mockClient.getEventsToProcess.mockImplementation(() => events);
mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => {
return events.find((e) => e.id === id);
});
mockTxClient.updateEvent.mockImplementation(() => {
Expand All @@ -223,16 +223,16 @@ describe("processEvents", () => {

await processEvents(mockClient, handlerMap, opts);

expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts);
expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
expect(mockClient.getEventsToProcess).toHaveBeenCalledWith(opts);

expect(mockClient.transaction).toHaveBeenCalledTimes(1);

expect(handlerMap.evtType1.handler1).toHaveBeenCalledOnce();
expect(handlerMap.evtType1.handler1).toHaveBeenCalledWith(evt1, {
signal: undefined,
});
expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
expect(mockTxClient.getEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
1,
);

Expand Down Expand Up @@ -320,15 +320,15 @@ describe("EventProcessor", () => {
backoff: () => now,
};
const handlerMap = {};
mockClient.findReadyToProcessEvents.mockImplementation(() => []);
mockClient.getEventsToProcess.mockImplementation(() => []);
const processor = EventProcessor(mockClient, handlerMap, opts);
processor.start();

await processor.stop();

expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
expect(mockClient.transaction).not.toHaveBeenCalled();
expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
expect(mockTxClient.getEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
expect(mockTxClient.updateEvent).not.toHaveBeenCalled();
});
});
20 changes: 10 additions & 10 deletions src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type TxOBProcessorClientOpts = {
};

export interface TxOBProcessorClient<TxOBEventType extends string> {
findReadyToProcessEvents(
getEventsToProcess(
opts: TxOBProcessorClientOpts,
): Promise<Pick<TxOBEvent<TxOBEventType>, "id" | "errors">[]>;
transaction(
Expand All @@ -53,7 +53,7 @@ export interface TxOBProcessorClient<TxOBEventType extends string> {
}

export interface TxOBTransactionProcessorClient<TxOBEventType extends string> {
findReadyToProcessEventByIdForUpdateSkipLocked(
getEventByIdForUpdateSkipLocked(
eventId: TxOBEvent<TxOBEventType>["id"],
opts: TxOBProcessorClientOpts,
): Promise<TxOBEvent<TxOBEventType> | null>;
Expand Down Expand Up @@ -89,7 +89,7 @@ export const processEvents = async <TxOBEventType extends string>(
...opts,
};

const events = await client.findReadyToProcessEvents(_opts);
const events = await client.getEventsToProcess(_opts);
_opts.logger?.debug(`found ${events.length} events to process`);

// TODO: consider concurrently processing events with max concurrency configuration
Expand All @@ -99,9 +99,9 @@ export const processEvents = async <TxOBEventType extends string>(
}
if (unlockedEvent.errors >= _opts.maxErrors) {
// Potential issue with client configuration on finding unprocessed events
// Events with maximum allowed errors should not be returned from `findReadyToProcessEvents`
// Events with maximum allowed errors should not be returned from `getEventsToProcess`
_opts.logger?.warn(
"unexpected event with max errors returned from `findReadyToProcessEvents`",
"unexpected event with max errors returned from `getEventsToProcess`",
{
eventId: unlockedEvent.id,
errors: unlockedEvent.errors,
Expand All @@ -113,7 +113,7 @@ export const processEvents = async <TxOBEventType extends string>(

try {
await client.transaction(async (txClient) => {
const lockedEvent = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(
const lockedEvent = await txClient.getEventByIdForUpdateSkipLocked(
unlockedEvent.id,
{ signal: _opts.signal, maxErrors: _opts.maxErrors },
);
Expand All @@ -125,8 +125,8 @@ export const processEvents = async <TxOBEventType extends string>(
}

// While unlikely, the following two conditions are possible if a concurrent processor finished processing this event or reaching maximum errors between the time
// that this processor found the event with `findReadyToProcessEvents` and called `findReadyToProcessEventByIdForUpdateSkipLocked`
// `findReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources
// that this processor found the event with `getEventsToProcess` and called `getEventByIdForUpdateSkipLocked`
// `getEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources
if (lockedEvent.processed_at) {
_opts.logger?.debug("skipping already processed event", {
eventId: lockedEvent.id,
Expand Down Expand Up @@ -235,8 +235,8 @@ export const processEvents = async <TxOBEventType extends string>(
});

// The success of this update is crucial for the processor flow.
// In the event of a failure, any handlers that have successfully executed
// during this invokation will be reinvoked in the subsequent call.
// In the unlikely scenario of a failure to update the event, any handlers that have succeeded
// during this iteration will be reinvoked in the subsequent processor tick.
await retryable(() => txClient.updateEvent(lockedEvent), {
retries: 3,
factor: 2,
Expand Down

0 comments on commit f46bc93

Please sign in to comment.