Skip to content

Commit

Permalink
fix: async/await issues with the new queue (#319)
Browse files Browse the repository at this point in the history
  • Loading branch information
kamtschatka authored Jul 21, 2024
1 parent 4c23ea9 commit c5c62de
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 25 deletions.
4 changes: 2 additions & 2 deletions apps/workers/crawlerWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -653,13 +653,13 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {

// Enqueue openai job (if not set, assume it's true for backward compatibility)
if (job.data.runInference !== false) {
OpenAIQueue.enqueue({
await OpenAIQueue.enqueue({
bookmarkId,
});
}

// Update the search index
triggerSearchReindex(bookmarkId);
await triggerSearchReindex(bookmarkId);

// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
Expand Down
2 changes: 1 addition & 1 deletion apps/workers/openaiWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -397,5 +397,5 @@ async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
await connectTags(bookmarkId, tags, bookmark.userId);

// Update the search index
triggerSearchReindex(bookmarkId);
await triggerSearchReindex(bookmarkId);
}
14 changes: 7 additions & 7 deletions packages/queue/runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ describe("SqiteQueueRunner", () => {
barrier,
);

queue.enqueue({ increment: 1 });
queue.enqueue({ increment: 2 });
queue.enqueue({ increment: 3 });
await queue.enqueue({ increment: 1 });
await queue.enqueue({ increment: 2 });
await queue.enqueue({ increment: 3 });

expect(await queue.stats()).toEqual({
pending: 3,
Expand Down Expand Up @@ -215,9 +215,9 @@ describe("SqiteQueueRunner", () => {
barrier.allowParticipantsToProceed();
const { runner, results } = buildRunner(queue, defaultRunnerOpts, barrier);

queue.enqueue({ increment: 1, succeedAfter: 2 });
queue.enqueue({ increment: 1, succeedAfter: 10 });
queue.enqueue({ increment: 3, succeedAfter: 0 });
await queue.enqueue({ increment: 1, succeedAfter: 2 });
await queue.enqueue({ increment: 1, succeedAfter: 10 });
await queue.enqueue({ increment: 3, succeedAfter: 0 });

const runnerPromise = runner.runUntilEmpty();

Expand Down Expand Up @@ -256,7 +256,7 @@ describe("SqiteQueueRunner", () => {
barrier,
);

queue.enqueue({ increment: 1, blockForSec: 10 });
await queue.enqueue({ increment: 1, blockForSec: 10 });
await runner.runUntilEmpty();

expect(await queue.stats()).toEqual({
Expand Down
8 changes: 4 additions & 4 deletions packages/shared/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>(
},
);

export function triggerSearchReindex(bookmarkId: string) {
SearchIndexingQueue.enqueue({
export async function triggerSearchReindex(bookmarkId: string) {
await SearchIndexingQueue.enqueue({
bookmarkId,
type: "index",
});
}

export function triggerSearchDeletion(bookmarkId: string) {
SearchIndexingQueue.enqueue({
export async function triggerSearchDeletion(bookmarkId: string) {
await SearchIndexingQueue.enqueue({
bookmarkId: bookmarkId,
type: "delete",
});
Expand Down
10 changes: 5 additions & 5 deletions packages/trpc/routers/bookmarks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ export const bookmarksAppRouter = router({
break;
}
}
triggerSearchReindex(bookmark.id);
await triggerSearchReindex(bookmark.id);
return bookmark;
}),

Expand Down Expand Up @@ -353,7 +353,7 @@ export const bookmarksAppRouter = router({
message: "Bookmark not found",
});
}
triggerSearchReindex(input.bookmarkId);
await triggerSearchReindex(input.bookmarkId);
return res[0];
}),

Expand All @@ -379,7 +379,7 @@ export const bookmarksAppRouter = router({
message: "Bookmark not found",
});
}
triggerSearchReindex(input.bookmarkId);
await triggerSearchReindex(input.bookmarkId);
}),

deleteBookmark: authedProcedure
Expand All @@ -405,7 +405,7 @@ export const bookmarksAppRouter = router({
eq(bookmarks.id, input.bookmarkId),
),
);
triggerSearchDeletion(input.bookmarkId);
await triggerSearchDeletion(input.bookmarkId);
if (deleted.changes > 0 && bookmark) {
await cleanupAssetForBookmark({
asset: bookmark.asset,
Expand Down Expand Up @@ -747,7 +747,7 @@ export const bookmarksAppRouter = router({
})),
)
.onConflictDoNothing();
triggerSearchReindex(input.bookmarkId);
await triggerSearchReindex(input.bookmarkId);
return {
bookmarkId: input.bookmarkId,
attached: allIds,
Expand Down
14 changes: 8 additions & 6 deletions packages/trpc/routers/tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ export const tagsAppRouter = router({
if (res.changes == 0) {
throw new TRPCError({ code: "NOT_FOUND" });
}
affectedBookmarks.forEach(({ bookmarkId }) =>
triggerSearchReindex(bookmarkId),
await Promise.all(
affectedBookmarks.map(({ bookmarkId }) =>
triggerSearchReindex(bookmarkId),
),
);
}),
deleteUnused: authedProcedure
Expand Down Expand Up @@ -185,11 +187,11 @@ export const tagsAppRouter = router({
},
},
);
await Promise.all([
await Promise.all(
affectedBookmarks
.map((b) => b.bookmarkId)
.map((id) => triggerSearchReindex(id)),
]);
);
} catch (e) {
// Best Effort attempt to reindex affected bookmarks
console.error("Failed to reindex affected bookmarks", e);
Expand Down Expand Up @@ -304,9 +306,9 @@ export const tagsAppRouter = router({
);

try {
await Promise.all([
await Promise.all(
affectedBookmarks.map((id) => triggerSearchReindex(id)),
]);
);
} catch (e) {
// Best Effort attempt to reindex affected bookmarks
console.error("Failed to reindex affected bookmarks", e);
Expand Down

0 comments on commit c5c62de

Please sign in to comment.