Skip to content

Commit

Permalink
feat(core): parallelize cache restoration
Browse files Browse the repository at this point in the history
  • Loading branch information
vsavkin committed Oct 4, 2021
1 parent 7bef953 commit b79072a
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 110 deletions.
177 changes: 104 additions & 73 deletions packages/workspace/src/tasks-runner/cache.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
import { appRootPath } from '@nrwl/tao/src/utils/app-root';
import { Task } from '@nrwl/devkit';
import { exists, lstat, readdir } from 'fs';
import {
existsSync,
mkdirSync,
readFileSync,
writeFileSync,
lstatSync,
unlinkSync,
} from 'fs';
import { removeSync, ensureDirSync, copySync, readdirSync } from 'fs-extra';
import { join, resolve, sep } from 'path';
ensureDirSync,
mkdir,
unlink,
ensureDir,
writeFile,
readFile,
remove,
copy,
} from 'fs-extra';
import { dirname, join, resolve, sep } from 'path';
import { DefaultTasksRunnerOptions } from './default-tasks-runner';
import { spawn } from 'child_process';
import { cacheDirectory } from '../utilities/cache-directory';
import { writeToFile } from '../utilities/fileutils';

const util = require('util');

const readFileAsync = util.promisify(readFile);
const existsAsync = util.promisify(exists);
const lstatAsync = util.promisify(lstat);
const readdirAsync = util.promisify(readdir);

