Skip to content

Commit

Permalink
feat: if output files already exist do not overwrite them.
Browse files Browse the repository at this point in the history
Instead continue with the missing files
  • Loading branch information
blacha committed Jan 27, 2020
1 parent f1b4d21 commit ab1b861
Showing 1 changed file with 82 additions and 19 deletions.
101 changes: 82 additions & 19 deletions packages/cog/src/cli/actions/action.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import * as aws from 'aws-sdk';
import * as ulid from 'ulid';
import { CogJob } from '../../cog';
import { FileOperator } from '../../file/file';
/** Name of the AWS Batch job queue passed as `jobQueue` when submitting jobs. */
const JobQueue = 'CogBatchJobQueue';
/** Name of the AWS Batch job definition passed as `jobDefinition` when submitting jobs. */
const JobDefinition = 'CogBatchJob';

export class ActionBatchJob extends CommandLineAction {
private job?: CommandLineStringParameter;
Expand All @@ -19,6 +21,48 @@ export class ActionBatchJob extends CommandLineAction {
});
}

/**
 * Submit one AWS Batch job that processes a single quadkey of a COG job.
 *
 * When `isCommit` is false, or the `--job` parameter has no value, nothing is
 * submitted and the returned `jobId` is the empty string.
 *
 * @param job COG job; its `name` is used to build the batch job name
 * @param batch AWS Batch client used to submit the job
 * @param quadKey quadkey forwarded to the container via `--quadkey`
 * @param isCommit when false only the job name is computed (dry run)
 * @returns the batch job name together with the id reported by `submitJob`
 */
async batchOne(
  job: CogJob,
  batch: AWS.Batch,
  quadKey: string,
  isCommit: boolean,
): Promise<{ jobName: string; jobId: string }> {
  const jobName = `Cog-${job.name}-${quadKey}`;

  // Only look at the job parameter when committing, mirroring a short-circuit check.
  const jobPath = isCommit ? this.job?.value : undefined;
  if (jobPath == null) {
    return { jobName, jobId: '' };
  }

  const command = ['-V', 'cog', '--job', jobPath, '--commit', '--quadkey', quadKey];
  const request = {
    jobName,
    jobQueue: JobQueue,
    jobDefinition: JobDefinition,
    containerOverrides: { command },
  };

  const { jobId } = await batch.submitJob(request).promise();
  return { jobName, jobId };
}
/**
 * Submit every quadkey of a COG job to AWS Batch in one submission.
 *
 * Normally this is an array job with one child per quadkey. AWS Batch only
 * accepts array sizes of two or more, so a job with exactly one quadkey is
 * submitted as a plain (non-array) job that names its quadkey explicitly with
 * `--quadkey`, the same command form used for single quadkey submissions.
 *
 * When `isCommit` is false, or the `--job` parameter has no value, nothing is
 * submitted and the returned `jobId` is the empty string.
 *
 * @param job COG job; `name` builds the batch job name, `quadkeys` sizes the array
 * @param batch AWS Batch client used to submit the job
 * @param isCommit when false only the job name is computed (dry run)
 * @returns the batch job name together with the id reported by `submitJob`
 */
async batchAll(job: CogJob, batch: AWS.Batch, isCommit: boolean): Promise<{ jobName: string; jobId: string }> {
  const jobName = `Cog-${job.name}`;
  if (!isCommit || this.job?.value == null) {
    return { jobName, jobId: '' };
  }

  const command = ['-V', 'cog', '--job', this.job.value, '--commit'];
  const quadKeyCount = job.quadkeys.length;

  // An array job of size 1 is rejected by AWS Batch; run the lone quadkey directly instead.
  const isSingle = quadKeyCount === 1;
  if (isSingle) {
    command.push('--quadkey', job.quadkeys[0]);
  }

  const batchJob = await batch
    .submitJob({
      jobName,
      jobQueue: JobQueue,
      jobDefinition: JobDefinition,
      ...(isSingle ? {} : { arrayProperties: { size: quadKeyCount } }),
      containerOverrides: { command },
    })
    .promise();
  return { jobName, jobId: batchJob.jobId };
}

async onExecute(): Promise<void> {
if (this.job?.value == null) {
throw new Error('Failed to read parameters');
Expand All @@ -32,30 +76,49 @@ export class ActionBatchJob extends CommandLineAction {

const isCommit = this.commit?.value ?? false;

const batch = new aws.Batch({ region });
const jobName = `Cog-${job.name}`;
const jobQueue = 'CogBatchJobQueue';
const jobDefinition = 'CogBatchJob';
logger.info({ jobs: job.quadkeys.length, jobName, jobQueue, jobDefinition }, 'JobSubmit');
const outputFs = FileOperator.create(job.output);

let isPartial = false;
let todoCount = job.quadkeys.length;
const stats = await Promise.all(
job.quadkeys.map(async quadKey => {
const targetPath = FileOperator.join(job.output.path, `${job.id}/${quadKey}.tiff`);
const exists = await outputFs.exists(targetPath);
if (exists) {
logger.info({ targetPath }, 'FileExists');
isPartial = true;
todoCount--;
}
return { quadKey, exists };
}),
);

logger.info(
{
jobTotal: job.quadkeys.length,
jobLeft: todoCount,
jobQueue: JobQueue,
jobDefinition: JobDefinition,
isPartial,
},
'JobSubmit',
);

const batch = new aws.Batch({ region });
if (isPartial) {
const toSubmit = stats.filter(f => f.exists == false).map(c => c.quadKey);
for (const quadKey of toSubmit) {
const jobStatus = await this.batchOne(job, batch, quadKey, isCommit);
logger.info(jobStatus, 'JobSubmitted');
}
} else {
const jobStatus = await this.batchAll(job, batch, isCommit);
logger.info(jobStatus, 'JobSubmitted');
}
if (!isCommit) {
logger.warn('DryRun:Done');
return;
}
// TODO: these names are taken from the deployment script; maybe they should be looked up
const batchJob = await batch
.submitJob({
jobName,
jobQueue,
jobDefinition,
arrayProperties: { size: job.quadkeys.length },
containerOverrides: {
command: ['-V', 'cog', '--job', this.job.value, '--commit'],
},
})
.promise();

logger.info({ batch: batchJob }, 'JobSubmitted');
}

protected onDefineParameters(): void {
Expand Down

0 comments on commit ab1b861

Please sign in to comment.