feat(lambda-tiler): Import api for import imagery jobs. (#2170)
* Import api for import imagery jobs.

* Hash whole JobCreationContext as id.

* Add values for new env variables.

* Simplify the find imagery process.

* Unit test for import imagery api.

* Update test.

* Some refinements.

* Fix typo and remove cache tag.

* Insert config job item into dynamodb.

* Remove unused import
Wentao-Kuang authored May 10, 2022
1 parent 79e8845 commit 76b6175
Showing 8 changed files with 314 additions and 0 deletions.
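At a glance, the new route is registered as `import` on the tiler router and reads two query parameters: `path` (an `s3://` prefix containing the imagery) and `p` (the target projection). Below is a minimal sketch of calling it; the host is the dev `PublicUrlBase` from the infra config, and the exact route prefix is an assumption, since the handler's doc comment shows `/v1/import` while the unit test requests `/v1/tiles/import`.

// Hypothetical client call; host and route prefix are assumptions.
const url = new URL('https://dev.basemaps.linz.govt.nz/v1/import');
url.searchParams.set('path', 's3://linz-imagery-staging/2022-03/wellington_rural_2022_delivery_1');
url.searchParams.set('p', '2193'); // NZTM2000; omitting `p` also defaults to NZTM2000

const res = await fetch(url); // Node 18+ global fetch
const job = (await res.json()) as { id: string; name: string; status: string };
console.log(job.status); // 'processing' on first call, 'complete' once the job has finished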
10 changes: 10 additions & 0 deletions packages/_infra/src/config.ts
@@ -15,6 +15,12 @@ export interface BaseMapsConfig {

/** Public URL base that tiles are served from */
PublicUrlBase: string;

/** AWS role config bucket */
AwsRoleConfigBucket: string;

/** ImportImageryBucket */
ImportImageryBucket: string;
}

export const BaseMapsProdConfig: BaseMapsConfig = {
@@ -23,6 +29,8 @@ export const BaseMapsProdConfig: BaseMapsConfig = {
AlbPublicDns: 'int.tiles.basemaps.linz.govt.nz',
CloudFrontDns: ['basemaps.linz.govt.nz', 'tiles.basemaps.linz.govt.nz'],
PublicUrlBase: 'https://basemaps.linz.govt.nz',
AwsRoleConfigBucket: 'linz-bucket-config',
ImportImageryBucket: 'linz-basemaps-cache',
};

export const BaseMapsDevConfig: BaseMapsConfig = {
@@ -31,6 +39,8 @@ export const BaseMapsDevConfig: BaseMapsConfig = {
AlbPublicDns: 'dev.int.tiles.basemaps.linz.govt.nz',
CloudFrontDns: ['dev.basemaps.linz.govt.nz', 'tiles.dev.basemaps.linz.govt.nz'],
PublicUrlBase: 'https://dev.basemaps.linz.govt.nz',
AwsRoleConfigBucket: 'linz-bucket-config',
ImportImageryBucket: 'basemaps-cog-test',
};

export function getConfig(): BaseMapsConfig {
1 change: 1 addition & 0 deletions packages/lambda-tiler/package.json
@@ -22,6 +22,7 @@
"types": "./build/index.d.ts",
"license": "MIT",
"dependencies": {
"@basemaps/cli": "^6.24.2",
"@basemaps/config": "^6.24.2",
"@basemaps/geo": "^6.24.2",
"@basemaps/lambda": "^6.7.0",
140 changes: 140 additions & 0 deletions packages/lambda-tiler/src/__test__/tile.import.test.ts
@@ -0,0 +1,140 @@
import { JobCreationContext } from '@basemaps/cli/build/cog/cog.stac.job';
import { Nztm2000Tms } from '@basemaps/geo';
import { Config, Env, fsa, LogConfig } from '@basemaps/shared';
import o from 'ospec';
import { createHash } from 'crypto';
import sinon from 'sinon';
import { LambdaAlbRequest, LambdaHttpRequest } from '@linzjs/lambda';
import { Context } from 'aws-lambda';
import { Import } from '../routes/import.js';
import { RoleConfig } from '../import/imagery.find.js';
import { CogJobFactory } from '@basemaps/cli';
import { ConfigProcessingJob } from '@basemaps/config';

o.spec('Import', () => {
const sandbox = sinon.createSandbox();
const outputBucket = 'testOutputBucket';
const configBucket = 'testConfigBucket';
const origConfigBucket = process.env[Env.AwsRoleConfigBucket];
const origOutputBucket = process.env[Env.ImportImageryBucket];
o.beforeEach(() => {
process.env[Env.AwsRoleConfigBucket] = configBucket;
process.env[Env.ImportImageryBucket] = outputBucket;
});

o.afterEach(() => {
process.env[Env.AwsRoleConfigBucket] = origConfigBucket;
process.env[Env.ImportImageryBucket] = origOutputBucket;
sandbox.restore();
});

const tileMatrix = Nztm2000Tms;
const bucket = 'testSourceBucket';
const path = `s3://${bucket}/imagery/`;
const role: RoleConfig = {
bucket,
accountId: '123456',
roleArn: 'arn:aws:iam::123456:role/read-role',
};

const files = [`${path}/1.tiff`, `${path}/2.tiff`];
async function* listFiles(): AsyncGenerator<string, any, unknown> {
for (const file of files) yield file;
}

const ctx: JobCreationContext = {
override: {
projection: tileMatrix.projection,
resampling: {
warp: 'bilinear',
overview: 'lanczos',
},
},
outputLocation: { type: 's3' as const, path: `s3://${outputBucket}` },
sourceLocation: { type: 's3', path: path, ...role, files: files },
batch: true,
tileMatrix,
oneCogCovering: false,
};

const id = createHash('sha256').update(JSON.stringify(ctx)).digest('base64');
const jobId = Config.ProcessingJob.id(id);

function getRequest(path: string, projection: string): LambdaHttpRequest {
return new LambdaAlbRequest(
{
requestContext: null as any,
httpMethod: 'get',
path: '/v1/tiles/import',
body: null,
isBase64Encoded: false,
queryStringParameters: { path: path, p: projection },
},
{} as Context,
LogConfig.get(),
);
}

o('should return projection not found', async () => {
// Given ... wrong projection
const req = getRequest(path, '0000');

// When ... Then ...
const res = await Import(req);
o(res.body).equals('{"status":404,"message":"Target projection Not found"}');
});

o('should return Invalid s3 location', async () => {
// Given... wrong s3 path
const req = getRequest('s3::testbucket/', '2193');

// When ...Then ...
const res = await Import(req);
o(res.body).equals('{"status":500,"message":"Invalid s3 path"}');
});

o('should return Unable to access bucket', async () => {
// Given... a bucket that has no access role
sandbox.stub(fsa, 'readJson').resolves({ buckets: [role] });
const req = getRequest(`s3://wrong-bucket/imagery/`, '2193');

// When ...Then ...
const res = await Import(req);
o(res.body).equals('{"status":500,"message":"Unable to Access the bucket"}');
});

o('should return Imagery not found', async () => {
// Given... no imagery found in the bucket
sandbox.stub(fsa, 'readJson').resolves({ buckets: [role] });
sandbox.stub(fsa, 'list').callsFake(async function* () {
yield `${path}1.json`;
});

const req = getRequest(path, '2193');

// When ...Then ...
const res = await Import(req);
o(res.body).equals('{"status":404,"message":"Imagery Not Found"}');
});

o('should return 200 with existing import', async () => {
// Given... an existing processing job for the same imagery
sandbox.stub(fsa, 'readJson').resolves({ buckets: [role] });
sandbox.stub(fsa, 'list').callsFake(listFiles);
sandbox.stub(CogJobFactory, 'create').resolves(undefined);

const jobConfig = {
id: jobId,
name: path,
status: 'complete',
} as ConfigProcessingJob;
sandbox.stub(Config.ProcessingJob, 'get').resolves(jobConfig);
const req = getRequest(path, '2193');

// When ...Then ...
const res = await Import(req);
o(res.status).equals(200);
const body = Buffer.from(res.body ?? '', 'base64').toString();
o(JSON.parse(body)).deepEquals(jobConfig);
});
});
60 changes: 60 additions & 0 deletions packages/lambda-tiler/src/import/imagery.find.ts
@@ -0,0 +1,60 @@
import { AwsCredentials } from '@chunkd/source-aws-v2';
import { fsa } from '@chunkd/fs';
import { Env } from '@basemaps/shared';

export interface RoleConfig {
bucket: string;
accountId: string;
roleArn: string;
externalId?: string;
roleSessionDuration?: number;
}

interface BucketConfig {
v: number;
buckets: RoleConfig[];
version: string;
package: string;
hash: string;
updatedAt: string;
}

export class RoleRegister {
/** Get all imagery source aws roles */
static async _loadRoles(): Promise<RoleConfig[]> {
const configBucket = Env.get(Env.AwsRoleConfigBucket);
if (configBucket == null) return [];
const configPath = `s3://${configBucket}/config.json`;
const config: BucketConfig = await fsa.readJson(configPath);
const roles = [];
for (const role of config.buckets) {
fsa.register(
's3://' + role.bucket,
AwsCredentials.fsFromRole(role.roleArn, role.externalId, role.roleSessionDuration),
);
roles.push(role);
}
return roles;
}

static _loadRolesPromise: Promise<RoleConfig[]> | undefined;
static loadRoles(): Promise<RoleConfig[]> {
if (RoleRegister._loadRolesPromise == null) RoleRegister._loadRolesPromise = this._loadRoles();
return RoleRegister._loadRolesPromise;
}

static async findRole(path: string): Promise<RoleConfig | undefined> {
const roles = await this.loadRoles();
return roles.find((f) => path.startsWith(`s3://${f.bucket}`));
}
}

/** Search for the imagery across all of our buckets */
export async function findImagery(path: string): Promise<string[]> {
const files: string[] = [];
for await (const key of fsa.list(path)) {
const searchKey = key.toLowerCase();
if (searchKey.endsWith('.tif') || searchKey.endsWith('.tiff')) files.push(key);
}
return files;
}
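Taken together, RoleRegister and findImagery are what the route handler uses to turn a user-supplied path into a readable file list. A minimal usage sketch follows; the imagery prefix is made up, and any bucket listed in the role config file would work.

import { findImagery, RoleRegister } from './imagery.find.js';

// Hypothetical imagery prefix.
const path = 's3://linz-imagery-staging/2022-03/some_survey/';

// Roles are loaded once from s3://$ROLE_CONFIG_BUCKET/config.json and each
// bucket is registered with fsa using an assumed-role file system.
const role = await RoleRegister.findRole(path);
if (role == null) throw new Error('No configured role grants access to this bucket');

// List the prefix and keep only .tif / .tiff objects.
const files = await findImagery(path);
console.log(`${files.length} tiffs readable via ${role.roleArn}`);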
29 changes: 29 additions & 0 deletions packages/lambda-tiler/src/import/make.cog.ts
@@ -0,0 +1,29 @@
import { JobCreationContext } from '@basemaps/cli/build/cog/cog.stac.job';
import { TileMatrixSet } from '@basemaps/geo';
import { Env } from '@basemaps/shared';
import { RoleConfig } from './imagery.find.js';

export async function getJobCreationContext(
path: string,
tileMatrix: TileMatrixSet,
role: RoleConfig,
files: string[],
): Promise<JobCreationContext> {
const bucket = Env.get(Env.ImportImageryBucket);
if (bucket == null) throw new Error('Output AWS s3 bucket Not Found.');
const ctx: JobCreationContext = {
override: {
projection: tileMatrix.projection,
resampling: {
warp: 'bilinear',
overview: 'lanczos',
},
},
outputLocation: { type: 's3' as const, path: `s3://${bucket}` },
sourceLocation: { type: 's3', path, ...role, files: files },
batch: true,
tileMatrix,
oneCogCovering: false,
};
return ctx;
}
2 changes: 2 additions & 0 deletions packages/lambda-tiler/src/index.ts
@@ -8,6 +8,7 @@ import { createHash } from 'crypto';
import { Imagery } from './routes/imagery.js';
import { Esri } from './routes/esri/rest.js';
import { St } from './source.tracer.js';
import { Import } from './routes/import.js';

const app = new Router();

@@ -17,6 +18,7 @@ app.get('version', Version);
app.get('tiles', Tiles);
app.get('imagery', Imagery);
app.get('esri', Esri);
app.get('import', Import);

let slowTimer: NodeJS.Timer | null = null;
export async function handleRequest(req: LambdaHttpRequest): Promise<LambdaHttpResponse> {
66 changes: 66 additions & 0 deletions packages/lambda-tiler/src/routes/import.ts
@@ -0,0 +1,66 @@
import { HttpHeader, LambdaHttpRequest, LambdaHttpResponse } from '@linzjs/lambda';
import { Config, Const, fsa } from '@basemaps/shared';
import { createHash } from 'crypto';
import { findImagery, RoleRegister } from '../import/imagery.find.js';
import { Nztm2000Tms, TileMatrixSets } from '@basemaps/geo';
import { getJobCreationContext } from '../import/make.cog.js';
import { ConfigProcessingJob, ConfigProviderDynamo } from '@basemaps/config';
import { CogJobFactory } from '@basemaps/cli';

/**
* Trigger an imagery import job via this endpoint
*
* @example
* - /v1/import?path=s3://linz-imagery-staging/2022-03/wellington_rural_2022_delivery_1
*/
export async function Import(req: LambdaHttpRequest): Promise<LambdaHttpResponse> {
const path = req.query.get('path');
const projection = req.query.get('p');

// Parse the target projection, defaulting to NZTM2000
let targetTms = Nztm2000Tms;
if (projection != null) {
const tileMatrix = TileMatrixSets.find(projection);
if (tileMatrix == null) return new LambdaHttpResponse(404, 'Target projection Not found');
targetTms = tileMatrix;
}

// Find the imagery from s3
if (path == null || !path.startsWith('s3://')) return new LambdaHttpResponse(500, 'Invalid s3 path');
const role = await RoleRegister.findRole(path);
if (role == null) return new LambdaHttpResponse(500, 'Unable to Access the bucket');
const files = await findImagery(path);
if (files.length === 0) return new LambdaHttpResponse(404, 'Imagery Not Found');

// Prepare Cog jobs
const ctx = await getJobCreationContext(path, targetTms, role, files);

const id = createHash('sha256').update(JSON.stringify(ctx)).digest('base64');
const jobId = Config.ProcessingJob.id(id);
let jobConfig = await Config.ProcessingJob.get(jobId);
if (jobConfig == null) {
// Add id back to JobCreationContext
ctx.override!.id = id;
ctx.outputLocation.path = fsa.join(ctx.outputLocation.path, id);

// Start processing job
await CogJobFactory.create(ctx);
jobConfig = {
id: jobId,
name: path,
status: 'processing',
} as ConfigProcessingJob;

const config = new ConfigProviderDynamo(Const.TileMetadata.TableName);
await config.ProcessingJob.put(jobConfig);
}

const json = JSON.stringify(jobConfig);
const data = Buffer.from(json);

const response = new LambdaHttpResponse(200, 'ok');
response.header(HttpHeader.CacheControl, 'no-store');
response.buffer(data, 'application/json');
req.set('bytes', data.byteLength);
return response;
}
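The handler therefore behaves as an idempotent "create or fetch": the same source path, projection and file list hash to the same job id, so repeated calls return the existing DynamoDB record instead of submitting another batch job. Below is a hedged sketch of a caller polling until completion; the status transition to 'complete' happens outside this commit, so the polling semantics are an assumption.

// Hypothetical polling helper; the response body is the ConfigProcessingJob
// record ({ id, name, status }) that the handler writes to DynamoDB.
async function waitForImport(url: string, intervalMs = 30_000): Promise<void> {
  for (;;) {
    const res = await fetch(url);
    const job = (await res.json()) as { id: string; name: string; status: string };
    console.log(job.id, job.status); // 'processing' until the batch job finishes
    if (job.status === 'complete') return;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}

// Example (hypothetical URL):
// await waitForImport('https://dev.basemaps.linz.govt.nz/v1/import?path=s3://linz-imagery-staging/2022-03/wellington_rural_2022_delivery_1&p=2193');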
6 changes: 6 additions & 0 deletions packages/shared/src/const.ts
@@ -30,6 +30,12 @@ export const Env = {
/** Number of hours to assume a role for, @default 8 */
AwsRoleDurationHours: 'AWS_ROLE_DURATION_HOURS',

/** AWS role config bucket */
AwsRoleConfigBucket: 'ROLE_CONFIG_BUCKET',

/** Import Imagery bucket */
ImportImageryBucket: 'IMPORT_IMAGERY_BUCKET',

Gdal: {
/** Should the gdal docker container be used? */
UseDocker: 'GDAL_DOCKER',
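Both variables are read lazily via Env.get, so the lambda only needs them set in its environment; the values come from the infra config at the top of this commit. A minimal sketch of wiring them up locally, mirroring what the unit test does:

import { Env } from '@basemaps/shared';

// Local/dev wiring; in deployment these are the dev bucket names from
// packages/_infra/src/config.ts.
process.env[Env.AwsRoleConfigBucket] = 'linz-bucket-config'; // ROLE_CONFIG_BUCKET
process.env[Env.ImportImageryBucket] = 'basemaps-cog-test'; // IMPORT_IMAGERY_BUCKET

// Later, e.g. inside RoleRegister._loadRoles() and getJobCreationContext():
const configBucket = Env.get(Env.AwsRoleConfigBucket); // 'linz-bucket-config'
const outputBucket = Env.get(Env.ImportImageryBucket); // 'basemaps-cog-test'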