export type CachedResult = {
terminalOutput: string;
Expand Down Expand Up @@ -56,7 +64,7 @@ export class Cache {
}

async get(task: Task): Promise<CachedResult> {
const res = this.getFromLocalDir(task);
const res = await this.getFromLocalDir(task);

// didn't find it locally but we have a remote cache
if (!res && this.options.remoteCache) {
Expand All @@ -79,141 +87,161 @@ export class Cache {
const tdCommit = join(this.cachePath, `${task.hash}.commit`);

// might be left overs from partially-completed cache invocations
removeSync(tdCommit);
removeSync(td);
await remove(tdCommit);
await remove(td);

mkdirSync(td);
writeFileSync(
await mkdir(td);
await writeFile(
join(td, 'terminalOutput'),
terminalOutput ?? 'no terminal output'
);

mkdirSync(join(td, 'outputs'));
outputs.forEach((f) => {
await mkdir(join(td, 'outputs'));
for (const f of outputs) {
const src = join(this.root, f);
if (existsSync(src)) {
if (await existsAsync(src)) {
const cached = join(td, 'outputs', f);
// Ensure parent directory is created if src is a file
const isFile = lstatSync(src).isFile();
const isFile = (await lstatAsync(src)).isFile();
const directory = isFile ? resolve(cached, '..') : cached;
ensureDirSync(directory);
await ensureDir(directory);

copySync(src, cached);
await copy(src, cached);
}
});
}
// we need this file to account for partial writes to the cache folder.
// creating this file is atomic, whereas creating a folder is not.
// so if the process gets terminated while we are copying stuff into cache,
// the cache entry won't be used.
writeFileSync(join(td, 'code'), code.toString());
writeFileSync(tdCommit, 'true');
await writeFile(join(td, 'code'), code.toString());
await writeFile(tdCommit, 'true');

if (this.options.remoteCache) {
await this.options.remoteCache.store(task.hash, this.cachePath);
}
}

copyFilesFromCache(
async copyFilesFromCache(
hash: string,
cachedResult: CachedResult,
outputs: string[]
) {
this.removeRecordedOutputsHashes(outputs);
outputs.forEach((f) => {
await this.removeRecordedOutputsHashes(outputs);
for (const f of outputs) {
const cached = join(cachedResult.outputsPath, f);
if (existsSync(cached)) {
const isFile = lstatSync(cached).isFile();
if (await existsAsync(cached)) {
const isFile = (await lstatAsync(cached)).isFile();
const src = join(this.root, f);
removeSync(src);
await remove(src);

// Ensure parent directory is created if src is a file
const directory = isFile ? resolve(src, '..') : src;
ensureDirSync(directory);
copySync(cached, src);
await ensureDir(directory);
await copy(cached, src);
}
});
this.recordOutputsHash(outputs, hash);
}
await this.recordOutputsHash(outputs, hash);
}

temporaryOutputPath(task: Task) {
return join(this.terminalOutputsDir, task.hash);
}

removeRecordedOutputsHashes(outputs: string[]): void {
outputs.forEach((output) => {
async removeRecordedOutputsHashes(outputs: string[]): Promise<void> {
for (const output of outputs) {
const hashFile = this.getFileNameWithLatestRecordedHashForOutput(output);
try {
unlinkSync(hashFile);
await unlink(hashFile);
} catch (e) {}
});
}
}

recordOutputsHash(outputs: string[], hash: string): void {
outputs.forEach((output) => {
async recordOutputsHash(outputs: string[], hash: string): Promise<void> {
for (const output of outputs) {
const hashFile = this.getFileNameWithLatestRecordedHashForOutput(output);
try {
writeToFile(hashFile, hash);
await ensureDir(dirname(hashFile));
await writeFile(hashFile, hash);
} catch {}
});
}
}

shouldCopyOutputsFromCache(
async shouldCopyOutputsFromCache(
taskWithCachedResult: TaskWithCachedResult,
outputs: string[]
): boolean {
): Promise<boolean> {
return (
this.areLatestOutputsHashesDifferentThanTaskHash(
(await this.areLatestOutputsHashesDifferentThanTaskHash(
outputs,
taskWithCachedResult.task.hash
) || this.isAnyOutputMissing(taskWithCachedResult.cachedResult, outputs)
)) ||
(await this.isAnyOutputMissing(
taskWithCachedResult.cachedResult,
outputs
))
);
}

private areLatestOutputsHashesDifferentThanTaskHash(
private async areLatestOutputsHashesDifferentThanTaskHash(
outputs: string[],
hash: string
) {
return outputs.some(
(output) => this.getLatestRecordedHashForTask(output) !== hash
);
for (let output of outputs) {
if ((await this.getLatestRecordedHashForTask(output)) !== hash)
return true;
}
return false;
}

private getLatestRecordedHashForTask(output: string): string | null {
private async getLatestRecordedHashForTask(
output: string
): Promise<string | null> {
try {
return readFileSync(
this.getFileNameWithLatestRecordedHashForOutput(output)
return (
await readFileAsync(
this.getFileNameWithLatestRecordedHashForOutput(output)
)
).toString();
} catch (e) {
return null;
}
}

private isAnyOutputMissing(
private async isAnyOutputMissing(
cachedResult: CachedResult,
outputs: string[]
): boolean {
return outputs.some((output) => {
): Promise<boolean> {
for (let output of outputs) {
const cacheOutputPath = join(cachedResult.outputsPath, output);
const rootOutputPath = join(this.root, output);

if (existsSync(cacheOutputPath) && lstatSync(cacheOutputPath).isFile()) {
return (
existsSync(join(cachedResult.outputsPath, output)) &&
!existsSync(join(this.root, output))
);
if (
(await existsAsync(cacheOutputPath)) &&
(await lstatAsync(cacheOutputPath)).isFile()
) {
if (
(await existsAsync(join(cachedResult.outputsPath, output))) &&
!(await existsAsync(join(this.root, output)))
) {
return true;
}
}

const haveDifferentAmountOfFiles =
existsSync(cacheOutputPath) &&
existsSync(rootOutputPath) &&
readdirSync(cacheOutputPath).length !==
readdirSync(rootOutputPath).length;
(await existsAsync(cacheOutputPath)) &&
(await existsAsync(rootOutputPath)) &&
(await readdirAsync(cacheOutputPath)).length !==
(await readdirAsync(rootOutputPath)).length;

return (
(existsSync(cacheOutputPath) && !existsSync(rootOutputPath)) ||
if (
((await existsAsync(cacheOutputPath)) &&
!(await existsAsync(rootOutputPath))) ||
haveDifferentAmountOfFiles
);
});
) {
return true;
}
}
return false;
}

private getFileNameWithLatestRecordedHashForOutput(output: string): string {
Expand All @@ -223,15 +251,18 @@ export class Cache {
);
}

private getFromLocalDir(task: Task) {
private async getFromLocalDir(task: Task) {
const tdCommit = join(this.cachePath, `${task.hash}.commit`);
const td = join(this.cachePath, task.hash);

if (existsSync(tdCommit)) {
const terminalOutput = readFileSync(join(td, 'terminalOutput'), 'utf-8');
if (await existsAsync(tdCommit)) {
const terminalOutput = await readFile(
join(td, 'terminalOutput'),
'utf-8'
);
let code = 0;
try {
code = Number(readFileSync(join(td, 'code'), 'utf-8'));
code = Number(await readFile(join(td, 'code'), 'utf-8'));
} catch (e) {}
return {
terminalOutput,
Expand Down
Loading

0 comments on commit b79072a

Please sign in to comment.