Skip to content

Commit

Permalink
fix: cleanup worker stuff from memory storage to fix vitest (#2004)
Browse files Browse the repository at this point in the history
Closes #1999
  • Loading branch information
vladfrangu authored Jul 24, 2023
1 parent d0810d3 commit d2e098c
Show file tree
Hide file tree
Showing 16 changed files with 63 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,28 @@ import { writeFile } from 'node:fs';
import { writeFile as writeFileP } from 'node:fs/promises';
import { resolve } from 'node:path';
import { setTimeout } from 'node:timers/promises';
import { parentPort } from 'node:worker_threads';

import log from '@apify/log';
import { ensureDir } from 'fs-extra';
import { lock } from 'proper-lockfile';

import type { WorkerReceivedMessage, WorkerUpdateMetadataMessage } from '../utils';
import type { BackgroundHandlerReceivedMessage, BackgroundHandlerUpdateMetadataMessage } from '../utils';

const workerLog = log.child({ prefix: 'MemoryStorageWorker' });
const backgroundHandlerLog = log.child({ prefix: 'MemoryStorageBackgroundHandler' });

export async function handleMessage(message: WorkerReceivedMessage & { messageId: string }) {
export async function handleMessage(message: BackgroundHandlerReceivedMessage) {
switch (message.action) {
case 'update-metadata':
await updateMetadata(message);
break;
default:
// We're keeping this to make eslint happy + in the event we add a new action without adding checks for it
// we should be aware of them
workerLog.warning(`Unknown worker message action ${(message as WorkerReceivedMessage).action}`);
backgroundHandlerLog.warning(`Unknown background handler message action ${(message as BackgroundHandlerReceivedMessage).action}`);
}

parentPort?.postMessage({
type: 'ack',
messageId: message.messageId,
});
}

async function updateMetadata(message: WorkerUpdateMetadataMessage) {
async function updateMetadata(message: BackgroundHandlerUpdateMetadataMessage) {
// Skip writing the actual metadata file. This is done after ensuring the directory exists so we have the directory present
if (!message.writeMetadata) {
return;
Expand Down
40 changes: 40 additions & 0 deletions packages/memory-storage/src/background-handler/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { randomUUID } from 'node:crypto';

import { handleMessage } from './fs-utils';
import type { BackgroundHandlerReceivedMessage } from '../utils';

/**
 * Registry of in-flight background tasks, keyed by the unique id assigned when a task is scheduled.
 * Each entry holds the task's promise together with the function that resolves it.
 * MemoryStorage#teardown uses this to wait for every pending task to finish before the process exits.
 * @internal
 */
export const promiseMap = new Map<string, {
    promise: Promise<void>;
    resolve: () => void;
}>();

/**
 * Schedules a background task for the given message without waiting for it to finish.
 *
 * A fresh UUID is generated for the task, and a pending promise plus its resolver are stored
 * in `promiseMap` under that id. The message is then handed to `handleBackgroundMessage`
 * with the id attached as `messageId`.
 *
 * @param message The background handler message describing the work to perform.
 */
export function scheduleBackgroundTask(message: BackgroundHandlerReceivedMessage) {
    const messageId = randomUUID();

    // The Promise executor runs synchronously, so `settle` is always assigned before it is read below.
    let settle!: () => void;
    const promise = new Promise<void>((done) => {
        settle = done;
    });

    promiseMap.set(messageId, { promise, resolve: settle });

    // Fire-and-forget: the task is tracked through `promiseMap`, not through this call's return value.
    void handleBackgroundMessage({ ...message, messageId });
}

/**
 * Runs a scheduled background task and then settles its bookkeeping entry in `promiseMap`.
 *
 * The cleanup lives in a `finally` block so the entry is resolved and removed even when
 * `handleMessage` rejects; otherwise the stored promise would stay pending forever and the
 * entry would never leave the map. The rejection itself is not swallowed here — it still
 * propagates to the caller.
 *
 * @param message The background task message, extended with the `messageId` under which it is tracked in `promiseMap`.
 */
async function handleBackgroundMessage(message: BackgroundHandlerReceivedMessage & { messageId: string }) {
    try {
        await handleMessage(message);
    } finally {
        promiseMap.get(message.messageId)?.resolve();
        promiseMap.delete(message.messageId);
    }
}
2 changes: 1 addition & 1 deletion packages/memory-storage/src/fs/dataset/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { dirname, resolve } from 'node:path';
import { AsyncQueue } from '@sapphire/async-queue';
import { ensureDir } from 'fs-extra';

import { lockAndWrite } from '../../workers/worker-utils';
import { lockAndWrite } from '../../background-handler/fs-utils';
import type { StorageImplementation } from '../common';

import type { CreateStorageImplementationOptions } from './index';
Expand Down
2 changes: 1 addition & 1 deletion packages/memory-storage/src/fs/key-value-store/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import { basename } from 'node:path/win32';
import { AsyncQueue } from '@sapphire/async-queue';
import { ensureDir } from 'fs-extra';

import { lockAndWrite } from '../../background-handler/fs-utils';
import type { InternalKeyRecord } from '../../resource-clients/key-value-store';
import { memoryStorageLog } from '../../utils';
import { lockAndWrite } from '../../workers/worker-utils';
import type { StorageImplementation } from '../common';

import type { CreateStorageImplementationOptions } from '.';
Expand Down
2 changes: 1 addition & 1 deletion packages/memory-storage/src/fs/request-queue/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { dirname, resolve } from 'node:path';
import { AsyncQueue } from '@sapphire/async-queue';
import { ensureDir } from 'fs-extra';

import { lockAndWrite } from '../../background-handler/fs-utils';
import type { InternalRequest } from '../../resource-clients/request-queue';
import { lockAndWrite } from '../../workers/worker-utils';
import type { StorageImplementation } from '../common';

import type { CreateStorageImplementationOptions } from '.';
Expand Down
4 changes: 1 addition & 3 deletions packages/memory-storage/src/memory-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import type { Dictionary } from '@crawlee/types';
import { s } from '@sapphire/shapeshift';
import { ensureDirSync, move, moveSync, pathExistsSync } from 'fs-extra';

import { promiseMap } from './background-handler/index';
import { DatasetClient } from './resource-clients/dataset';
import { DatasetCollectionClient } from './resource-clients/dataset-collection';
import { KeyValueStoreClient } from './resource-clients/key-value-store';
import { KeyValueStoreCollectionClient } from './resource-clients/key-value-store-collection';
import { RequestQueueClient } from './resource-clients/request-queue';
import { RequestQueueCollectionClient } from './resource-clients/request-queue-collection';
import { initWorkerIfNeeded, promiseMap } from './workers/instance';

export interface MemoryStorageOptions {
/**
Expand Down Expand Up @@ -74,8 +74,6 @@ export class MemoryStorage implements storage.StorageClient {
this.writeMetadata = options.writeMetadata ?? process.env.DEBUG?.includes('*') ?? process.env.DEBUG?.includes('crawlee:memory-storage') ?? false;
this.persistStorage = options.persistStorage
?? (process.env.CRAWLEE_PERSIST_STORAGE ? !['false', '0', ''].includes(process.env.CRAWLEE_PERSIST_STORAGE!) : true);

initWorkerIfNeeded();
}

datasets(): storage.DatasetCollectionClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import type * as storage from '@crawlee/types';
import { s } from '@sapphire/shapeshift';

import { DatasetClient } from './dataset';
import { scheduleBackgroundTask } from '../background-handler';
import { findOrCacheDatasetByPossibleId } from '../cache-helpers';
import type { MemoryStorage } from '../index';
import { sendWorkerMessage } from '../workers/instance';

export interface DatasetCollectionClientOptions {
baseStorageDirectory: string;
Expand Down Expand Up @@ -52,7 +52,7 @@ export class DatasetCollectionClient implements storage.DatasetCollectionClient
// Schedule a background task to write to the disk
const datasetInfo = newStore.toDatasetInfo();

sendWorkerMessage({
scheduleBackgroundTask({
action: 'update-metadata',
entityType: 'datasets',
entityDirectory: newStore.datasetDirectory,
Expand Down
4 changes: 2 additions & 2 deletions packages/memory-storage/src/resource-clients/dataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import { s } from '@sapphire/shapeshift';
import { move } from 'fs-extra';

import { BaseClient } from './common/base-client';
import { scheduleBackgroundTask } from '../background-handler';
import { findOrCacheDatasetByPossibleId } from '../cache-helpers';
import { StorageTypes } from '../consts';
import type { StorageImplementation } from '../fs/common';
import { createDatasetStorageImplementation } from '../fs/dataset';
import type { MemoryStorage } from '../index';
import { sendWorkerMessage } from '../workers/instance';

/**
* This is what API returns in the x-apify-pagination-limit
Expand Down Expand Up @@ -257,7 +257,7 @@ export class DatasetClient<Data extends Dictionary = Dictionary> extends BaseCli
}

const data = this.toDatasetInfo();
sendWorkerMessage({
scheduleBackgroundTask({
action: 'update-metadata',
data,
entityType: 'datasets',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import type * as storage from '@crawlee/types';
import { s } from '@sapphire/shapeshift';

import { KeyValueStoreClient } from './key-value-store';
import { scheduleBackgroundTask } from '../background-handler';
import { findOrCacheKeyValueStoreByPossibleId } from '../cache-helpers';
import type { MemoryStorage } from '../index';
import { sendWorkerMessage } from '../workers/instance';

export interface KeyValueStoreCollectionClientOptions {
baseStorageDirectory: string;
Expand Down Expand Up @@ -52,7 +52,7 @@ export class KeyValueStoreCollectionClient implements storage.KeyValueStoreColle
// Schedule a background task to write to the disk
const kvStoreInfo = newStore.toKeyValueStoreInfo();

sendWorkerMessage({
scheduleBackgroundTask({
action: 'update-metadata',
entityType: 'keyValueStores',
entityDirectory: newStore.keyValueStoreDirectory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import { move } from 'fs-extra';
import mime from 'mime-types';

import { BaseClient } from './common/base-client';
import { scheduleBackgroundTask } from '../background-handler';
import { maybeParseBody } from '../body-parser';
import { findOrCacheKeyValueStoreByPossibleId } from '../cache-helpers';
import { DEFAULT_API_PARAM_LIMIT, StorageTypes } from '../consts';
import type { StorageImplementation } from '../fs/common';
import { createKeyValueStorageImplementation } from '../fs/key-value-store';
import type { MemoryStorage } from '../index';
import { isBuffer, isStream } from '../utils';
import { sendWorkerMessage } from '../workers/instance';

const DEFAULT_LOCAL_FILE_EXTENSION = 'bin';

Expand Down Expand Up @@ -331,7 +331,7 @@ export class KeyValueStoreClient extends BaseClient {
}

const data = this.toKeyValueStoreInfo();
sendWorkerMessage({
scheduleBackgroundTask({
action: 'update-metadata',
data,
entityType: 'keyValueStores',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import type * as storage from '@crawlee/types';
import { s } from '@sapphire/shapeshift';

import { RequestQueueClient } from './request-queue';
import { scheduleBackgroundTask } from '../background-handler';
import { findRequestQueueByPossibleId } from '../cache-helpers';
import type { MemoryStorage } from '../index';
import { sendWorkerMessage } from '../workers/instance';

export interface RequestQueueCollectionClientOptions {
baseStorageDirectory: string;
Expand Down Expand Up @@ -52,7 +52,7 @@ export class RequestQueueCollectionClient implements storage.RequestQueueCollect
// Schedule a background task to write to the disk
const queueInfo = newStore.toRequestQueueInfo();

sendWorkerMessage({
scheduleBackgroundTask({
action: 'update-metadata',
entityType: 'requestQueues',
entityDirectory: newStore.requestQueueDirectory,
Expand Down
4 changes: 2 additions & 2 deletions packages/memory-storage/src/resource-clients/request-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import { s } from '@sapphire/shapeshift';
import { move } from 'fs-extra';

import { BaseClient } from './common/base-client';
import { scheduleBackgroundTask } from '../background-handler';
import { findRequestQueueByPossibleId } from '../cache-helpers';
import { StorageTypes } from '../consts';
import type { StorageImplementation } from '../fs/common';
import { createRequestQueueStorageImplementation } from '../fs/request-queue';
import type { MemoryStorage } from '../index';
import { purgeNullsFromObject, uniqueKeyToRequestId } from '../utils';
import { sendWorkerMessage } from '../workers/instance';

const requestShape = s.object({
id: s.string,
Expand Down Expand Up @@ -514,7 +514,7 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue
}

const data = this.toRequestQueueInfo();
sendWorkerMessage({
scheduleBackgroundTask({
action: 'update-metadata',
data,
entityType: 'requestQueues',
Expand Down
10 changes: 2 additions & 8 deletions packages/memory-storage/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,9 @@ export function isStream(value: any): boolean {

export const memoryStorageLog = defaultLog.child({ prefix: 'MemoryStorage' });

export interface WorkerData {
datasetsDirectory: string;
keyValueStoresDirectory: string;
requestQueuesDirectory: string;
}

export type WorkerReceivedMessage = WorkerUpdateMetadataMessage;
export type BackgroundHandlerReceivedMessage = BackgroundHandlerUpdateMetadataMessage;

export type WorkerUpdateMetadataMessage =
export type BackgroundHandlerUpdateMetadataMessage =
| MetadataUpdate<'datasets', storage.DatasetInfo>
| MetadataUpdate<'keyValueStores', storage.KeyValueStoreInfo>
| MetadataUpdate<'requestQueues', storage.RequestQueueInfo>;
Expand Down

This file was deleted.

17 changes: 0 additions & 17 deletions packages/memory-storage/src/workers/file-storage-worker.ts

This file was deleted.

69 changes: 0 additions & 69 deletions packages/memory-storage/src/workers/instance.ts

This file was deleted.

0 comments on commit d2e098c

Please sign in to comment.