Skip to content

Commit

Permalink
feat: support parallelize scrapers (#356)
Browse files Browse the repository at this point in the history
* israeli-bank-scrapers@5.2.0

* patch israeli-bank-scrapers to allow passing a browser context

* Use a single browser (and context) for all scrapers

* Better logs

* Use a separate browser context for each scraper

* Move ScraperOptions to own var

* Extract `scrapeAccount`

* Add option to run parallel scrapers
  • Loading branch information
daniel-hauser authored Oct 5, 2024
1 parent 11fcffa commit 0efcc8f
Show file tree
Hide file tree
Showing 10 changed files with 499 additions and 147 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/scrape.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ on:
default: "auto_moneyman"
required: false
description: "The name of the worksheet to write to"
parallelScrapes:
default: "1"
required: false
description: "Number of parallel scrapes to run"
schedule:
- cron: "33 10 * * *"
env:
Expand Down Expand Up @@ -57,6 +61,7 @@ jobs:
-e BUXFER_ACCOUNTS
-e TRANSACTION_HASH_TYPE
-e WEB_POST_URL
-e MAX_PARALLEL_SCRAPERS
${{ env.REGISTRY }}/${{ steps.normalize-repository-name.outputs.repository }}:latest
env:
DEBUG: ""
Expand Down Expand Up @@ -85,3 +90,4 @@ jobs:
BUXFER_ACCOUNTS: ${{ secrets.BUXFER_ACCOUNTS }}
TRANSACTION_HASH_TYPE: ${{ secrets.TRANSACTION_HASH_TYPE }}
WEB_POST_URL: ${{ secrets.WEB_POST_URL }}
MAX_PARALLEL_SCRAPERS: ${{ github.event.inputs.parallelScrapes }}
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ Example:
| `TRANSACTION_HASH_TYPE` | `` | The hash type to use for the transaction hash. Can be `moneyman` or empty. The default will be changed to `moneyman` in the upcoming versions |
| `HIDDEN_DEPRECATIONS` | `` | A comma-separated list of deprecations to hide |
| `PUPPETEER_EXECUTABLE_PATH` | `undefined` | An ExecutablePath for the scraper. if undefined defaults to system. |
| `MAX_PARALLEL_SCRAPERS` | `1` | The maximum number of parallel scrapers to run |

### Get notified in telegram

Expand Down
180 changes: 97 additions & 83 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
},
"homepage": "https://github.com/daniel-hauser/moneyman#readme",
"dependencies": {
"async": "^3.2.6",
"azure-kusto-data": "^6.0.2",
"azure-kusto-ingest": "^6.0.2",
"buxfer-ts-client": "^1.1.0",
Expand All @@ -38,11 +39,12 @@
"google-auth-library": "^9.14.1",
"google-spreadsheet": "^4.1.4",
"hash-it": "^6.0.0",
"israeli-bank-scrapers": "^5.1.4",
"israeli-bank-scrapers": "^5.2.0",
"telegraf": "^4.16.3",
"ynab": "^2.5.0"
},
"devDependencies": {
"@types/async": "^3.2.24",
"@types/debug": "^4.1.12",
"@types/jest": "^29.5.13",
"husky": "^9.1.6",
Expand Down
274 changes: 274 additions & 0 deletions patches/israeli-bank-scrapers+5.2.0.patch

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions src/browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import puppeteer, {
type Browser,
type PuppeteerLaunchOptions,
} from "puppeteer";
import { createLogger } from "./utils/logger.js";

// Command-line args passed to the browser on launch.
// NOTE(review): these look like the usual flags for containerized runs
// (small /dev/shm, running as root) — confirm against the Docker setup.
export const browserArgs = ["--disable-dev-shm-usage", "--no-sandbox"];
// Optional browser executable path taken from the environment; when unset,
// `undefined` lets Puppeteer choose its default executable.
export const browserExecutablePath =
  process.env.PUPPETEER_EXECUTABLE_PATH || undefined;

// Debug logger scoped to the "browser" namespace.
const logger = createLogger("browser");

/**
 * Launches a Puppeteer browser using the module-level args and
 * optional executable path.
 *
 * @returns the newly launched {@link Browser} instance; the caller
 *          owns it and is responsible for closing it.
 */
export async function createBrowser(): Promise<Browser> {
  const launchOptions: PuppeteerLaunchOptions = {
    args: browserArgs,
    executablePath: browserExecutablePath,
  };

  logger("Creating browser", launchOptions);
  const browser = await puppeteer.launch(launchOptions);
  return browser;
}
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const {
BUXFER_ACCOUNTS = "",
TRANSACTION_HASH_TYPE = "",
WEB_POST_URL = "",
MAX_PARALLEL_SCRAPERS = "",
} = process.env;

/**
Expand All @@ -34,6 +35,7 @@ export const daysBackToScrape = DAYS_BACK || 10;
export const worksheetName = WORKSHEET_NAME || "_moneyman";
export const futureMonthsToScrape = parseInt(FUTURE_MONTHS, 10);
export const systemTimezone = Intl.DateTimeFormat().resolvedOptions().timeZone;
export const parallelScrapers = MAX_PARALLEL_SCRAPERS || 1;

const accountsToScrape = ACCOUNTS_TO_SCRAPE.split(",")
.filter(Boolean)
Expand Down
135 changes: 87 additions & 48 deletions src/data/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,15 @@ import { performance } from "perf_hooks";
import { getAccountTransactions } from "./scrape.js";
import { AccountConfig, AccountScrapeResult } from "../types.js";
import { createLogger } from "../utils/logger.js";
import { createBrowser } from "../browser.js";
import { send, sendError } from "../notifier.js";
import { getFailureScreenShotPath } from "../utils/failureScreenshot.js";
import { ScraperOptions } from "israeli-bank-scrapers";
import { parallelScrapers } from "../config.js";
import { parallelLimit } from "async";
import os from "node:os";

const logger = createLogger("data");
const logger = createLogger("scraper");

export async function scrapeAccounts(
accounts: Array<AccountConfig>,
Expand All @@ -18,67 +25,74 @@ export async function scrapeAccounts(

logger(`scraping %d accounts`, accounts.length);
logger(`start date %s`, startDate.toISOString());

let futureMonths: number | undefined = undefined;
if (!Number.isNaN(futureMonthsToScrape)) {
logger(`months to scrap: %d`, futureMonthsToScrape);
futureMonths = futureMonthsToScrape;
}

const status: Array<string> = [];
const results: Array<AccountScrapeResult> = [];

for (let i = 0; i < accounts.length; i++) {
const account = accounts[i];

logger(`scraping account #${i} (type=${account.companyId})`);
const result = await scrapeAccount(
account,
startDate,
futureMonthsToScrape,
async (message) => {
status[i] = message;
await scrapeStatusChanged?.(status);
},
);

results.push({
companyId: account.companyId,
result,
});
logger(`scraping account #${i} ended`);
logger("Creating a browser");
const browser = await createBrowser();
logger(`Browser created, starting to scrape ${accounts.length} accounts`);

if (Number(parallelScrapers) > 1) {
logger(`Running with ${parallelScrapers} parallel scrapers`);
send(
`System info: ${JSON.stringify(
{
parallelScrapers: Number(parallelScrapers),
availableParallelism: os.availableParallelism(),
totalMemoryGB: (os.totalmem() / 1000000000).toFixed(2),
freeMemoryGB: (os.freemem() / 1000000000).toFixed(2),
cpus: os.cpus().length,
},
null,
2,
)}`,
);
}

logger(`scraping ended`);
const stats = getStats(results);
logger(
`Got ${stats.transactions} transactions from ${stats.accounts} accounts`,
const results = await parallelLimit<AccountConfig, AccountScrapeResult[]>(
accounts.map(
(account, i) => async () =>
scrapeAccount(
logger.extend(`#${i} (${account.companyId})`),
account,
{
browserContext: await browser.createBrowserContext(),
startDate,
companyId: account.companyId,
futureMonthsToScrape: futureMonths,
storeFailureScreenShotPath: getFailureScreenShotPath(
account.companyId,
),
},
async (message, append = false) => {
status[i] = append ? `${status[i]} ${message}` : message;
return scrapeStatusChanged?.(status);
},
),
),
Number(parallelScrapers),
);

const duration = (performance.now() - start) / 1000;
logger(`total duration: ${duration}s`);

logger(`scraping ended, total duration: ${duration.toFixed(1)}s`);
await scrapeStatusChanged?.(status, duration);

return results;
}

export async function scrapeAccount(
account: AccountConfig,
startDate: Date,
futureMonthsToScrape: number,
setStatusMessage: (message: string) => Promise<void>,
) {
let message = "";
const start = performance.now();
const result = await getAccountTransactions(
account,
startDate,
futureMonthsToScrape,
(cid, step) => setStatusMessage((message = `[${cid}] ${step}`)),
);
try {
logger(`closing browser`);
await browser?.close();
} catch (e) {
sendError(e, "browser.close");
logger(`failed to close browser`, e);
}

const duration = (performance.now() - start) / 1000;
logger(`scraping took ${duration.toFixed(1)}s`);
await setStatusMessage(`${message}, took ${duration.toFixed(1)}s`);
return result;
logger(getStats(results));
return results;
}

function getStats(results: Array<AccountScrapeResult>) {
Expand All @@ -99,3 +113,28 @@ function getStats(results: Array<AccountScrapeResult>) {
transactions,
};
}

/**
 * Scrapes a single account and reports progress/timing.
 *
 * @param logger - per-account debug logger (already namespaced by the caller)
 * @param account - credentials/config of the account to scrape
 * @param scraperOptions - options forwarded to the underlying scraper
 * @param setStatusMessage - status reporter; when `append` is true the
 *        message is appended to the current status instead of replacing it
 * @returns the account's companyId together with the raw scrape result
 */
async function scrapeAccount(
  logger: debug.IDebugger,
  account: AccountConfig,
  scraperOptions: ScraperOptions,
  setStatusMessage: (message: string, append?: boolean) => Promise<void>,
) {
  logger(`scraping`);
  const startTime = performance.now();

  // Forward each scraper progress step into the shared status line.
  const onProgress = (cid: string, step: string) =>
    setStatusMessage(`[${cid}] ${step}`);

  const result = await getAccountTransactions(
    account,
    scraperOptions,
    onProgress,
  );

  // Elapsed wall-clock time in seconds, one decimal place.
  const elapsedSeconds = ((performance.now() - startTime) / 1000).toFixed(1);
  logger(`scraping ended, took ${elapsedSeconds}s`);
  await setStatusMessage(`, took ${elapsedSeconds}s`, true);

  return { companyId: account.companyId, result };
}
21 changes: 7 additions & 14 deletions src/data/scrape.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,22 @@
import { createScraper, ScraperScrapingResult } from "israeli-bank-scrapers";
import {
createScraper,
ScraperOptions,
ScraperScrapingResult,
} from "israeli-bank-scrapers";
import { AccountConfig } from "../types.js";
import { ScraperErrorTypes } from "israeli-bank-scrapers/lib/scrapers/errors.js";
import { createLogger } from "../utils/logger.js";
import { getFailureScreenShotPath } from "../utils/failureScreenshot.js";

const logger = createLogger("scrape");

export async function getAccountTransactions(
account: AccountConfig,
startDate: Date,
futureMonthsToScrape: number,
options: ScraperOptions,
onProgress: (companyId: string, status: string) => void,
): Promise<ScraperScrapingResult> {
logger(`started`);
try {
const scraper = createScraper({
executablePath: process.env.PUPPETEER_EXECUTABLE_PATH || undefined,
startDate,
companyId: account.companyId,
args: ["--disable-dev-shm-usage", "--no-sandbox"],
futureMonthsToScrape: Number.isNaN(futureMonthsToScrape)
? undefined
: futureMonthsToScrape,
storeFailureScreenShotPath: getFailureScreenShotPath(account.companyId),
});
const scraper = createScraper(options);

scraper.onProgress((companyId, { type }) => {
logger(`[${companyId}] ${type}`);
Expand Down
2 changes: 1 addition & 1 deletion src/utils/failureScreenshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export function getFailureScreenShotPath(companyId: string) {
}

const filePath = path.join(companyDir, `${companyId}-${Date.now()}.png`);
logger("getFailureScreenShotPath", { filePath });
logger("getFailureScreenShotPath %o", filePath);

return filePath;
}
Expand Down

0 comments on commit 0efcc8f

Please sign in to comment.