diff --git a/src/nni_manager/common/datastore.ts b/src/nni_manager/common/datastore.ts index 209292be73..769fe48017 100644 --- a/src/nni_manager/common/datastore.ts +++ b/src/nni_manager/common/datastore.ts @@ -70,6 +70,18 @@ interface TrialJobInfo { stderrPath?: string; } +interface HyperParameterFormat { + parameter_source: string; + parameters: Object; + parameter_id: number; +} + +interface ExportedDataFormat { + parameter: Object; + value: Object; + id: string; +} + abstract class DataStore { public abstract init(): Promise; public abstract close(): Promise; @@ -82,6 +94,8 @@ abstract class DataStore { public abstract getTrialJob(trialJobId: string): Promise; public abstract storeMetricData(trialJobId: string, data: string): Promise; public abstract getMetricData(trialJobId?: string, metricType?: MetricType): Promise; + public abstract exportTrialHpConfigs(): Promise; + public abstract getImportedData(): Promise; } abstract class Database { @@ -99,5 +113,5 @@ abstract class Database { export { DataStore, Database, TrialJobEvent, MetricType, MetricData, TrialJobInfo, - ExperimentProfileRecord, TrialJobEventRecord, MetricDataRecord + ExperimentProfileRecord, TrialJobEventRecord, MetricDataRecord, HyperParameterFormat, ExportedDataFormat }; diff --git a/src/nni_manager/common/manager.ts b/src/nni_manager/common/manager.ts index a20e4d010b..4933465b92 100644 --- a/src/nni_manager/common/manager.ts +++ b/src/nni_manager/common/manager.ts @@ -100,6 +100,7 @@ abstract class Manager { public abstract getExperimentProfile(): Promise; public abstract updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise; public abstract importData(data: string): Promise; + public abstract exportData(): Promise; public abstract addCustomizedTrialJob(hyperParams: string): Promise; public abstract cancelTrialJobByUser(trialJobId: string): Promise; diff --git a/src/nni_manager/core/nniDataStore.ts b/src/nni_manager/core/nniDataStore.ts index 4f9f84d4b6..86defd0971 100644 --- a/src/nni_manager/core/nniDataStore.ts +++ b/src/nni_manager/core/nniDataStore.ts @@ -24,7 +24,8 @@ import { Deferred } from 'ts-deferred'; import * as component from '../common/component'; import { Database, DataStore, MetricData, MetricDataRecord, MetricType, - TrialJobEvent, TrialJobEventRecord, TrialJobInfo } from '../common/datastore'; + TrialJobEvent, TrialJobEventRecord, TrialJobInfo, HyperParameterFormat, + ExportedDataFormat } from '../common/datastore'; import { NNIError } from '../common/errors'; import { getExperimentId, isNewExperiment } from '../common/experimentStartupInfo'; import { getLogger, Logger } from '../common/log'; @@ -171,6 +172,61 @@ class NNIDataStore implements DataStore { return this.db.queryMetricData(trialJobId, metricType); } + public async exportTrialHpConfigs(): Promise { + const jobs: TrialJobInfo[] = await this.listTrialJobs(); + let exportedData: ExportedDataFormat[] = []; + for (const job of jobs) { + if (job.hyperParameters && job.finalMetricData) { + if (job.hyperParameters.length === 1 && job.finalMetricData.length === 1) { + // optimization for non-multi-phase case + const parameters: HyperParameterFormat = JSON.parse(job.hyperParameters[0]); + const oneEntry: ExportedDataFormat = { + parameter: parameters.parameters, + value: JSON.parse(job.finalMetricData[0].data), + id: job.id + }; + exportedData.push(oneEntry); + } else { + let paraMap: Map = new Map(); + let metricMap: Map = new Map(); + for (const eachPara of job.hyperParameters) { + const parameters: HyperParameterFormat = JSON.parse(eachPara); + paraMap.set(parameters.parameter_id, parameters.parameters); + } + for (const eachMetric of job.finalMetricData) { + const value: Object = JSON.parse(eachMetric.data); + metricMap.set(Number(eachMetric.parameterId), value); + } + paraMap.forEach((value: Object, key: number) => { + const metricValue: Object | undefined = metricMap.get(key); + if (metricValue) { + const oneEntry: ExportedDataFormat = { + parameter: value, + value: metricValue, + id: job.id + }; + exportedData.push(oneEntry); + } + }); + } + } + } + + return JSON.stringify(exportedData); + } + + public async getImportedData(): Promise { + let importedData: string[] = []; + const importDataEvents: TrialJobEventRecord[] = await this.db.queryTrialJobEvent(undefined, 'IMPORT_DATA'); + for (const event of importDataEvents) { + if (event.data) { + importedData.push(event.data); + } + } + + return importedData; + } + private async queryTrialJobs(status?: TrialJobStatus, trialJobId?: string): Promise { const result: TrialJobInfo[] = []; const trialJobEvents: TrialJobEventRecord[] = await this.db.queryTrialJobEvent(trialJobId); diff --git a/src/nni_manager/core/nnimanager.ts b/src/nni_manager/core/nnimanager.ts index 6997a57380..bc564289f8 100644 --- a/src/nni_manager/core/nnimanager.ts +++ b/src/nni_manager/core/nnimanager.ts @@ -58,6 +58,8 @@ class NNIManager implements Manager { private status: NNIManagerStatus; private waitingTrials: string[]; private trialJobs: Map; + private trialDataForTuner: string; + private trialJobMetricListener: (metric: TrialJobMetric) => void; constructor() { @@ -69,6 +71,7 @@ class NNIManager implements Manager { this.dispatcherPid = 0; this.waitingTrials = []; this.trialJobs = new Map(); + this.trialDataForTuner = ''; this.log = getLogger(); this.dataStore = component.get(DataStore); @@ -116,6 +119,10 @@ class NNIManager implements Manager { return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data); } + public async exportData(): Promise { + return this.dataStore.exportTrialHpConfigs(); + } + public addCustomizedTrialJob(hyperParams: string): Promise { if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) { return Promise.reject( @@ -212,6 +219,16 @@ class NNIManager implements Manager { .filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING') .map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id))); + // Collect generated trials and imported trials + const finishedTrialData: string = await this.exportData(); + const importedData: string[] = await this.dataStore.getImportedData(); + let trialData: Object[] = JSON.parse(finishedTrialData); + for (const oneImportedData of importedData) { + // do not deduplicate + trialData = trialData.concat(JSON.parse(oneImportedData)); + } + this.trialDataForTuner = JSON.stringify(trialData); + if (this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration && this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum && this.experimentProfile.endTime) { @@ -647,6 +664,12 @@ class NNIManager implements Manager { switch (commandType) { case INITIALIZED: // Tuner is intialized, search space is set, request tuner to generate hyper parameters + if (this.trialDataForTuner.length > 0) { + if (this.dispatcher === undefined) { + throw new Error('Dispatcher error: tuner has not been setup'); + } + this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner); + } this.requestTrialJobs(this.experimentProfile.params.trialConcurrency); break; case NEW_TRIAL_JOB: diff --git a/src/nni_manager/core/test/mockedDatastore.ts b/src/nni_manager/core/test/mockedDatastore.ts index d08b5b801b..1e4c580a04 100644 --- a/src/nni_manager/core/test/mockedDatastore.ts +++ b/src/nni_manager/core/test/mockedDatastore.ts @@ -210,6 +210,16 @@ class MockedDataStore implements DataStore { return result; } + async exportTrialHpConfigs(): Promise { + const ret: string = ''; + return Promise.resolve(ret); + } + + async getImportedData(): Promise { + const ret: string[] = []; + return Promise.resolve(ret); + } + public getTrialJob(trialJobId: string): Promise { throw new Error("Method not implemented."); } diff --git a/src/nni_manager/rest_server/restHandler.ts b/src/nni_manager/rest_server/restHandler.ts index ded9bd6232..7513aec496 100644 --- a/src/nni_manager/rest_server/restHandler.ts +++ b/src/nni_manager/rest_server/restHandler.ts @@ -72,6 +72,7 @@ class NNIRestHandler { this.addTrialJob(router); this.cancelTrialJob(router); this.getMetricData(router); + this.exportData(router); // Express-joi-validator configuration router.use((err: any, req: Request, res: Response, next: any) => { @@ -261,6 +262,16 @@ class NNIRestHandler { }); } + private exportData(router: Router): void { + router.get('/export-data', (req: Request, res: Response) => { + this.nniManager.exportData().then((exportedData: string) => { + res.send(exportedData); + }).catch((err: Error) => { + this.handle_error(err, res); + }); + }); + } + private setErrorPathForFailedJob(jobInfo: TrialJobInfo): TrialJobInfo { if (jobInfo === undefined || jobInfo.status !== 'FAILED' || jobInfo.logPath === undefined) { return jobInfo; diff --git a/src/nni_manager/rest_server/test/mockedNNIManager.ts b/src/nni_manager/rest_server/test/mockedNNIManager.ts index d65ad1ed62..299c473aa6 100644 --- a/src/nni_manager/rest_server/test/mockedNNIManager.ts +++ b/src/nni_manager/rest_server/test/mockedNNIManager.ts @@ -49,6 +49,10 @@ export class MockedNNIManager extends Manager { public importData(data: string): Promise { return Promise.resolve(); } + public async exportData(): Promise { + const ret: string = ''; + return Promise.resolve(ret); + } public getTrialJobStatistics(): Promise { const deferred: Deferred = new Deferred(); deferred.resolve([{ diff --git a/src/sdk/pynni/nni/hyperopt_tuner/hyperopt_tuner.py b/src/sdk/pynni/nni/hyperopt_tuner/hyperopt_tuner.py index 650d4c2ffc..47489a11df 100644 --- a/src/sdk/pynni/nni/hyperopt_tuner/hyperopt_tuner.py +++ b/src/sdk/pynni/nni/hyperopt_tuner/hyperopt_tuner.py @@ -153,14 +153,14 @@ def _add_index(in_x, parameter): Will change to format in hyperopt, like: {'dropout_rate': 0.8, 'conv_size': {'_index': 1, '_value': 3}, 'hidden_size': {'_index': 1, '_value': 512}} """ - if TYPE not in in_x: # if at the top level + if NodeType.TYPE not in in_x: # if at the top level out_y = dict() for key, value in parameter.items(): out_y[key] = _add_index(in_x[key], value) return out_y elif isinstance(in_x, dict): - value_type = in_x[TYPE] - value_format = in_x[VALUE] + value_type = in_x[NodeType.TYPE] + value_format = in_x[NodeType.VALUE] if value_type == "choice": choice_name = parameter[0] if isinstance(parameter, list) else parameter @@ -173,15 +173,14 @@ def _add_index(in_x, parameter): choice_value_format = item[1] if choice_key == choice_name: return { - INDEX: - pos, - VALUE: [ + NodeType.INDEX: pos, + NodeType.VALUE: [ choice_name, _add_index(choice_value_format, parameter[1]) ] } elif choice_name == item: - return {INDEX: pos, VALUE: item} + return {NodeType.INDEX: pos, NodeType.VALUE: item} else: return parameter diff --git a/tools/nni_cmd/nnictl_utils.py b/tools/nni_cmd/nnictl_utils.py index 0a0ea2dcf9..9339a41a86 100644 --- a/tools/nni_cmd/nnictl_utils.py +++ b/tools/nni_cmd/nnictl_utils.py @@ -26,9 +26,9 @@ import time from subprocess import call, check_output from .rest_utils import rest_get, rest_delete, check_rest_server_quick, check_response +from .url_utils import trial_jobs_url, experiment_url, trial_job_id_url, export_data_url from pyhdfs import HdfsClient, HdfsFileNotFoundException from .config_utils import Config, Experiments, HDFSConfig -from .url_utils import trial_jobs_url, experiment_url, trial_job_id_url from .constants import NNICTL_HOME_DIR, EXPERIMENT_INFORMATION_FORMAT, EXPERIMENT_DETAIL_FORMAT, \ EXPERIMENT_MONITOR_INFO, TRIAL_MONITOR_HEAD, TRIAL_MONITOR_CONTENT, TRIAL_MONITOR_TAIL, REST_TIME_OUT from .common_utils import print_normal, print_error, print_warning, detect_process @@ -451,30 +451,9 @@ def monitor_experiment(args): print_error(exception) exit(1) - -def parse_trial_data(content): - """output: List[Dict]""" - trial_records = [] - for trial_data in content: - for phase_i in range(len(trial_data['hyperParameters'])): - hparam = json.loads(trial_data['hyperParameters'][phase_i])['parameters'] - hparam['id'] = trial_data['id'] - if 'finalMetricData' in trial_data.keys() and phase_i < len(trial_data['finalMetricData']): - reward = json.loads(trial_data['finalMetricData'][phase_i]['data']) - if isinstance(reward, (float, int)): - dict_tmp = {**hparam, **{'reward': reward}} - elif isinstance(reward, dict): - dict_tmp = {**hparam, **reward} - else: - raise ValueError("Invalid finalMetricsData format: {}/{}".format(type(reward), reward)) - else: - dict_tmp = hparam - trial_records.append(dict_tmp) - return trial_records - def export_trials_data(args): - """export experiment metadata to csv - """ + '''export experiment metadata to csv + ''' nni_config = Config(get_config_filename(args)) rest_port = nni_config.get_config('restServerPort') rest_pid = nni_config.get_config('restServerPid') @@ -483,25 +462,27 @@ def export_trials_data(args): return running, response = check_rest_server_quick(rest_port) if running: - response = rest_get(trial_jobs_url(rest_port), 20) + response = rest_get(export_data_url(rest_port), 20) if response is not None and check_response(response): - content = json.loads(response.text) - # dframe = pd.DataFrame.from_records([parse_trial_data(t_data) for t_data in content]) - # dframe.to_csv(args.csv_path, sep='\t') - records = parse_trial_data(content) if args.type == 'json': - json_records = [] - for trial in records: - value = trial.pop('reward', None) - trial_id = trial.pop('id', None) - json_records.append({'parameter': trial, 'value': value, 'id': trial_id}) - with open(args.path, 'w') as file: - if args.type == 'csv': - writer = csv.DictWriter(file, set.union(*[set(r.keys()) for r in records])) + with open(args.path, 'w') as file: + file.write(response.text) + elif args.type == 'csv': + content = json.loads(response.text) + trial_records = [] + for record in content: + if not isinstance(record['value'], (float, int)): + formated_record = {**record['parameter'], **record['value'], **{'id': record['id']}} + else: + formated_record = {**record['parameter'], **{'reward': record['value'], 'id': record['id']}} + trial_records.append(formated_record) + with open(args.path, 'w') as file: + writer = csv.DictWriter(file, set.union(*[set(r.keys()) for r in trial_records])) writer.writeheader() - writer.writerows(records) - else: - json.dump(json_records, file) + writer.writerows(trial_records) + else: + print_error('Unknown type: %s' % args.type) + exit(1) else: print_error('Export failed...') else: diff --git a/tools/nni_cmd/url_utils.py b/tools/nni_cmd/url_utils.py index 0e77a77b99..0dce341a59 100644 --- a/tools/nni_cmd/url_utils.py +++ b/tools/nni_cmd/url_utils.py @@ -35,6 +35,8 @@ TRIAL_JOBS_API = '/trial-jobs' +EXPORT_DATA_API = '/export-data' + TENSORBOARD_API = '/tensorboard' @@ -68,6 +70,11 @@ def trial_job_id_url(port, job_id): return '{0}:{1}{2}{3}/:{4}'.format(BASE_URL, port, API_ROOT_URL, TRIAL_JOBS_API, job_id) +def export_data_url(port): + '''get export_data url''' + return '{0}:{1}{2}{3}'.format(BASE_URL, port, API_ROOT_URL, EXPORT_DATA_API) + + def tensorboard_url(port): '''get tensorboard url''' return '{0}:{1}{2}{3}'.format(BASE_URL, port, API_ROOT_URL, TENSORBOARD_API)