Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

import finished trial data to tuner when experiment is resumed #1107

Merged
merged 11 commits into from
May 27, 2019
16 changes: 15 additions & 1 deletion src/nni_manager/common/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ interface TrialJobInfo {
stderrPath?: string;
}

/**
 * Shape of one JSON-decoded hyper-parameter record of a trial job.
 * Each string in `TrialJobInfo.hyperParameters` is parsed into this structure
 * when finished trials are exported.
 */
interface HyperParameterFormat {
// NOTE(review): presumably identifies who produced the parameters; not read by the export code — confirm allowed values.
parameter_source: string;
// The hyper-parameter values themselves; exported as the `parameter` field of ExportedDataFormat.
parameters: Object;
// Identifier of this parameter set; used to pair it with the final metric carrying the same parameter id.
parameter_id: number;
}

/**
 * One record of the exported trial data: a hyper-parameter set paired with
 * the final metric it achieved.
 */
interface ExportedDataFormat {
// Hyper-parameter values of the trial (the `parameters` field of HyperParameterFormat).
parameter: Object;
// JSON-decoded final metric data reported for that parameter set.
value: Object;
// Id of the trial job the record was taken from.
id: string;
}

abstract class DataStore {
public abstract init(): Promise<void>;
public abstract close(): Promise<void>;
Expand All @@ -82,6 +94,8 @@ abstract class DataStore {
public abstract getTrialJob(trialJobId: string): Promise<TrialJobInfo>;
public abstract storeMetricData(trialJobId: string, data: string): Promise<void>;
public abstract getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]>;
public abstract exportTrialHpConfigs(): Promise<string>;
public abstract getImportedData(): Promise<string[]>;
}

