diff --git a/src/nni_manager/main.ts b/src/nni_manager/main.ts index 9831d35238..1944cc9956 100644 --- a/src/nni_manager/main.ts +++ b/src/nni_manager/main.ts @@ -36,8 +36,8 @@ import { LocalTrainingServiceForGPU } from './training_service/local/localTraini import { RemoteMachineTrainingService } from './training_service/remote_machine/remoteMachineTrainingService'; -import { PAITrainingService } from './training_service/pai/paiTrainingService' - +import { PAITrainingService } from './training_service/pai/paiTrainingService'; +import { KubeflowTrainingService } from './training_service/kubeflow/kubeflowTrainingService'; function initStartupInfo(startExpMode: string, resumeExperimentId: string, basePort: number) { const createNew: boolean = (startExpMode === 'new'); @@ -52,6 +52,8 @@ async function initContainer(platformMode: string): Promise { Container.bind(TrainingService).to(RemoteMachineTrainingService).scope(Scope.Singleton); } else if (platformMode === 'pai') { Container.bind(TrainingService).to(PAITrainingService).scope(Scope.Singleton); + } else if (platformMode === 'kubeflow') { + Container.bind(TrainingService).to(KubeflowTrainingService).scope(Scope.Singleton); } else { throw new Error(`Error: unsupported mode: ${mode}`); } @@ -76,19 +78,22 @@ if (!strPort || strPort.length === 0) { const port: number = parseInt(strPort, 10); const mode: string = parseArg(['--mode', '-m']); -if (!['local', 'remote', 'pai'].includes(mode)) { +if (!['local', 'remote', 'pai', 'kubeflow'].includes(mode)) { + console.log(`FATAL: unknown mode: ${mode}`); usage(); process.exit(1); } const startMode: string = parseArg(['--start_mode', '-s']); if (!['new', 'resume'].includes(startMode)) { + console.log(`FATAL: unknown start_mode: ${startMode}`); usage(); process.exit(1); } const experimentId: string = parseArg(['--experiment_id', '-id']); if (startMode === 'resume' && experimentId.trim().length < 1) { + console.log(`FATAL: cannot resume experiment, invalid experiment_id: ${experimentId}`); usage(); process.exit(1); } diff --git a/src/nni_manager/package.json b/src/nni_manager/package.json index 04ee4df3c2..78e06d4ebf 100644 --- a/src/nni_manager/package.json +++ b/src/nni_manager/package.json @@ -15,6 +15,7 @@ "express": "^4.16.3", "express-joi-validator": "^2.0.0", "node-nvidia-smi": "^1.0.0", + "node-yaml": "^3.1.1", "rx": "^4.1.0", "sqlite3": "^4.0.2", "ssh2": "^0.6.1", diff --git a/src/nni_manager/rest_server/restHandler.ts b/src/nni_manager/rest_server/restHandler.ts index 8e3d6cdafc..a23788eca0 100644 --- a/src/nni_manager/rest_server/restHandler.ts +++ b/src/nni_manager/rest_server/restHandler.ts @@ -89,9 +89,7 @@ class NNIRestHandler { return router; } - private handle_error(err: Error, res: Response): void { - this.log.info(err); - + private handle_error(err: Error, res: Response, isFatal: boolean = false): void { if (err instanceof NNIError && err.name === NNIErrorNames.NOT_FOUND) { res.status(404); } else { @@ -100,6 +98,14 @@ class NNIRestHandler { res.send({ error: err.message }); + + // If it's a fatal error, exit process + if(isFatal) { + this.log.critical(err); + process.exit(1); + } + + this.log.error(err); } // TODO add validators for request params, query, body @@ -145,12 +151,14 @@ class NNIRestHandler { experiment_id: eid }); }).catch((err: Error) => { + // Start experiment is a step of initialization, so any exception thrown is a fatal this.handle_error(err, res); }); } else { this.nniManager.resumeExperiment().then(() => { res.send(); }).catch((err: Error) => { + // Resume experiment is a step of initialization, so any exception thrown is a fatal this.handle_error(err, res); }); } @@ -180,7 +188,8 @@ class NNIRestHandler { } res.send(); } catch (err) { - this.handle_error(err, res); + // setClusterMetata is a step of initialization, so any exception thrown is a fatal + this.handle_error(err, res, true); } }); } diff --git a/src/nni_manager/rest_server/restValidationSchemas.ts b/src/nni_manager/rest_server/restValidationSchemas.ts index 32e8ef5215..d727e9d13e 100644 --- a/src/nni_manager/rest_server/restValidationSchemas.ts +++ b/src/nni_manager/rest_server/restValidationSchemas.ts @@ -46,6 +46,14 @@ export namespace ValidationSchemas { userName: joi.string().min(1).required(), passWord: joi.string().min(1).required(), host: joi.string().min(1).required() + }), + kubeflow_config: joi.object({ + operator: joi.string().min(1).required(), + nfs: joi.object({ + server: joi.string().min(1).required(), + path: joi.string().min(1).required() + }).required(), + kubernetesServer: joi.string().min(1).required() }) } }; diff --git a/src/nni_manager/training_service/common/clusterJobRestServer.ts b/src/nni_manager/training_service/common/clusterJobRestServer.ts new file mode 100644 index 0000000000..057d57ae9e --- /dev/null +++ b/src/nni_manager/training_service/common/clusterJobRestServer.ts @@ -0,0 +1,96 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. + * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +'use strict'; + +import * as assert from 'assert'; +import { Request, Response, Router } from 'express'; +import * as bodyParser from 'body-parser'; +import * as component from '../../common/component'; +import { getBasePort, getExperimentId } from '../../common/experimentStartupInfo'; +import { RestServer } from '../../common/restServer' + +/** + * Cluster Job Training service Rest server, provides rest API to support Cluster job metrics update + * + */ +@component.Singleton +export abstract class ClusterJobRestServer extends RestServer{ + private readonly API_ROOT_URL: string = '/api/v1/nni-pai'; + + private readonly expId: string = getExperimentId(); + + /** + * constructor to provide NNIRestServer's own rest property, e.g. port + */ + constructor() { + super(); + const basePort: number = getBasePort(); + assert(basePort && basePort > 1024); + + this.port = basePort + 1; + } + + public get clusterRestServerPort(): number { + if(!this.port) { + throw new Error('PAI Rest server port is undefined'); + } + return this.port; + } + + /** + * NNIRestServer's own router registration + */ + protected registerRestHandler(): void { + this.app.use(bodyParser.json()); + this.app.use(this.API_ROOT_URL, this.createRestHandler()); + } + + private createRestHandler() : Router { + const router: Router = Router(); + + // tslint:disable-next-line:typedef + router.use((req: Request, res: Response, next) => { + this.log.info(`${req.method}: ${req.url}: body:\n${JSON.stringify(req.body, undefined, 4)}`); + res.setHeader('Content-Type', 'application/json'); + next(); + }); + + router.post(`/update-metrics/${this.expId}/:trialId`, (req: Request, res: Response) => { + try { + this.log.info(`Get update-metrics request, trial job id is ${req.params.trialId}`); + this.log.info(`update-metrics body is ${JSON.stringify(req.body)}`); + + this.handleTrialMetrics(req.body.jobId, req.body.metrics); + + res.send(); + } + catch(err) { + this.log.error(`json parse metrics error: ${err}`); + res.status(500); + res.send(err.message); + } + }); + + return router; + } + + /** Abstract method to handle trial metrics data */ + protected abstract handleTrialMetrics(jobId : string, trialMetrics : any[]) : void; +} \ No newline at end of file diff --git a/src/nni_manager/training_service/common/containerJobData.ts b/src/nni_manager/training_service/common/containerJobData.ts new file mode 100644 index 0000000000..1a96fb88e1 --- /dev/null +++ b/src/nni_manager/training_service/common/containerJobData.ts @@ -0,0 +1,30 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. + * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +'use strict'; + +export const CONTAINER_INSTALL_NNI_SHELL_FORMAT: string = +`#!/bin/bash +if python3 -c 'import nni' > /dev/null 2>&1; then + # nni module is already installed, skip + return +else + # Install nni + python3 -m pip install --user --upgrade nni +fi`; \ No newline at end of file diff --git a/src/nni_manager/training_service/common/trialConfigMetadataKey.ts b/src/nni_manager/training_service/common/trialConfigMetadataKey.ts index 334a9604d6..81169c986e 100644 --- a/src/nni_manager/training_service/common/trialConfigMetadataKey.ts +++ b/src/nni_manager/training_service/common/trialConfigMetadataKey.ts @@ -28,5 +28,6 @@ export enum TrialConfigMetadataKey { EXPERIMENT_ID = 'experimentId', MULTI_PHASE = 'multiPhase', RANDOM_SCHEDULER = 'random_scheduler', - PAI_CLUSTER_CONFIG = 'pai_config' + PAI_CLUSTER_CONFIG = 'pai_config', + KUBEFLOW_CLUSTER_CONFIG = 'kubeflow_config' } diff --git a/src/nni_manager/training_service/kubeflow/kubeflowConfig.ts b/src/nni_manager/training_service/kubeflow/kubeflowConfig.ts new file mode 100644 index 0000000000..dca552ee11 --- /dev/null +++ b/src/nni_manager/training_service/kubeflow/kubeflowConfig.ts @@ -0,0 +1,93 @@ +import { TrialConfig } from "../common/trialConfig"; + +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. + * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +'use strict'; + + +/** operator types that kubeflow supported */ +export type KubeflowOperator = 'tf-operator' | 'pytorch-operator' | 'mxnet-operator' | 'caffe2-operator' | 'chainer-operator' | 'mpi-operator'; +export type KubeflowOperatorPlural = 'tfjobs' | 'pytorchjobs' | 'mxjobs' | 'caffe2jobs' | 'chainerjobs' | 'mpijobs'; + +/** + * map from Kubeflow operator name to its plural name in K8S + */ +export const kubeflowOperatorMap : Map = new Map([ + ['tf-operator' , 'tfjobs'], + ['pytorch-operator', 'pytorchjobs'], + ['mxnet-operator', 'mxjobs'], + ['caffe2-operator', 'caffe2jobs'], + ['chainer-operator', 'chainerjobs'], + ['mpi-operator', 'mpijobs'] +]); + +/** + * Kuberflow cluster configuration + * + */ +export class KubeflowClusterConfig { + /** Name of Kubeflow operator, like tf-operator */ + public readonly operator: KubeflowOperator; + public readonly nfs: NFSConfig; + public readonly kubernetesServer: string; + + /** + * Constructor + * @param userName User name of Kubeflow Cluster + * @param passWord password of Kubeflow Cluster + * @param host Host IP of Kubeflow Cluster + */ + constructor(operator: KubeflowOperator, nfs : NFSConfig, kubernetesServer : string) { + this.operator = operator; + this.nfs = nfs; + this.kubernetesServer = kubernetesServer; + } +} + +/** + * NFS configuration to store Kubeflow job related files + */ +export class NFSConfig { + /** IP Adress of NFS server */ + public readonly server : string; + /** exported NFS path on NFS server */ + public readonly path : string; + + constructor(server : string, path : string) { + this.server = server; + this.path = path; + } +} + +/** + * Trial job configuration for Kubeflow + */ +export class KubeflowTrialConfig extends TrialConfig { + public readonly cpuNum: number; + public readonly memoryMB: number; + public readonly image: string; + + constructor(command : string, codeDir : string, gpuNum : number, cpuNum: number, memoryMB: number, image: string) { + super(command, codeDir, gpuNum); + this.cpuNum = cpuNum; + this.memoryMB = memoryMB; + this.image = image; + } +} \ No newline at end of file diff --git a/src/nni_manager/training_service/kubeflow/kubeflowData.ts b/src/nni_manager/training_service/kubeflow/kubeflowData.ts new file mode 100644 index 0000000000..f65d0cb603 --- /dev/null +++ b/src/nni_manager/training_service/kubeflow/kubeflowData.ts @@ -0,0 +1,78 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. + * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +'use strict'; + +import { JobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../common/trainingService'; + +/** + * KubeflowTrialJobDetail + */ +// tslint:disable-next-line:max-classes-per-file +export class KubeflowTrialJobDetail implements TrialJobDetail { + public id: string; + public status: TrialJobStatus; + public submitTime: number; + public startTime?: number; + public endTime?: number; + public tags?: string[]; + public url?: string; + public workingDirectory: string; + public form: JobApplicationForm; + public kubeflowJobName: string; + public sequenceId: number; + public queryJobFailedCount: number; + public k8sPluralName: string + + constructor(id: string, status: TrialJobStatus, submitTime: number, + workingDirectory: string, form: JobApplicationForm, + kubeflowJobName: string, sequenceId: number, url: string, k8sPluralName: string) { + this.id = id; + this.status = status; + this.submitTime = submitTime; + this.workingDirectory = workingDirectory; + this.form = form; + this.kubeflowJobName = kubeflowJobName; + this.sequenceId = sequenceId; + this.tags = []; + this.queryJobFailedCount = 0; + this.url = url; + this.k8sPluralName = k8sPluralName; + } +} + +export const KUBEFLOW_RUN_SHELL_FORMAT: string = +`#!/bin/bash +export NNI_PLATFORM=kubeflow +export NNI_SYS_DIR={0} +export NNI_OUTPUT_DIR={1} +export MULTI_PHASE=false +export NNI_TRIAL_JOB_ID={2} +export NNI_EXP_ID={3} +export NNI_CODE_DIR={4} +export NNI_TRIAL_SEQ_ID={5} +mkdir -p $NNI_SYS_DIR +mkdir -p $NNI_OUTPUT_DIR +cp -rT $NNI_CODE_DIR $NNI_SYS_DIR +cd $NNI_SYS_DIR +sh install_nni.sh # Check and install NNI pkg +python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR//trialkeeper_stderr +` + +export type KubeflowTFJobType = 'Created' | 'Running' | 'Failed' | 'Succeeded'; \ No newline at end of file diff --git a/src/nni_manager/training_service/kubeflow/kubeflowJobInfoCollector.ts b/src/nni_manager/training_service/kubeflow/kubeflowJobInfoCollector.ts new file mode 100644 index 0000000000..454703007f --- /dev/null +++ b/src/nni_manager/training_service/kubeflow/kubeflowJobInfoCollector.ts @@ -0,0 +1,108 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. + * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +'use strict'; + +import * as cpp from 'child-process-promise'; +import { getLogger, Logger } from '../../common/log'; +import { KubeflowTrialJobDetail, KubeflowTFJobType} from './kubeflowData'; +import { NNIError, NNIErrorNames } from '../../common/errors'; +import { TrialJobStatus } from '../../common/trainingService'; + +/** + * Collector Kubeflow jobs info from Kubernetes cluster, and update kubeflow job status locally + */ +export class KubeflowJobInfoCollector { + private readonly trialJobsMap : Map; + private readonly log: Logger = getLogger(); + private readonly statusesNeedToCheck: TrialJobStatus[]; + private readonly MAX_FAILED_QUERY_JOB_NUMBER: number = 30; + + constructor(jobMap: Map) { + this.trialJobsMap = jobMap; + this.statusesNeedToCheck = ['RUNNING', 'WAITING']; + } + + public async retrieveTrialStatus() : Promise { + const updateKubeflowTrialJobs : Promise[] = []; + for(let [trialJobId, kubeflowTrialJob] of this.trialJobsMap) { + if (!kubeflowTrialJob) { + throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`); + } + // Since Kubeflow needs some delay to schedule jobs, we provide 20 seconds buffer time to check kubeflow job's status + if( Date.now() - kubeflowTrialJob.submitTime < 20 * 1000) { + return Promise.resolve(); + } + updateKubeflowTrialJobs.push(this.retrieveSingleTrialJobInfo(kubeflowTrialJob)) + } + + await Promise.all(updateKubeflowTrialJobs); + } + + private async retrieveSingleTrialJobInfo(kubeflowTrialJob : KubeflowTrialJobDetail) : Promise { + if (!this.statusesNeedToCheck.includes(kubeflowTrialJob.status)) { + return Promise.resolve(); + } + + let result : cpp.childProcessPromise.Result; + try { + result = await cpp.exec(`kubectl get ${kubeflowTrialJob.k8sPluralName} ${kubeflowTrialJob.kubeflowJobName} -o json`); + if(result.stderr) { + this.log.error(`Get ${kubeflowTrialJob.k8sPluralName} ${kubeflowTrialJob.kubeflowJobName} failed. Error is ${result.stderr}, failed checking number is ${kubeflowTrialJob.queryJobFailedCount}`); + kubeflowTrialJob.queryJobFailedCount++; + if(kubeflowTrialJob.queryJobFailedCount >= this.MAX_FAILED_QUERY_JOB_NUMBER) { + kubeflowTrialJob.status = 'UNKNOWN'; + } + } + } catch(error) { + this.log.error(`kubectl get ${kubeflowTrialJob.k8sPluralName} ${kubeflowTrialJob.kubeflowJobName} failed, error is ${error}`); + return Promise.resolve(); + } + + const kubeflowJobInfo = JSON.parse(result.stdout); + if(kubeflowJobInfo.status && kubeflowJobInfo.status.conditions) { + const latestCondition = kubeflowJobInfo.status.conditions[kubeflowJobInfo.status.conditions.length - 1]; + const tfJobType : KubeflowTFJobType = latestCondition.type; + switch(tfJobType) { + case 'Created': + kubeflowTrialJob.status = 'WAITING'; + kubeflowTrialJob.startTime = Date.parse(latestCondition.lastUpdateTime); + break; + case 'Running': + kubeflowTrialJob.status = 'RUNNING'; + if(!kubeflowTrialJob.startTime) { + kubeflowTrialJob.startTime = Date.parse(latestCondition.lastUpdateTime); + } + break; + case 'Failed': + kubeflowTrialJob.status = 'FAILED'; + kubeflowTrialJob.endTime = Date.parse(latestCondition.lastUpdateTime); + break; + case 'Succeeded': + kubeflowTrialJob.status = 'SUCCEEDED'; + kubeflowTrialJob.endTime = Date.parse(latestCondition.lastUpdateTime); + break; + default: + break; + } + } + + return Promise.resolve(); + } +} \ No newline at end of file diff --git a/src/nni_manager/training_service/kubeflow/kubeflowJobRestServer.ts b/src/nni_manager/training_service/kubeflow/kubeflowJobRestServer.ts new file mode 100644 index 0000000000..c12d4fba9d --- /dev/null +++ b/src/nni_manager/training_service/kubeflow/kubeflowJobRestServer.ts @@ -0,0 +1,54 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. + * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +'use strict'; + +import * as component from '../../common/component'; +import { Inject } from 'typescript-ioc'; +import { KubeflowTrainingService } from './kubeflowTrainingService'; +import { ClusterJobRestServer } from '../common/clusterJobRestServer' + +/** + * Kubeflow Training service Rest server, provides rest API to support kubeflow job metrics update + * + */ +@component.Singleton +export class KubeflowJobRestServer extends ClusterJobRestServer{ + @Inject + private readonly kubeflowTrainingService : KubeflowTrainingService; + + /** + * constructor to provide NNIRestServer's own rest property, e.g. port + */ + constructor() { + super(); + this.kubeflowTrainingService = component.get(KubeflowTrainingService); + } + + protected handleTrialMetrics(jobId : string, metrics : any[]) : void { + // Split metrics array into single metric, then emit + // Warning: If not split metrics into single ones, the behavior will be UNKNOWN + for (const singleMetric of metrics) { + this.kubeflowTrainingService.MetricsEmitter.emit('metric', { + id : jobId, + data : singleMetric + }); + } + } +} \ No newline at end of file diff --git a/src/nni_manager/training_service/kubeflow/kubeflowTrainingService.ts b/src/nni_manager/training_service/kubeflow/kubeflowTrainingService.ts new file mode 100644 index 0000000000..1f3b30b404 --- /dev/null +++ b/src/nni_manager/training_service/kubeflow/kubeflowTrainingService.ts @@ -0,0 +1,411 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. + * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +'use strict' + +import * as assert from 'assert'; +import * as component from '../../common/component'; +import * as cpp from 'child-process-promise'; +import * as fs from 'fs'; +import * as path from 'path'; + +import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData'; +import { EventEmitter } from 'events'; +import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo'; +import { getLogger, Logger } from '../../common/log'; +import { MethodNotImplementedError } from '../../common/errors'; +import { String } from 'typescript-string-operations'; +import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; +import { + JobApplicationForm, TrainingService, TrialJobApplicationForm, + TrialJobDetail, TrialJobMetric +} from '../../common/trainingService'; +import { delay, generateParamFileName, getExperimentRootDir, getIPV4Address, uniqueString } from '../../common/utils'; +import { KubeflowClusterConfig, kubeflowOperatorMap, KubeflowTrialConfig, NFSConfig } from './kubeflowConfig'; +import { KubeflowTrialJobDetail, KUBEFLOW_RUN_SHELL_FORMAT } from './kubeflowData'; +import { KubeflowJobRestServer } from './kubeflowJobRestServer'; +import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector'; + +var yaml = require('node-yaml'); + +/** + * Training Service implementation for Kubeflow + * Refer https://github.com/kubeflow/kubeflow for more info about Kubeflow + */ +@component.Singleton +class KubeflowTrainingService implements TrainingService { + private readonly NNI_KUBEFLOW_TRIAL_LABEL = 'nni-kubeflow-trial'; + private readonly log!: Logger; + private readonly metricsEmitter: EventEmitter; + private readonly trialJobsMap: Map; + /** experiment root dir in NFS */ + private readonly trialLocalNFSTempFolder: string; + private stopping: boolean = false; + private experimentId! : string; + private nextTrialSequenceId: number; + private kubeflowClusterConfig?: KubeflowClusterConfig; + private kubeflowTrialConfig?: KubeflowTrialConfig; + private kubeflowJobInfoCollector: KubeflowJobInfoCollector; + private kubeflowRestServerPort?: number; + private kubeflowJobPlural?: string; + private readonly CONTAINER_MOUNT_PATH: string; + + constructor() { + this.log = getLogger(); + this.metricsEmitter = new EventEmitter(); + this.trialJobsMap = new Map(); + this.kubeflowJobInfoCollector = new KubeflowJobInfoCollector(this.trialJobsMap); + this.trialLocalNFSTempFolder = path.join(getExperimentRootDir(), 'trials-nfs-tmp'); + this.experimentId = getExperimentId(); + this.nextTrialSequenceId = -1; + this.CONTAINER_MOUNT_PATH = '/tmp/nfs'; + } + + public async run(): Promise { + const restServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer); + await restServer.start(); + this.log.info(`Kubeflow Training service rest server listening on: ${restServer.endPoint}`); + while (!this.stopping) { + // collect metrics by calling 'kubectl get' command on Kubeflow jobs + await delay(3000); + await this.kubeflowJobInfoCollector.retrieveTrialStatus(); + } + } + + public async submitTrialJob(form: JobApplicationForm): Promise { + if(!this.kubeflowClusterConfig) { + throw new Error('Kubeflow Cluster config is not initialized'); + } + + if(!this.kubeflowTrialConfig) { + throw new Error('Kubeflow trial config is not initialized'); + } + + if(!this.kubeflowJobPlural) { + throw new Error('Kubeflow job plural name is undefined'); + } + + if(!this.kubeflowRestServerPort) { + const restServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer); + this.kubeflowRestServerPort = restServer.clusterRestServerPort; + } + + const trialJobId: string = uniqueString(5); + const curTrialSequenceId: number = this.generateSequenceId(); + // Set trial's NFS working folder + const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId); + const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId); + //create tmp trial working folder locally. + await cpp.exec(`mkdir -p ${path.dirname(trialLocalTempFolder)}`); + await cpp.exec(`cp -r ${this.kubeflowTrialConfig.codeDir} ${trialLocalTempFolder}`); + + const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT; + // Write NNI installation file to local tmp files + await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' }); + + const kubeflowRunScriptContent: string = String.Format( + KUBEFLOW_RUN_SHELL_FORMAT, + `$PWD/nni/${trialJobId}`, + path.join(trialWorkingFolder, 'output'), + trialJobId, + getExperimentId(), + trialWorkingFolder, + curTrialSequenceId, + this.kubeflowTrialConfig.command, + getIPV4Address(), + this.kubeflowRestServerPort + ); + + //create tmp trial working folder locally. + await cpp.exec(`mkdir -p ${trialLocalTempFolder}`); + + // Write file content ( run.sh and parameter.cfg ) to local tmp files + await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run.sh'), kubeflowRunScriptContent, { encoding: 'utf8' }); + + // Write file content ( parameter.cfg ) to local tmp folders + const trialForm : TrialJobApplicationForm = (form) + if(trialForm && trialForm.hyperParameters) { + await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)), + trialForm.hyperParameters.value, { encoding: 'utf8' }); + } + + const kubeflowJobYamlPath = path.join(trialLocalTempFolder, `kubeflow-job-${trialJobId}.yaml`); + const kubeflowJobName = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase(); + const podResources : any = {}; + podResources.requests = { + 'memory': `${this.kubeflowTrialConfig.memoryMB}Mi`, + 'cpu': `${this.kubeflowTrialConfig.cpuNum}`, + 'nvidia.com/gpu': `${this.kubeflowTrialConfig.gpuNum}` + } + + podResources.limits = Object.assign({}, podResources.requests); + + // Generate kubeflow job resource yaml file for K8S + yaml.write( + kubeflowJobYamlPath, + this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, podResources), + 'utf-8' + ); + + // Creat work dir for current trial in NFS directory + await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`); + // Copy code files from local dir to NFS mounted dir + await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`); + + const nfsConfig: NFSConfig = this.kubeflowClusterConfig.nfs; + const trialJobDetail: KubeflowTrialJobDetail = new KubeflowTrialJobDetail( + trialJobId, + 'WAITING', + Date.now(), + trialWorkingFolder, + form, + kubeflowJobName, + curTrialSequenceId, + `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`, + this.kubeflowJobPlural + ); + + // Create kubeflow training jobs + await cpp.exec(`kubectl create -f ${kubeflowJobYamlPath}`); + + // Set trial job detail until kubectl create resource successfully + this.trialJobsMap.set(trialJobId, trialJobDetail); + + return Promise.resolve(trialJobDetail); + } + + public updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise { + throw new MethodNotImplementedError(); + } + + public listTrialJobs(): Promise { + throw new MethodNotImplementedError(); + } + + public getTrialJob(trialJobId: string): Promise { + if(!this.kubeflowClusterConfig) { + throw new Error('Kubeflow Cluster config is not initialized'); + } + + const kubeflowTrialJob: TrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); + + if (!kubeflowTrialJob) { + return Promise.reject(`trial job ${trialJobId} not found`) + } + + return Promise.resolve(kubeflowTrialJob); + } + + public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) { + this.metricsEmitter.on('metric', listener); + } + + public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) { + this.metricsEmitter.off('metric', listener); + } + + public get isMultiPhaseJobSupported(): boolean { + return false; + } + + public async cancelTrialJob(trialJobId: string): Promise { + const trialJobDetail : KubeflowTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); + if(!trialJobDetail) { + const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} not found`; + this.log.error(errorMessage); + return Promise.reject(errorMessage); + } + if(!this.kubeflowJobPlural) { + const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} failed because kubeflowJobPlural is undefined`; + this.log.error(errorMessage); + return Promise.reject(errorMessage); + } + + const result: cpp.childProcessPromise.Result = await cpp.exec(`kubectl delete ${this.kubeflowJobPlural} -l app=${this.NNI_KUBEFLOW_TRIAL_LABEL},expId=${getExperimentId()},trialId=${trialJobId}`); + if(result.stderr) { + const errorMessage: string = `kubectl delete ${this.kubeflowJobPlural} for trial ${trialJobId} failed: ${result.stderr}`; + this.log.error(errorMessage); + return Promise.reject(errorMessage); + } + + trialJobDetail.endTime = Date.now(); + trialJobDetail.status = 'USER_CANCELED'; + + return Promise.resolve(); + } + + public async setClusterMetadata(key: string, value: string): Promise { + switch (key) { + case TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG: + this.kubeflowClusterConfig = JSON.parse(value); + + // If NFS config section is valid in config file, proceed to mount and config NFS + if(this.kubeflowClusterConfig.nfs) { + //Check and mount NFS mount point here + await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}`); + const nfsServer: string = this.kubeflowClusterConfig.nfs.server; + const nfsPath: string = this.kubeflowClusterConfig.nfs.path; + + try { + await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.trialLocalNFSTempFolder}`); + } catch(error) { + const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.trialLocalNFSTempFolder} failed, error is ${error}`; + this.log.error(mountError); + throw new Error(mountError); + } + } + + this.kubeflowJobPlural = kubeflowOperatorMap.get(this.kubeflowClusterConfig.operator); + break; + + case TrialConfigMetadataKey.TRIAL_CONFIG: + if (!this.kubeflowClusterConfig){ + this.log.error('kubeflow cluster config is not initialized'); + return Promise.reject(new Error('kubeflow cluster config is not initialized')); + } + + this.kubeflowTrialConfig = JSON.parse(value); + break; + default: + break; + } + + return Promise.resolve(); + } + + public getClusterMetadata(key: string): Promise { + throw new MethodNotImplementedError(); + } + + public async cleanUp(): Promise { + this.stopping = true; + + // First, cancel all running kubeflow jobs + for(let [trialJobId, kubeflowTrialJob] of this.trialJobsMap) { + if(['RUNNING', 'WAITING', 'UNKNOWN'].includes(kubeflowTrialJob.status)) { + try { + await this.cancelTrialJob(trialJobId); + } catch(error) {} // DONT throw error during cleanup + kubeflowTrialJob.status = 'SYS_CANCELED'; + } + } + + assert(this.kubeflowJobPlural !== undefined); + + // Delete all kubeflow jobs whose expId label is current experiment id + try { + await cpp.exec(`kubectl delete ${this.kubeflowJobPlural} -l app=${this.NNI_KUBEFLOW_TRIAL_LABEL},expId=${getExperimentId()}`); + } catch(error) { + this.log.error(`Delete ${this.kubeflowJobPlural} with label: app=${this.NNI_KUBEFLOW_TRIAL_LABEL},expId=${getExperimentId()} failed, error is ${error}`); + } + + // Unmount NFS + try { + await cpp.exec(`sudo umount ${this.trialLocalNFSTempFolder}`); + } catch(error) { + this.log.error(`Unmount ${this.trialLocalNFSTempFolder} failed, error is ${error}`); + } + + // Stop Kubeflow rest server + const restServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer); + try { + await restServer.stop(); + this.log.info('Kubeflow Training service rest server stopped successfully.'); + } catch (error) { + this.log.error(`Kubeflow Training service rest server stopped failed, error: ${error.message}`); + Promise.reject(error); + } + + return Promise.resolve(); + } + + public get MetricsEmitter() : EventEmitter { + return this.metricsEmitter; + } + + private generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, podResources : any) : any { + if(!this.kubeflowClusterConfig) { + throw new Error('Kubeflow Cluster config is not initialized'); + } + + if(!this.kubeflowTrialConfig) { + throw new Error('Kubeflow trial config is not initialized'); + } + + return { + apiVersion: 'kubeflow.org/v1alpha2', + kind: 'TFJob', + metadata: { + name: kubeflowJobName, + namespace: 'default', + labels: { + app: this.NNI_KUBEFLOW_TRIAL_LABEL, + expId: getExperimentId(), + trialId: trialJobId + } + }, + spec: { + tfReplicaSpecs: { + Worker: { + replicas: 1, + template: { + metadata: { + creationTimestamp: null + }, + spec: { + containers: [ + { + // Kubeflow tensorflow operator requires that containers' name must be tensorflow + // TODO: change the name based on operator's type + name: 'tensorflow', + image: this.kubeflowTrialConfig.image, + args: ["sh", `${path.join(trialWorkingFolder, 'run.sh')}`], + volumeMounts: [{ + name: 'nni-nfs-vol', + mountPath: this.CONTAINER_MOUNT_PATH + }], + resources: podResources//, + //workingDir: '/tmp/nni/nuDEP' + }], + restartPolicy: 'ExitCode', + volumes: [{ + name: 'nni-nfs-vol', + nfs: { + server: `${this.kubeflowClusterConfig.nfs.server}`, + path: `${this.kubeflowClusterConfig.nfs.path}` + } + }] + } + } + } + } + } + }; + } + + private generateSequenceId(): number { + if (this.nextTrialSequenceId === -1) { + this.nextTrialSequenceId = getInitTrialSequenceId(); + } + + return this.nextTrialSequenceId++; + } +} + +export { KubeflowTrainingService } \ No newline at end of file diff --git a/src/nni_manager/training_service/local/localTrainingService.ts b/src/nni_manager/training_service/local/localTrainingService.ts index d6e81a9344..2bf4032175 100644 --- a/src/nni_manager/training_service/local/localTrainingService.ts +++ b/src/nni_manager/training_service/local/localTrainingService.ts @@ -320,6 +320,7 @@ class LocalTrainingService implements TrainingService { { key: 'NNI_SYS_DIR', value: trialJobDetail.workingDirectory }, { key: 'NNI_TRIAL_JOB_ID', value: trialJobDetail.id }, { key: 'NNI_OUTPUT_DIR', value: trialJobDetail.workingDirectory }, + { key: 'NNI_TRIAL_SEQ_ID', value: trialJobDetail.sequenceId.toString() }, { key: 'MULTI_PHASE', value: this.isMultiPhase.toString() } ]; } @@ -368,7 +369,6 @@ class LocalTrainingService implements TrainingService { await cpp.exec(`touch ${path.join(trialJobDetail.workingDirectory, '.nni', 'metrics')}`); await fs.promises.writeFile(path.join(trialJobDetail.workingDirectory, 'run.sh'), runScriptLines.join('\n'), { encoding: 'utf8', mode: 0o777 }); await this.writeParameterFile(trialJobDetail.workingDirectory, (trialJobDetail.form).hyperParameters); - await this.writeSequenceIdFile(trialJobId); const process: cp.ChildProcess = cp.exec(`bash ${path.join(trialJobDetail.workingDirectory, 'run.sh')}`); this.setTrialJobStatus(trialJobDetail, 'RUNNING'); @@ -450,13 +450,6 @@ class LocalTrainingService implements TrainingService { return this.trialSequenceId++; } - - private async writeSequenceIdFile(trialJobId: string): Promise { - const trialJobDetail: LocalTrialJobDetail = this.jobMap.get(trialJobId); - assert(trialJobDetail !== undefined); - const filepath: string = path.join(trialJobDetail.workingDirectory, '.nni', 'sequence_id'); - await fs.promises.writeFile(filepath, trialJobDetail.sequenceId.toString(), { encoding: 'utf8' }); - } } export { LocalTrainingService }; diff --git a/src/nni_manager/training_service/pai/paiData.ts b/src/nni_manager/training_service/pai/paiData.ts index a9e3e898b6..bb22f56900 100644 --- a/src/nni_manager/training_service/pai/paiData.ts +++ b/src/nni_manager/training_service/pai/paiData.ts @@ -60,10 +60,10 @@ else fi`; export const PAI_TRIAL_COMMAND_FORMAT: string = -`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} +`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} && cd $NNI_SYS_DIR && sh install_nni.sh -&& python3 -m nni_trial_tool.trial_keeper --trial_command '{4}' --nnimanager_ip '{5}' --nnimanager_port '{6}' ---pai_hdfs_output_dir '{7}' --pai_hdfs_host '{8}' --pai_user_name {9}`; +&& python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}' +--pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10}`; export const PAI_OUTPUT_DIR_FORMAT: string = `hdfs://{0}:9000/`; diff --git a/src/nni_manager/training_service/pai/paiJobInfoCollector.ts b/src/nni_manager/training_service/pai/paiJobInfoCollector.ts index bda48387bf..d3c19a7f69 100644 --- a/src/nni_manager/training_service/pai/paiJobInfoCollector.ts +++ b/src/nni_manager/training_service/pai/paiJobInfoCollector.ts @@ -20,7 +20,6 @@ 'use strict'; import * as request from 'request'; -import { EventEmitter } from 'events'; import { Deferred } from 'ts-deferred'; import { getLogger, Logger } from '../../common/log'; import { NNIError, NNIErrorNames } from '../../common/errors'; @@ -43,7 +42,7 @@ export class PAIJobInfoCollector { this.finalStatuses = ['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED']; } - public async updateTrialStatusFromPAI(paiToken? : string, paiClusterConfig?: PAIClusterConfig) : Promise { + public async retrieveTrialStatus(paiToken? : string, paiClusterConfig?: PAIClusterConfig) : Promise { if (!paiClusterConfig || !paiToken) { return Promise.resolve(); } diff --git a/src/nni_manager/training_service/pai/paiJobRestServer.ts b/src/nni_manager/training_service/pai/paiJobRestServer.ts index 4bba44da85..c551e1f575 100644 --- a/src/nni_manager/training_service/pai/paiJobRestServer.ts +++ b/src/nni_manager/training_service/pai/paiJobRestServer.ts @@ -19,29 +19,17 @@ 'use strict'; -import * as assert from 'assert'; -import { Request, Response, Router } from 'express'; -import * as bodyParser from 'body-parser'; import * as component from '../../common/component'; -import { getBasePort } from '../../common/experimentStartupInfo'; -import { getExperimentId } from '../../common/experimentStartupInfo'; import { Inject } from 'typescript-ioc'; import { PAITrainingService } from './paiTrainingService'; -import { RestServer } from '../../common/restServer' +import { ClusterJobRestServer } from '../common/clusterJobRestServer' /** * PAI Training service Rest server, provides rest API to support pai job metrics update * */ @component.Singleton -export class PAIJobRestServer extends RestServer{ - /** NNI main rest service default port */ - private static readonly DEFAULT_PORT: number = 51189; - - private readonly API_ROOT_URL: string = '/api/v1/nni-pai'; - - private readonly expId: string = getExperimentId(); - +export class PAIJobRestServer extends ClusterJobRestServer{ @Inject private readonly paiTrainingService : PAITrainingService; @@ -50,61 +38,17 @@ export class PAIJobRestServer extends RestServer{ */ constructor() { super(); - const basePort: number = getBasePort(); - assert(basePort && basePort > 1024); - - this.port = basePort + 1; // PAIJobRestServer.DEFAULT_PORT; this.paiTrainingService = component.get(PAITrainingService); } - public get paiRestServerPort(): number { - if(!this.port) { - throw new Error('PAI Rest server port is undefined'); + protected handleTrialMetrics(jobId : string, metrics : any[]) : void { + // Split metrics array into single metric, then emit + // Warning: If not split metrics into single ones, the behavior will be UNKNOWN + for (const singleMetric of metrics) { + this.paiTrainingService.MetricsEmitter.emit('metric', { + id : jobId, + data : singleMetric + }); } - return this.port; - } - - /** - * NNIRestServer's own router registration - */ - protected registerRestHandler(): void { - this.app.use(bodyParser.json()); - this.app.use(this.API_ROOT_URL, this.createRestHandler()); - } - - private createRestHandler() : Router { - const router: Router = Router(); - - // tslint:disable-next-line:typedef - router.use((req: Request, res: Response, next) => { - this.log.info(`${req.method}: ${req.url}: body:\n${JSON.stringify(req.body, undefined, 4)}`); - res.setHeader('Content-Type', 'application/json'); - next(); - }); - - router.post(`/update-metrics/${this.expId}/:trialId`, (req: Request, res: Response) => { - try { - this.log.info(`Get update-metrics request, trial job id is ${req.params.trialId}`); - this.log.info(`update-metrics body is ${JSON.stringify(req.body)}`); - - // Split metrics array into single metric, then emit - // Warning: If not split metrics into single ones, the behavior will be UNKNOWN - for (const singleMetric of req.body.metrics) { - this.paiTrainingService.MetricsEmitter.emit('metric', { - id : req.body.jobId, - data : singleMetric - }); - } - - res.send(); - } - catch(err) { - this.log.error(`json parse metrics error: ${err}`); - res.status(500); - res.send(err.message); - } - }); - - return router; } } \ No newline at end of file diff --git a/src/nni_manager/training_service/pai/paiTrainingService.ts b/src/nni_manager/training_service/pai/paiTrainingService.ts index 013b568af8..e7fff6fdee 100644 --- a/src/nni_manager/training_service/pai/paiTrainingService.ts +++ b/src/nni_manager/training_service/pai/paiTrainingService.ts @@ -20,13 +20,13 @@ 'use strict' -import * as assert from 'assert'; import * as component from '../../common/component'; import * as cpp from 'child-process-promise'; import * as fs from 'fs'; import * as path from 'path'; import * as request from 'request'; +import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData'; import { Deferred } from 'ts-deferred'; import { EventEmitter } from 'events'; import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo'; @@ -40,7 +40,7 @@ import { } from '../../common/trainingService'; import { delay, generateParamFileName, getExperimentRootDir, getIPV4Address, uniqueString } from '../../common/utils'; import { PAIJobRestServer } from './paiJobRestServer' -import { PAITrialJobDetail, PAI_INSTALL_NNI_SHELL_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_LOG_PATH_FORMAT } from './paiData'; +import { PAITrialJobDetail, PAI_TRIAL_COMMAND_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_LOG_PATH_FORMAT } from './paiData'; import { PAIJobInfoCollector } from './paiJobInfoCollector'; import { String } from 'typescript-string-operations'; import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig'; @@ -67,7 +67,7 @@ class PAITrainingService implements TrainingService { private readonly hdfsDirPattern: string; private hdfsBaseDir: string | undefined; private hdfsOutputHost: string | undefined; - private trialSequenceId: number; + private nextTrialSequenceId: number; private paiRestServerPort?: number; constructor() { @@ -79,7 +79,7 @@ class PAITrainingService implements TrainingService { this.experimentId = getExperimentId(); this.paiJobCollector = new PAIJobInfoCollector(this.trialJobsMap); this.hdfsDirPattern = 'hdfs://(?([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(?/.*)?'; - this.trialSequenceId = -1; + this.nextTrialSequenceId = -1; } public async run(): Promise { @@ -87,7 +87,7 @@ class PAITrainingService implements TrainingService { await restServer.start(); this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`); while (!this.stopping) { - await this.paiJobCollector.updateTrialStatusFromPAI(this.paiToken, this.paiClusterConfig); + await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig); await delay(3000); } } @@ -148,7 +148,7 @@ class PAITrainingService implements TrainingService { if(!this.paiRestServerPort) { const restServer: PAIJobRestServer = component.get(PAIJobRestServer); - this.paiRestServerPort = restServer.paiRestServerPort; + this.paiRestServerPort = restServer.clusterRestServerPort; } this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`); @@ -162,9 +162,8 @@ class PAITrainingService implements TrainingService { //create tmp trial working folder locally. await cpp.exec(`mkdir -p ${path.dirname(trialLocalTempFolder)}`); await cpp.exec(`cp -r ${this.paiTrialConfig.codeDir} ${trialLocalTempFolder}`); - await cpp.exec(`mkdir -p ${path.join(trialLocalTempFolder, '.nni')}`); - const runScriptContent : string = PAI_INSTALL_NNI_SHELL_FORMAT; + const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT; // Write NNI installation file to local tmp files await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' }); @@ -173,7 +172,6 @@ class PAITrainingService implements TrainingService { if(trialForm) { await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)), trialForm.hyperParameters.value, { encoding: 'utf8' }); - await fs.promises.writeFile(path.join(trialLocalTempFolder, '.nni', 'sequence_id'), trialSequenceId.toString(), { encoding: 'utf8' }); } // Step 1. Prepare PAI job configuration @@ -204,6 +202,7 @@ class PAITrainingService implements TrainingService { `$PWD/${trialJobId}/nnioutput`, trialJobId, this.experimentId, + trialSequenceId, this.paiTrialConfig.command, getIPV4Address(), this.paiRestServerPort, @@ -461,11 +460,11 @@ class PAITrainingService implements TrainingService { } private generateSequenceId(): number { - if (this.trialSequenceId === -1) { - this.trialSequenceId = getInitTrialSequenceId(); + if (this.nextTrialSequenceId === -1) { + this.nextTrialSequenceId = getInitTrialSequenceId(); } - return this.trialSequenceId++; + return this.nextTrialSequenceId++; } } diff --git a/src/nni_manager/training_service/remote_machine/remoteMachineData.ts b/src/nni_manager/training_service/remote_machine/remoteMachineData.ts index 9a27a61b23..27577c095e 100644 --- a/src/nni_manager/training_service/remote_machine/remoteMachineData.ts +++ b/src/nni_manager/training_service/remote_machine/remoteMachineData.ts @@ -112,6 +112,7 @@ export const REMOTEMACHINE_RUN_SHELL_FORMAT: string = `#!/bin/bash export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_TRIAL_JOB_ID={1} NNI_OUTPUT_DIR={0} export MULTI_PHASE={7} +export NNI_TRIAL_SEQ_ID={8} cd $NNI_SYS_DIR echo $$ >{2} eval {3}{4} 2>{5} diff --git a/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts b/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts index 4bb533da0a..eec86be093 100644 --- a/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts +++ b/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts @@ -442,6 +442,11 @@ class RemoteMachineTrainingService implements TrainingService { // for lint return; } + const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); + if (trialJobDetail === undefined) { + throw new Error(`Can not get trial job detail for job: ${trialJobId}`); + } + const trialLocalTempFolder: string = path.join(this.expRootDir, 'trials-local', trialJobId); await SSHClientUtility.remoteExeCommand(`mkdir -p ${trialWorkingFolder}`, sshClient); @@ -463,7 +468,9 @@ class RemoteMachineTrainingService implements TrainingService { path.join(trialWorkingFolder, 'stderr'), path.join(trialWorkingFolder, '.nni', 'code'), /** Mark if the trial is multi-phase job */ - this.isMultiPhase); + this.isMultiPhase, + trialJobDetail.sequenceId.toString() + ); //create tmp trial working folder locally. await cpp.exec(`mkdir -p ${path.join(trialLocalTempFolder, '.nni')}`); @@ -475,7 +482,6 @@ class RemoteMachineTrainingService implements TrainingService { await SSHClientUtility.copyFileToRemote( path.join(trialLocalTempFolder, 'run.sh'), path.join(trialWorkingFolder, 'run.sh'), sshClient); await this.writeParameterFile(trialJobId, form.hyperParameters, rmScheduleInfo.rmMeta); - await this.writeSequenceIdFile(trialJobId, rmScheduleInfo.rmMeta); // Copy files in codeDir to remote working directory await SSHClientUtility.copyDirectoryToRemote(this.trialConfig.codeDir, trialWorkingFolder, sshClient); @@ -614,15 +620,6 @@ class RemoteMachineTrainingService implements TrainingService { return this.trialSequenceId++; } - private async writeSequenceIdFile(trialJobId: string, rmMeta: RemoteMachineMeta): Promise { - const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); - if (trialJobDetail === undefined) { - assert(false, `Can not get trial job detail for job: ${trialJobId}`); - } else { - await this.writeRemoteTrialFile(trialJobId, trialJobDetail.sequenceId.toString(), rmMeta, path.join('.nni', 'sequence_id')); - } - } - private async writeRemoteTrialFile(trialJobId: string, fileContent: string, rmMeta: RemoteMachineMeta, fileName: string): Promise { const sshClient: Client | undefined = this.machineSSHClientMap.get(rmMeta); diff --git a/src/nni_manager/training_service/test/kubeflowTrainingService.test.ts b/src/nni_manager/training_service/test/kubeflowTrainingService.test.ts new file mode 100644 index 0000000000..dc1a05afcc --- /dev/null +++ b/src/nni_manager/training_service/test/kubeflowTrainingService.test.ts @@ -0,0 +1,89 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. + * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +'use strict'; + +import * as chai from 'chai'; +import * as chaiAsPromised from 'chai-as-promised'; +import * as fs from 'fs'; +import * as tmp from 'tmp'; +import * as component from '../../common/component'; +import { cleanupUnitTest, prepareUnitTest } from '../../common/utils'; +import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; +import { KubeflowTrainingService } from '../kubeflow/kubeflowTrainingService'; + +// TODO: copy mockedTrail.py to local folder +const localCodeDir: string = tmp.dirSync().name +const mockedTrialPath: string = './training_service/test/mockedTrial.py' +fs.copyFileSync(mockedTrialPath, localCodeDir + '/mockedTrial.py') + +describe('Unit Test for KubeflowTrainingService', () => { + let skip: boolean = false; + let testKubeflowConfig: any; + let testKubeflowTrialConfig : any; + try { + testKubeflowConfig = JSON.parse(fs.readFileSync('../../.vscode/kubeflowCluster.json', 'utf8')); + testKubeflowTrialConfig = `{\"command\":\"python3 mnist-keras.py\",\"codeDir\":\"/home/desy/nni/examples/trials/mnist",\"gpuNum\":\"1\",\"cpuNum\":\"2\",\"memoryMB\":\"8196\",\"image\":\"msranni/nni:v0.3.3\"}`; + } catch (err) { + console.log('Please configure rminfo.json to enable remote machine unit test.'); + //skip = true; + } + + let kubeflowTrainingService: KubeflowTrainingService; + + console.log(tmp.dirSync().name); + + before(() => { + chai.should(); + chai.use(chaiAsPromised); + prepareUnitTest(); + }); + + after(() => { + cleanupUnitTest(); + }); + + beforeEach(() => { + if (skip) { + return; + } + kubeflowTrainingService = component.get(KubeflowTrainingService); + }); + + afterEach(() => { + if (skip) { + return; + } + kubeflowTrainingService.cleanUp(); + }); + + it('Simple Test', async () => { + if (skip) { + return; + } + kubeflowTrainingService.setClusterMetadata(TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG, testKubeflowConfig), + kubeflowTrainingService.setClusterMetadata(TrialConfigMetadataKey.TRIAL_CONFIG, testKubeflowTrialConfig); + try { + const trialDetail = await kubeflowTrainingService.submitTrialJob({jobType : 'TRIAL'}); + } catch(error) { + console.log('Submit job failed:' + error); + chai.assert(error) + } + }); +}); \ No newline at end of file diff --git a/src/nni_manager/types/child-process-promise/index.d.ts b/src/nni_manager/types/child-process-promise/index.d.ts index 5b5dc31045..e0a877cdd8 100644 --- a/src/nni_manager/types/child-process-promise/index.d.ts +++ b/src/nni_manager/types/child-process-promise/index.d.ts @@ -1,3 +1,11 @@ declare module 'child-process-promise' { - export function exec(command: string): Promise; + export function exec(command: string): Promise; + + export namespace childProcessPromise { + interface Result { + stdout: string; + stderr: string, + message: string + } + } } \ No newline at end of file diff --git a/src/sdk/pynni/nni/platform/__init__.py b/src/sdk/pynni/nni/platform/__init__.py index fed452fc47..9195cc3c8c 100644 --- a/src/sdk/pynni/nni/platform/__init__.py +++ b/src/sdk/pynni/nni/platform/__init__.py @@ -27,7 +27,7 @@ from .standalone import * elif env_args.platform == 'unittest': from .test import * -elif env_args.platform in ('local', 'remote', 'pai'): +elif env_args.platform in ('local', 'remote', 'pai', 'kubeflow'): from .local import * else: raise RuntimeError('Unknown platform %s' % env_args.platform) diff --git a/src/sdk/pynni/nni/platform/local.py b/src/sdk/pynni/nni/platform/local.py index 032c18e71e..c864abc9ba 100644 --- a/src/sdk/pynni/nni/platform/local.py +++ b/src/sdk/pynni/nni/platform/local.py @@ -79,5 +79,4 @@ def send_metric(string): _metric_file.flush() def get_sequence_id(): - with open(os.path.join(_sysdir, '.nni', 'sequence_id'), 'r') as f: - return int(f.read().strip()) + return os.environ['NNI_TRIAL_SEQ_ID'] \ No newline at end of file diff --git a/tools/nni_cmd/config_schema.py b/tools/nni_cmd/config_schema.py index 19e36fea98..7461cd2b7c 100644 --- a/tools/nni_cmd/config_schema.py +++ b/tools/nni_cmd/config_schema.py @@ -28,7 +28,7 @@ 'trialConcurrency': And(int, lambda n: 1 <=n <= 999999), Optional('maxExecDuration'): Regex(r'^[1-9][0-9]*[s|m|h|d]$'), Optional('maxTrialNum'): And(int, lambda x: 1 <= x <= 99999), -'trainingServicePlatform': And(str, lambda x: x in ['remote', 'local', 'pai']), +'trainingServicePlatform': And(str, lambda x: x in ['remote', 'local', 'pai', 'kubeflow']), Optional('searchSpacePath'): os.path.exists, Optional('multiPhase'): bool, 'useAnnotation': bool, @@ -89,6 +89,28 @@ } } +kubeflow_trial_schema = { +'trial':{ + 'command': str, + 'codeDir': os.path.exists, + 'gpuNum': And(int, lambda x: 0 <= x <= 99999), + 'cpuNum': And(int, lambda x: 0 <= x <= 99999), + 'memoryMB': int, + 'image': str + } +} + +kubeflow_config_schema = { + 'kubeflowConfig':{ + 'operator': Or('tf-operator', 'mxnet-operator', 'pytorch-operato'), + 'nfs': { + 'server': str, + 'path': str + }, + 'kubernetesServer': str + } +} + machine_list_schima = { Optional('machineList'):[Or({ 'ip': str, @@ -108,4 +130,6 @@ REMOTE_CONFIG_SCHEMA = Schema({**common_schema, **common_trial_schema, **machine_list_schima}) -PAI_CONFIG_SCHEMA = Schema({**common_schema, **pai_trial_schema, **pai_config_schema}) \ No newline at end of file +PAI_CONFIG_SCHEMA = Schema({**common_schema, **pai_trial_schema, **pai_config_schema}) + +KUBEFLOW_CONFIG_SCHEMA = Schema({**common_schema, **kubeflow_trial_schema, **kubeflow_config_schema}) diff --git a/tools/nni_cmd/launcher.py b/tools/nni_cmd/launcher.py index e5f2d5e32b..c902765c8f 100644 --- a/tools/nni_cmd/launcher.py +++ b/tools/nni_cmd/launcher.py @@ -37,7 +37,6 @@ import site from pathlib import Path - def get_log_path(config_file_name): '''generate stdout and stderr log path''' stdout_full_path = os.path.join(NNICTL_HOME_DIR, config_file_name, 'stdout') @@ -66,7 +65,7 @@ def start_rest_server(port, platform, mode, config_file_name, experiment_id=None 'You could use \'nnictl create --help\' to get help information' % port) exit(1) - if platform == 'pai' and detect_port(int(port) + 1): + if (platform == 'pai' or platform == 'kubeflow') and detect_port(int(port) + 1): print_error('PAI mode need an additional adjacent port %d, and the port %d is used by another process!\n' \ 'You could set another port to start experiment!\n' \ 'You could use \'nnictl create --help\' to get help information' % ((int(port) + 1), (int(port) + 1))) @@ -165,6 +164,23 @@ def set_pai_config(experiment_config, port, config_file_name): #set trial_config return set_trial_config(experiment_config, port, config_file_name), err_message +def set_kubeflow_config(experiment_config, port, config_file_name): + '''set kubeflow configuration''' + kubeflow_config_data = dict() + kubeflow_config_data['kubeflow_config'] = experiment_config['kubeflowConfig'] + response = rest_put(cluster_metadata_url(port), json.dumps(kubeflow_config_data), 20) + err_message = None + if not response or not response.status_code == 200: + if response is not None: + err_message = response.text + _, stderr_full_path = get_log_path(config_file_name) + with open(stderr_full_path, 'a+') as fout: + fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':'))) + return False, err_message + + #set trial_config + return set_trial_config(experiment_config, port, config_file_name), err_message + def set_experiment(experiment_config, mode, port, config_file_name): '''Call startExperiment (rest POST /experiment) with yaml file content''' request_data = dict() @@ -311,6 +327,22 @@ def launch_experiment(args, experiment_config, mode, config_file_name, experimen except Exception: raise Exception(ERROR_INFO % 'Restful server stopped!') exit(1) + + #set kubeflow config + if experiment_config['trainingServicePlatform'] == 'kubeflow': + print_normal('Setting kubeflow config...') + config_result, err_msg = set_kubeflow_config(experiment_config, args.port, config_file_name) + if config_result: + print_normal('Successfully set kubeflow config!') + else: + if err_msg: + print_error('Failed! Error is: {}'.format(err_msg)) + try: + cmds = ['pkill', '-P', str(rest_process.pid)] + call(cmds) + except Exception: + raise Exception(ERROR_INFO % 'Restful server stopped!') + exit(1) # start a new experiment print_normal('Starting experiment...') diff --git a/tools/nni_cmd/launcher_utils.py b/tools/nni_cmd/launcher_utils.py index 4de3286f90..1749e7ecf3 100644 --- a/tools/nni_cmd/launcher_utils.py +++ b/tools/nni_cmd/launcher_utils.py @@ -20,7 +20,7 @@ import os import json -from .config_schema import LOCAL_CONFIG_SCHEMA, REMOTE_CONFIG_SCHEMA, PAI_CONFIG_SCHEMA +from .config_schema import LOCAL_CONFIG_SCHEMA, REMOTE_CONFIG_SCHEMA, PAI_CONFIG_SCHEMA, KUBEFLOW_CONFIG_SCHEMA from .common_utils import get_json_content, print_error def expand_path(experiment_config, key): @@ -77,18 +77,17 @@ def validate_search_space_content(experiment_config): except: raise Exception('searchspace file is not a valid json format!') - - def validate_common_content(experiment_config): '''Validate whether the common values in experiment_config is valid''' if not experiment_config.get('trainingServicePlatform') or \ - experiment_config.get('trainingServicePlatform') not in ['local', 'remote', 'pai']: + experiment_config.get('trainingServicePlatform') not in ['local', 'remote', 'pai', 'kubeflow']: print_error('Please set correct trainingServicePlatform!') exit(0) schema_dict = { 'local': LOCAL_CONFIG_SCHEMA, 'remote': REMOTE_CONFIG_SCHEMA, - 'pai': PAI_CONFIG_SCHEMA + 'pai': PAI_CONFIG_SCHEMA, + 'kubeflow': KUBEFLOW_CONFIG_SCHEMA } try: schema_dict.get(experiment_config['trainingServicePlatform']).validate(experiment_config) diff --git a/tools/nni_trial_tool/constants.py b/tools/nni_trial_tool/constants.py index c767e03a89..3ae30a3a33 100644 --- a/tools/nni_trial_tool/constants.py +++ b/tools/nni_trial_tool/constants.py @@ -28,6 +28,8 @@ LOG_DIR = os.environ['NNI_OUTPUT_DIR'] +NNI_PLATFORM = os.environ['NNI_PLATFORM'] + STDOUT_FULL_PATH = os.path.join(LOG_DIR, 'stdout') STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr') diff --git a/tools/nni_trial_tool/trial_keeper.py b/tools/nni_trial_tool/trial_keeper.py index ae8fa8a8d5..5c8c7de684 100644 --- a/tools/nni_trial_tool/trial_keeper.py +++ b/tools/nni_trial_tool/trial_keeper.py @@ -28,7 +28,7 @@ from pyhdfs import HdfsClient from .hdfsClientUtility import copyDirectoryToHdfs -from .constants import HOME_DIR, LOG_DIR, STDOUT_FULL_PATH, STDERR_FULL_PATH +from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH from .metrics_reader import read_experiment_metrics logger = logging.getLogger('trial_keeper') @@ -52,17 +52,18 @@ def main_loop(args): if retCode is not None: print('subprocess terminated. Exit code is {}. Quit'.format(retCode)) - #copy local directory to hdfs - nni_local_output_dir = os.environ['NNI_OUTPUT_DIR'] - try: - hdfs_client = HdfsClient(hosts='{0}:{1}'.format(args.pai_hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5) - if copyDirectoryToHdfs(nni_local_output_dir, args.pai_hdfs_output_dir, hdfs_client): - print('copy directory from {0} to {1} success!'.format(nni_local_output_dir, args.pai_hdfs_output_dir)) - else: - print('copy directory from {0} to {1} failed!'.format(nni_local_output_dir, args.pai_hdfs_output_dir)) - except Exception as exception: - print('HDFS copy directory got exception') - raise exception + if NNI_PLATFORM == 'pai': + # Copy local directory to hdfs for OpenPAI + nni_local_output_dir = os.environ['NNI_OUTPUT_DIR'] + try: + hdfs_client = HdfsClient(hosts='{0}:{1}'.format(args.pai_hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5) + if copyDirectoryToHdfs(nni_local_output_dir, args.pai_hdfs_output_dir, hdfs_client): + print('copy directory from {0} to {1} success!'.format(nni_local_output_dir, args.pai_hdfs_output_dir)) + else: + print('copy directory from {0} to {1} failed!'.format(nni_local_output_dir, args.pai_hdfs_output_dir)) + except Exception as exception: + print('HDFS copy directory got exception') + raise exception ## Exit as the retCode of subprocess(trial) exit(retCode)