Merge pull request #1617 from gchq/feature/refactor-release-export
Refactor release export to export release artefacts as separate S3 uploads
ARADDCC012 authored Nov 20, 2024
2 parents b02984d + f3426f4 commit ae77bc3
Showing 4 changed files with 58 additions and 54 deletions.
4 changes: 2 additions & 2 deletions backend/src/routes/v2/model/postRequestExport.ts
@@ -10,7 +10,7 @@ import { parse } from '../../../utils/validate.js'

export const postRequestExportToS3Schema = z.object({
params: z.object({
- modelId: z.string(),
+ modelId: z.string().openapi({ example: 'yolo-v4-abcdef' }),
}),
body: z.object({
disclaimerAgreement: z.boolean(),
@@ -19,7 +19,7 @@ export const postRequestExportToS3Schema = z.object({

registerPath({
method: 'post',
- path: '/api/v2/model/:modelId/export/s3',
+ path: '/api/v2/model/{modelId}/export/s3',
tags: ['model', 'mirror'],
description:
'Request for all current model card revisions to be exported to S3 as a Zip file. Can also include releases specified by semver in the body.',
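For reference, a call against this route might look as follows. This is a hedged sketch rather than part of the change: the `modelId` value reuses the openapi example above, `disclaimerAgreement` is the only body field visible in this hunk, and the `semvers` field name is assumed from the description ("releases specified by semver in the body").

```ts
// Hypothetical client call to the export route; base URL, auth handling and
// the `semvers` field name are illustrative assumptions only.
const modelId = 'yolo-v4-abcdef'

const response = await fetch(`/api/v2/model/${modelId}/export/s3`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    disclaimerAgreement: true, // required by postRequestExportToS3Schema
    semvers: ['1.2.3', '1.2.4'], // optional: limit the export to specific releases (assumed field name)
  }),
})

if (!response.ok) {
  throw new Error(`Export request failed with status ${response.status}`)
}
```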
@@ -38,8 +38,8 @@ export const postAccessRequestReviewResponseSchema = z.object({

registerPath({
method: 'post',
- path: '/api/v2/model/{modelId}/access-request/{acessRequestId}/review',
- tags: ['accessRequest', 'review'],
+ path: '/api/v2/model/{modelId}/access-request/{accessRequestId}/review',
+ tags: ['access-request', 'review'],
description: 'Send a review for an access request.',
schema: postAccessRequestReviewResponseSchema,
responses: {
101 changes: 52 additions & 49 deletions backend/src/services/mirroredModel.ts
@@ -59,17 +59,7 @@ export async function exportModel(
const s3Stream = new PassThrough()
zip.pipe(s3Stream)

- if (config.modelMirror.export.kmsSignature.enabled) {
- log.debug({ modelId, semvers }, 'Using signatures. Uploading to temporary S3 location first.')
- uploadToTemporaryS3Location(modelId, semvers, s3Stream).then(() =>
- copyToExportBucketWithSignatures(modelId, semvers, mirroredModelId, user.dn).catch((error) =>
- log.error({ modelId, semvers, error }, 'Failed to upload export to export location with signatures'),
- ),
- )
- } else {
- log.debug({ modelId, semvers }, 'Signatures not enabled. Uploading to export S3 location.')
- uploadToExportS3Location(modelId, semvers, s3Stream, { modelId, mirroredModelId })
- }
+ await uploadToS3(`${modelId}.zip`, s3Stream, user.dn, { modelId, semvers }, { modelId, mirroredModelId })

try {
await addModelCardRevisionsToZip(user, model, zip)
@@ -172,50 +162,68 @@ function parseModelCard(modelCardJson: string, mirroredModelId: string, sourceMo
return { modelCard }
}

+ async function uploadToS3(
+ fileName: string,
+ stream: Readable,
+ exporter: string,
+ logData: Record<string, unknown>,
+ metadata?: Record<string, string>,
+ ) {
+ if (config.modelMirror.export.kmsSignature.enabled) {
+ log.debug(logData, 'Using signatures. Uploading to temporary S3 location first.')
+ uploadToTemporaryS3Location(fileName, stream, logData).then(() =>
+ copyToExportBucketWithSignatures(fileName, exporter, logData, metadata).catch((error) =>
+ log.error({ error, ...logData }, 'Failed to upload export to export location with signatures'),
+ ),
+ )
+ } else {
+ log.debug(logData, 'Signatures not enabled. Uploading to export S3 location.')
+ uploadToExportS3Location(fileName, stream, logData, metadata)
+ }
+ }

async function copyToExportBucketWithSignatures(
- modelId: string,
- semvers: string[] | undefined,
- mirroredModelId: string,
+ fileName: string,
exporter: string,
+ logData: Record<string, unknown>,
+ metadata?: Record<string, string>,
) {
let signatures = {}
- log.debug({ modelId, semvers }, 'Getting stream from S3 to generate signatures.')
- const streamForDigest = await getObjectFromTemporaryS3Location(modelId, semvers)
+ log.debug(logData, 'Getting stream from S3 to generate signatures.')
+ const streamForDigest = await getObjectFromTemporaryS3Location(fileName, logData)
const messageDigest = await generateDigest(streamForDigest)
- log.debug({ modelId, semvers }, 'Generating signatures.')
+ log.debug(logData, 'Generating signatures.')
try {
signatures = await sign(messageDigest)
} catch (e) {
- log.error({ modelId }, 'Error generating signature for export.')
+ log.error(logData, 'Error generating signature for export.')
throw e
}
- log.debug({ modelId, semvers }, 'Successfully generated signatures')
- log.debug({ modelId, semvers }, 'Getting stream from S3 to upload to export location.')
- const streamToCopy = await getObjectFromTemporaryS3Location(modelId, semvers)
- await uploadToExportS3Location(modelId, semvers, streamToCopy, {
- modelId,
- mirroredModelId,
+ log.debug(logData, 'Successfully generated signatures')
+ log.debug(logData, 'Getting stream from S3 to upload to export location.')
+ const streamToCopy = await getObjectFromTemporaryS3Location(fileName, logData)
+ await uploadToExportS3Location(fileName, streamToCopy, logData, {
exporter,
...signatures,
+ ...metadata,
})
}
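`generateDigest` and `sign` are not part of this diff. As context only, a digest over a stream can be computed roughly along these lines; this is a minimal sketch under assumptions (SHA-256, Node streams), not the project's implementation, and the actual signing (e.g. via KMS) happens elsewhere in the codebase.

```ts
import { createHash } from 'crypto'
import { Readable } from 'stream'

// Sketch: hash a readable stream to a hex digest before the digest is signed.
// Assumes SHA-256; the real generateDigest may differ.
async function digestStream(stream: Readable): Promise<string> {
  const hash = createHash('sha256')
  for await (const chunk of stream) {
    hash.update(chunk)
  }
  return hash.digest('hex')
}
```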

async function uploadToTemporaryS3Location(
- modelId: string,
- semvers: string[] | undefined,
+ fileName: string,
stream: Readable,
+ logData: Record<string, unknown>,
metadata?: Record<string, string>,
) {
const bucket = config.s3.buckets.uploads
- const object = `exportQueue/${modelId}.zip`
+ const object = `exportQueue/${fileName}`
try {
await putObjectStream(bucket, object, stream, metadata)
log.debug(
{
bucket,
object,
- modelId,
- semvers,
+ ...logData,
},
'Successfully uploaded export to temporary S3 location.',
)
@@ -224,26 +232,24 @@ async function uploadToTemporaryS3Location(
{
bucket,
object,
- modelId,
- semvers,
error,
+ ...logData,
},
'Failed to export to temporary S3 location.',
)
}
}

- async function getObjectFromTemporaryS3Location(modelId: string, semvers: string[] | undefined) {
+ async function getObjectFromTemporaryS3Location(fileName: string, logData: Record<string, unknown>) {
const bucket = config.s3.buckets.uploads
- const object = `exportQueue/${modelId}.zip`
+ const object = `exportQueue/${fileName}`
try {
const stream = (await getObjectStream(bucket, object)).Body as Readable
log.debug(
{
bucket,
object,
- modelId,
- semvers,
+ ...logData,
},
'Successfully retrieved stream from temporary S3 location.',
)
@@ -253,9 +259,8 @@ async function getObjectFromTemporaryS3Location(modelId: string, semvers: string
{
bucket,
object,
- modelId,
- semvers,
error,
+ ...logData,
},
'Failed to retrieve stream from temporary S3 location.',
)
@@ -264,21 +269,19 @@ async function getObjectFromTemporaryS3Location(modelId: string, semvers: string
}

async function uploadToExportS3Location(
- modelId: string,
- semvers: string[] | undefined,
+ object: string,
stream: Readable,
+ logData: Record<string, unknown>,
metadata?: Record<string, string>,
) {
const bucket = config.modelMirror.export.bucket
- const object = `${modelId}.zip`
try {
await putObjectStream(bucket, object, stream, metadata)
log.debug(
{
bucket,
object,
- modelId,
- semvers,
+ ...logData,
},
'Successfully uploaded export to export S3 location.',
)
@@ -287,9 +290,8 @@ async function uploadToExportS3Location(
{
bucket,
object,
- modelId,
- semvers,
error,
+ ...logData,
},
'Failed to export to export S3 location.',
)
@@ -323,13 +325,14 @@ async function addReleaseToZip(user: UserInterface, model: ModelDoc, release: Re
log.debug('Adding release to zip file of releases.', { user, modelId: model.id, semver: release.semver })
const files: FileInterfaceDoc[] = await getFilesByIds(user, release.modelId, release.fileIds)

- const baseUri = `releases/${release.semver}`
try {
- zip.append(JSON.stringify(release.toJSON()), { name: `${baseUri}/releaseDocument.json` })
+ zip.append(JSON.stringify(release.toJSON()), { name: `releases/${release.semver}.json` })
for (const file of files) {
- zip.append(JSON.stringify(file.toJSON()), { name: `${baseUri}/files/${file._id}/fileDocument.json` })
- zip.append((await downloadFile(user, file._id)).Body as stream.Readable, {
- name: `${baseUri}/files/${file._id}/fileContent`,
+ zip.append(JSON.stringify(file.toJSON()), { name: `files/${file._id}.json` })
+ await uploadToS3(file.path, (await downloadFile(user, file._id)).Body as stream.Readable, user.dn, {
+ modelId: model.id,
+ releaseId: release.id,
+ fileId: file.id,
})
}
} catch (error: any) {
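Taken together, `exportModel` now pipes the zip of model card and release documents into a `PassThrough` stream handed to `uploadToS3`, while each release file's content becomes its own S3 object. Below is a minimal, self-contained sketch of that pattern; it assumes `archiver` and the AWS SDK v3 `Upload` helper (roughly what `putObjectStream` is assumed to wrap), and all bucket and key names are illustrative rather than the project's.

```ts
import archiver from 'archiver'
import { PassThrough, Readable } from 'stream'
import { S3Client } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'

// Stream a body to S3 without buffering it in memory
// (illustrative stand-in for putObjectStream).
async function putStream(client: S3Client, bucket: string, key: string, body: Readable) {
  await new Upload({ client, params: { Bucket: bucket, Key: key, Body: body } }).done()
}

// Sketch of the refactored export shape: documents go into a zip that is
// uploaded while it is still being written; file contents are uploaded as
// separate objects instead of being appended to the zip.
async function exportSketch(client: S3Client, releaseDoc: object, files: { id: string; content: Readable }[]) {
  const zip = archiver('zip')
  const zipStream = new PassThrough()
  zip.pipe(zipStream)

  // Start the zip upload concurrently; finalize() ends the stream later.
  const zipUpload = putStream(client, 'export-bucket', 'model-id.zip', zipStream)

  zip.append(JSON.stringify(releaseDoc), { name: 'releases/1.2.3.json' })

  for (const file of files) {
    // One S3 object per release artefact, alongside the zip of documents.
    await putStream(client, 'export-bucket', `files/${file.id}`, file.content)
  }

  await zip.finalize()
  await zipUpload
}
```

One apparent upside of this split is that individual artefacts remain addressable in S3 without unpacking an archive, though that reading is an inference from the diff rather than a stated goal.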
3 changes: 2 additions & 1 deletion backend/test/services/mirroredModel.spec.ts
@@ -216,6 +216,7 @@ describe('services > mirroredModel', () => {

test('exportModel > successful export if no files exist', async () => {
releaseMocks.getAllFileIds.mockResolvedValueOnce([])
+ fileMocks.getFilesByIds.mockResolvedValueOnce([])
await exportModel({} as UserInterface, 'modelId', true, ['1.2.3', '1.2.4'])
// Allow for completion of asynchronous content
await new Promise((r) => setTimeout(r))
@@ -314,7 +315,7 @@ describe('services > mirroredModel', () => {
test('exportModel > export uploaded to S3 for model cards and releases', async () => {
await exportModel({} as UserInterface, 'modelId', true, ['1.2.3', '3.2.1'])

- expect(s3Mocks.putObjectStream).toBeCalledTimes(2)
+ expect(s3Mocks.putObjectStream).toBeCalledTimes(3)
})

test('exportModel > unable to upload to tmp S3 location', async () => {