abstract class Database {
Expand All @@ -99,5 +113,5 @@ abstract class Database {

export {
DataStore, Database, TrialJobEvent, MetricType, MetricData, TrialJobInfo,
ExperimentProfileRecord, TrialJobEventRecord, MetricDataRecord
ExperimentProfileRecord, TrialJobEventRecord, MetricDataRecord, HyperParameterFormat, ExportedDataFormat
};
1 change: 1 addition & 0 deletions src/nni_manager/common/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ abstract class Manager {
public abstract getExperimentProfile(): Promise<ExperimentProfile>;
public abstract updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void>;
public abstract importData(data: string): Promise<void>;
public abstract exportData(): Promise<string>;

public abstract addCustomizedTrialJob(hyperParams: string): Promise<void>;
public abstract cancelTrialJobByUser(trialJobId: string): Promise<void>;
Expand Down
58 changes: 57 additions & 1 deletion src/nni_manager/core/nniDataStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import { Deferred } from 'ts-deferred';

import * as component from '../common/component';
import { Database, DataStore, MetricData, MetricDataRecord, MetricType,
TrialJobEvent, TrialJobEventRecord, TrialJobInfo } from '../common/datastore';
TrialJobEvent, TrialJobEventRecord, TrialJobInfo, HyperParameterFormat,
ExportedDataFormat } from '../common/datastore';
import { NNIError } from '../common/errors';
import { getExperimentId, isNewExperiment } from '../common/experimentStartupInfo';
import { getLogger, Logger } from '../common/log';
Expand Down Expand Up @@ -171,6 +172,61 @@ class NNIDataStore implements DataStore {
return this.db.queryMetricData(trialJobId, metricType);
}

/**
 * Export the hyper-parameters of every trial job that has a final metric,
 * serialized as a JSON array of `ExportedDataFormat` records.
 *
 * Jobs without hyper-parameters or without final metric data are skipped.
 * For jobs with several parameter sets (multi-phase), each parameter set is
 * paired with the final metric that carries the same parameter id; parameter
 * sets that have no matching metric are omitted.
 *
 * @returns JSON string of the exported records (`"[]"` when nothing qualifies)
 */
public async exportTrialHpConfigs(): Promise<string> {
    const jobs: TrialJobInfo[] = await this.listTrialJobs();
    const exportedData: ExportedDataFormat[] = [];
    for (const job of jobs) {
        if (!job.hyperParameters || !job.finalMetricData) {
            continue;
        }
        if (job.hyperParameters.length === 1 && job.finalMetricData.length === 1) {
            // optimization for non-multi-phase case
            const parameters: HyperParameterFormat = <HyperParameterFormat>JSON.parse(job.hyperParameters[0]);
            const oneEntry: ExportedDataFormat = {
                parameter: parameters.parameters,
                value: JSON.parse(job.finalMetricData[0].data),
                id: job.id
            };
            exportedData.push(oneEntry);
            continue;
        }

        // multi-phase case: index parameters and metrics by parameter id, then join them
        const paraMap: Map<number, Object> = new Map();
        const metricMap: Map<number, Object> = new Map();
        for (const eachPara of job.hyperParameters) {
            const parameters: HyperParameterFormat = <HyperParameterFormat>JSON.parse(eachPara);
            paraMap.set(parameters.parameter_id, parameters.parameters);
        }
        for (const eachMetric of job.finalMetricData) {
            const value: Object = JSON.parse(eachMetric.data);
            metricMap.set(Number(eachMetric.parameterId), value);
        }
        paraMap.forEach((value: Object, key: number) => {
            const metricValue: Object | undefined = metricMap.get(key);
            // Compare against undefined explicitly: a truthiness test would drop
            // legitimate falsy metrics such as 0.
            if (metricValue !== undefined) {
                const oneEntry: ExportedDataFormat = {
                    parameter: value,
                    value: metricValue,
                    id: job.id
                };
                exportedData.push(oneEntry);
            }
        });
    }

    return JSON.stringify(exportedData);
}

/**
 * Collect the payloads of all stored 'IMPORT_DATA' trial job events.
 *
 * @returns the `data` field of each such event, skipping events whose data is empty or missing
 */
public async getImportedData(): Promise<string[]> {
    const records: TrialJobEventRecord[] = await this.db.queryTrialJobEvent(undefined, 'IMPORT_DATA');

    return records.reduce(
        (payloads: string[], record: TrialJobEventRecord) => {
            if (record.data) {
                payloads.push(record.data);
            }

            return payloads;
        },
        []
    );
}

private async queryTrialJobs(status?: TrialJobStatus, trialJobId?: string): Promise<TrialJobInfo[]> {
const result: TrialJobInfo[] = [];
const trialJobEvents: TrialJobEventRecord[] = await this.db.queryTrialJobEvent(trialJobId);
Expand Down
23 changes: 23 additions & 0 deletions src/nni_manager/core/nnimanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class NNIManager implements Manager {
private status: NNIManagerStatus;
private waitingTrials: string[];
private trialJobs: Map<string, TrialJobDetail>;
private trialDataForTuner: string;

private trialJobMetricListener: (metric: TrialJobMetric) => void;

constructor() {
Expand All @@ -69,6 +71,7 @@ class NNIManager implements Manager {
this.dispatcherPid = 0;
this.waitingTrials = [];
this.trialJobs = new Map<string, TrialJobDetail>();
this.trialDataForTuner = '';

this.log = getLogger();
this.dataStore = component.get(DataStore);
Expand Down Expand Up @@ -116,6 +119,10 @@ class NNIManager implements Manager {
return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
}

/**
 * Export the finished trials' hyper-parameters and final metrics.
 * Delegates to the data store.
 *
 * @returns the JSON string produced by the data store's exportTrialHpConfigs()
 */
public async exportData(): Promise<string> {
    const exported: string = await this.dataStore.exportTrialHpConfigs();

    return exported;
}

public addCustomizedTrialJob(hyperParams: string): Promise<void> {
if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
return Promise.reject(
Expand Down Expand Up @@ -212,6 +219,16 @@ class NNIManager implements Manager {
.filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
.map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id)));

// Collect generated trials and imported trials
const finishedTrialData: string = await this.exportData();
const importedData: string[] = await this.dataStore.getImportedData();
let trialData: Object[] = JSON.parse(finishedTrialData);
for (const oneImportedData of importedData) {
// do not deduplicate
trialData = trialData.concat(<Object[]>JSON.parse(oneImportedData));
}
this.trialDataForTuner = JSON.stringify(trialData);

if (this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration &&
this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum &&
this.experimentProfile.endTime) {
Expand Down Expand Up @@ -647,6 +664,12 @@ class NNIManager implements Manager {
switch (commandType) {
case INITIALIZED:
// Tuner is initialized, search space is set, request tuner to generate hyper parameters
if (this.trialDataForTuner.length > 0) {
if (this.dispatcher === undefined) {
throw new Error('Dispatcher error: tuner has not been setup');
}
this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
}
this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
break;
case NEW_TRIAL_JOB:
Expand Down
10 changes: 10 additions & 0 deletions src/nni_manager/core/test/mockedDatastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,16 @@ class MockedDataStore implements DataStore {
return result;
}

/**
 * Mocked export: reports that no finished trial exists.
 *
 * @returns an empty JSON array string
 */
async exportTrialHpConfigs(): Promise<string> {
    // Return '[]' rather than '': the exported value is fed to JSON.parse by
    // its consumer, and JSON.parse('') throws a SyntaxError.
    return '[]';
}

/**
 * Mocked lookup of imported data: there is never any.
 *
 * @returns a new empty array
 */
async getImportedData(): Promise<string[]> {
    return [];
}

public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
throw new Error("Method not implemented.");
}
Expand Down
11 changes: 11 additions & 0 deletions src/nni_manager/rest_server/restHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class NNIRestHandler {
this.addTrialJob(router);
this.cancelTrialJob(router);
this.getMetricData(router);
this.exportData(router);

// Express-joi-validator configuration
router.use((err: any, req: Request, res: Response, next: any) => {
Expand Down Expand Up @@ -261,6 +262,16 @@ class NNIRestHandler {
});
}

/**
 * Register the `GET /export-data` route.
 * The handler sends the string returned by the manager's exportData() as the
 * response body; any failure is passed to handle_error.
 */
private exportData(router: Router): void {
    const onExportRequest = (req: Request, res: Response): void => {
        this.nniManager.exportData()
            .then((payload: string) => {
                res.send(payload);
            })
            .catch((failure: Error) => {
                this.handle_error(failure, res);
            });
    };

    router.get('/export-data', onExportRequest);
}

private setErrorPathForFailedJob(jobInfo: TrialJobInfo): TrialJobInfo {
if (jobInfo === undefined || jobInfo.status !== 'FAILED' || jobInfo.logPath === undefined) {
return jobInfo;
Expand Down
4 changes: 4 additions & 0 deletions src/nni_manager/rest_server/test/mockedNNIManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ export class MockedNNIManager extends Manager {
public importData(data: string): Promise<void> {
return Promise.resolve();
}
/** Mocked export: always resolves to an empty string. */
public async exportData(): Promise<string> {
    return '';
}
public getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
const deferred: Deferred<TrialJobStatistics[]> = new Deferred<TrialJobStatistics[]>();
deferred.resolve([{
Expand Down
13 changes: 6 additions & 7 deletions src/sdk/pynni/nni/hyperopt_tuner/hyperopt_tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,14 @@ def _add_index(in_x, parameter):
Will change to format in hyperopt, like:
{'dropout_rate': 0.8, 'conv_size': {'_index': 1, '_value': 3}, 'hidden_size': {'_index': 1, '_value': 512}}
"""
if TYPE not in in_x: # if at the top level
if NodeType.TYPE not in in_x: # if at the top level
out_y = dict()
for key, value in parameter.items():
out_y[key] = _add_index(in_x[key], value)
return out_y
elif isinstance(in_x, dict):
value_type = in_x[TYPE]
value_format = in_x[VALUE]
value_type = in_x[NodeType.TYPE]
value_format = in_x[NodeType.VALUE]
if value_type == "choice":
choice_name = parameter[0] if isinstance(parameter,
list) else parameter
Expand All @@ -173,15 +173,14 @@ def _add_index(in_x, parameter):
choice_value_format = item[1]
if choice_key == choice_name:
return {
INDEX:
pos,
VALUE: [
NodeType.INDEX: pos,
NodeType.VALUE: [
choice_name,
_add_index(choice_value_format, parameter[1])
]
}
elif choice_name == item:
return {INDEX: pos, VALUE: item}
return {NodeType.INDEX: pos, NodeType.VALUE: item}
else:
return parameter

Expand Down
61 changes: 21 additions & 40 deletions tools/nni_cmd/nnictl_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import time
from subprocess import call, check_output
from .rest_utils import rest_get, rest_delete, check_rest_server_quick, check_response
from .url_utils import trial_jobs_url, experiment_url, trial_job_id_url, export_data_url
from pyhdfs import HdfsClient, HdfsFileNotFoundException
from .config_utils import Config, Experiments, HDFSConfig
from .url_utils import trial_jobs_url, experiment_url, trial_job_id_url
from .constants import NNICTL_HOME_DIR, EXPERIMENT_INFORMATION_FORMAT, EXPERIMENT_DETAIL_FORMAT, \
EXPERIMENT_MONITOR_INFO, TRIAL_MONITOR_HEAD, TRIAL_MONITOR_CONTENT, TRIAL_MONITOR_TAIL, REST_TIME_OUT
from .common_utils import print_normal, print_error, print_warning, detect_process
Expand Down Expand Up @@ -451,30 +451,9 @@ def monitor_experiment(args):
print_error(exception)
exit(1)


def parse_trial_data(content):
    """Flatten trial job records into one dict per (trial, phase).

    Each result holds the phase's hyper-parameters plus the trial ``id``.
    When a final metric exists for the phase, a numeric metric is stored
    under ``reward`` and a dict metric is merged into the record.
    Raises ValueError for any other metric type.
    """
    flattened = []
    for trial in content:
        for phase, encoded_hparam in enumerate(trial['hyperParameters']):
            row = json.loads(encoded_hparam)['parameters']
            row['id'] = trial['id']
            has_metric = 'finalMetricData' in trial.keys() and phase < len(trial['finalMetricData'])
            if has_metric:
                reward = json.loads(trial['finalMetricData'][phase]['data'])
                if isinstance(reward, dict):
                    row = {**row, **reward}
                elif isinstance(reward, (float, int)):
                    row = {**row, **{'reward': reward}}
                else:
                    raise ValueError("Invalid finalMetricsData format: {}/{}".format(type(reward), reward))
            flattened.append(row)
    return flattened

def export_trials_data(args):
"""export experiment metadata to csv
"""
'''export experiment metadata to csv
'''
nni_config = Config(get_config_filename(args))
rest_port = nni_config.get_config('restServerPort')
rest_pid = nni_config.get_config('restServerPid')
Expand All @@ -483,25 +462,27 @@ def export_trials_data(args):
return
running, response = check_rest_server_quick(rest_port)
if running:
response = rest_get(trial_jobs_url(rest_port), 20)
response = rest_get(export_data_url(rest_port), 20)
if response is not None and check_response(response):
content = json.loads(response.text)
# dframe = pd.DataFrame.from_records([parse_trial_data(t_data) for t_data in content])
# dframe.to_csv(args.csv_path, sep='\t')
records = parse_trial_data(content)
if args.type == 'json':
json_records = []
for trial in records:
value = trial.pop('reward', None)
trial_id = trial.pop('id', None)
json_records.append({'parameter': trial, 'value': value, 'id': trial_id})
with open(args.path, 'w') as file:
if args.type == 'csv':
writer = csv.DictWriter(file, set.union(*[set(r.keys()) for r in records]))
with open(args.path, 'w') as file:
file.write(response.text)
elif args.type == 'csv':
content = json.loads(response.text)
trial_records = []
for record in content:
if not isinstance(record['value'], (float, int)):
formated_record = {**record['parameter'], **record['value'], **{'id': record['id']}}
else:
formated_record = {**record['parameter'], **{'reward': record['value'], 'id': record['id']}}
trial_records.append(formated_record)
with open(args.path, 'w') as file:
writer = csv.DictWriter(file, set.union(*[set(r.keys()) for r in trial_records]))
writer.writeheader()
writer.writerows(records)
else:
json.dump(json_records, file)
writer.writerows(trial_records)
else:
print_error('Unknown type: %s' % args.type)
exit(1)
else:
print_error('Export failed...')
else:
Expand Down
7 changes: 7 additions & 0 deletions tools/nni_cmd/url_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

TRIAL_JOBS_API = '/trial-jobs'

EXPORT_DATA_API = '/export-data'

TENSORBOARD_API = '/tensorboard'


Expand Down Expand Up @@ -68,6 +70,11 @@ def trial_job_id_url(port, job_id):
return '{0}:{1}{2}{3}/:{4}'.format(BASE_URL, port, API_ROOT_URL, TRIAL_JOBS_API, job_id)


def export_data_url(port):
    '''build the REST url of the export-data endpoint for the given port'''
    return '%s:%s%s%s' % (BASE_URL, port, API_ROOT_URL, EXPORT_DATA_API)


def tensorboard_url(port):
'''get tensorboard url'''
return '{0}:{1}{2}{3}'.format(BASE_URL, port, API_ROOT_URL, TENSORBOARD_API)
Expand Down