diff --git a/src/nni_manager/common/errors.ts b/src/nni_manager/common/errors.ts index bcd06c84ba..8a22482c65 100644 --- a/src/nni_manager/common/errors.ts +++ b/src/nni_manager/common/errors.ts @@ -26,9 +26,12 @@ export namespace NNIErrorNames { } export class NNIError extends Error { - constructor (name: string, message: string) { + constructor (name: string, message: string, err?: Error) { super(message); this.name = name; + if (err !== undefined) { + this.stack = err.stack; + } } } diff --git a/src/nni_manager/core/ipcInterface.ts b/src/nni_manager/core/ipcInterface.ts index 260ea86a5a..e0b12d4573 100644 --- a/src/nni_manager/core/ipcInterface.ts +++ b/src/nni_manager/core/ipcInterface.ts @@ -23,6 +23,7 @@ import * as assert from 'assert'; import { ChildProcess } from 'child_process'; import { EventEmitter } from 'events'; import { Readable, Writable } from 'stream'; +import { NNIError } from '../common/errors'; import { getLogger, Logger } from '../common/log'; import * as CommandType from './commands'; @@ -98,9 +99,14 @@ class IpcInterface { public sendCommand(commandType: string, content: string = ''): void { this.logger.debug(`ipcInterface command type: [${commandType}], content:[${content}]`); assert.ok(this.acceptCommandTypes.has(commandType)); - const data: Buffer = encodeCommand(commandType, content); - if (!this.outgoingStream.write(data)) { - this.logger.error('Commands jammed in buffer!'); + + try { + const data: Buffer = encodeCommand(commandType, content); + if (!this.outgoingStream.write(data)) { + this.logger.error('Commands jammed in buffer!'); + } + } catch (err) { + throw new NNIError('Dispatcher Error', `Dispatcher Error: ${err.message}`, err); } } diff --git a/src/nni_manager/core/nniDataStore.ts b/src/nni_manager/core/nniDataStore.ts index c112340f64..0ccfdd9bb6 100644 --- a/src/nni_manager/core/nniDataStore.ts +++ b/src/nni_manager/core/nniDataStore.ts @@ -25,8 +25,8 @@ import { Deferred } from 'ts-deferred'; import * as component from '../common/component'; import { Database, DataStore, MetricData, MetricDataRecord, MetricType, TrialJobEvent, TrialJobEventRecord, TrialJobInfo } from '../common/datastore'; -import { isNewExperiment } from '../common/experimentStartupInfo'; -import { getExperimentId } from '../common/experimentStartupInfo'; +import { NNIError } from '../common/errors'; +import { getExperimentId, isNewExperiment } from '../common/experimentStartupInfo'; import { getLogger, Logger } from '../common/log'; import { ExperimentProfile, TrialJobStatistics } from '../common/manager'; import { TrialJobStatus } from '../common/trainingService'; @@ -72,7 +72,11 @@ class NNIDataStore implements DataStore { } public async storeExperimentProfile(experimentProfile: ExperimentProfile): Promise { - await this.db.storeExperimentProfile(experimentProfile); + try { + await this.db.storeExperimentProfile(experimentProfile); + } catch (err) { + throw new NNIError('Datastore error', `Datastore error: ${err.message}`, err); + } } public getExperimentProfile(experimentId: string): Promise { @@ -82,7 +86,11 @@ class NNIDataStore implements DataStore { public storeTrialJobEvent(event: TrialJobEvent, trialJobId: string, data?: string, logPath?: string): Promise { this.log.debug(`storeTrialJobEvent: event: ${event}, data: ${data}, logpath: ${logPath}`); - return this.db.storeTrialJobEvent(event, trialJobId, data, logPath); + return this.db.storeTrialJobEvent(event, trialJobId, data, logPath).catch( + (err: Error) => { + throw new NNIError('Datastore error', `Datastore error: ${err.message}`, err); + } + ); } public async getTrialJobStatistics(): Promise { @@ -128,14 +136,18 @@ class NNIDataStore implements DataStore { return; } assert(trialJobId === metrics.trial_job_id); - await this.db.storeMetricData(trialJobId, JSON.stringify({ - trialJobId: metrics.trial_job_id, - parameterId: metrics.parameter_id, - type: metrics.type, - sequence: metrics.sequence, - data: metrics.value, - timestamp: Date.now() - })); + try { + await this.db.storeMetricData(trialJobId, JSON.stringify({ + trialJobId: metrics.trial_job_id, + parameterId: metrics.parameter_id, + type: metrics.type, + sequence: metrics.sequence, + data: metrics.value, + timestamp: Date.now() + })); + } catch (err) { + throw new NNIError('Datastore error', `Datastore error: ${err.message}`, err); + } } public getMetricData(trialJobId?: string, metricType?: MetricType): Promise { diff --git a/src/nni_manager/core/nnimanager.ts b/src/nni_manager/core/nnimanager.ts index 6b95de4a45..c8b56f2fd4 100644 --- a/src/nni_manager/core/nnimanager.ts +++ b/src/nni_manager/core/nnimanager.ts @@ -25,6 +25,7 @@ import { ChildProcess, spawn } from 'child_process'; import { Deferred } from 'ts-deferred'; import * as component from '../common/component'; import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore'; +import { NNIError } from '../common/errors'; import { getExperimentId } from '../common/experimentStartupInfo'; import { getLogger, Logger } from '../common/log'; import { @@ -122,7 +123,7 @@ class NNIManager implements Manager { } const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.multiPhase); - console.log(`dispatcher command: ${dispatcherCommand}`); + this.log.debug(`dispatcher command: ${dispatcherCommand}`); this.setupTuner( //expParams.tuner.tunerCommand, dispatcherCommand, @@ -136,6 +137,7 @@ class NNIManager implements Manager { this.run().catch((err: Error) => { this.criticalError(err); }); + return this.experimentProfile.id; } @@ -146,12 +148,12 @@ class NNIManager implements Manager { const expParams: ExperimentParams = this.experimentProfile.params; // Set up multiphase config - if(expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) { + if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) { this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString()); } const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.multiPhase); - console.log(`dispatcher command: ${dispatcherCommand}`); + this.log.debug(`dispatcher command: ${dispatcherCommand}`); this.setupTuner( dispatcherCommand, undefined, @@ -369,8 +371,12 @@ class NNIManager implements Manager { await Promise.all([ this.periodicallyUpdateExecDuration(), - this.trainingService.run(), - this.trialJobsMaintainer.run()]); + this.trainingService.run().catch((err: Error) => { + throw new NNIError('Training service error', `Training service error: ${err.message}`, err); + }), + this.trialJobsMaintainer.run().catch((err: Error) => { + throw new NNIError('Job maintainer error', `Job maintainer error: ${err.message}`, err); + })]); } private addEventListeners(): void { @@ -380,26 +386,26 @@ class NNIManager implements Manager { } this.trainingService.addTrialJobMetricListener((metric: TrialJobMetric) => { this.onTrialJobMetrics(metric).catch((err: Error) => { - this.criticalError(err); + this.criticalError(new NNIError('Job metrics error', `Job metrics error: ${err.message}`, err)); }); }); this.trialJobsMaintainer.on(async (event: TrialJobMaintainerEvent, trialJobDetail: TrialJobDetail) => { this.onTrialJobEvent(event, trialJobDetail).catch((err: Error) => { - this.criticalError(err); + this.criticalError(new NNIError('Trial job event error', `Trial job event error: ${err.message}`, err)); }); }); this.dispatcher.onCommand((commandType: string, content: string) => { this.onTunerCommand(commandType, content).catch((err: Error) => { - this.criticalError(err); + this.criticalError(new NNIError('Tuner command event error', `Tuner command event error: ${err.message}`, err)); }); }); } private sendInitTunerCommands(): void { if (this.dispatcher === undefined) { - throw new Error('Error: tuner has not been setup'); + throw new Error('Dispatcher error: tuner has not been setup'); } // TO DO: we should send INITIALIZE command to tuner if user's tuner needs to run init method in tuner this.log.debug(`Send tuner command: update search space: ${this.experimentProfile.params.searchSpace}`); @@ -479,9 +485,13 @@ class NNIManager implements Manager { }; const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(trialJobAppForm); this.trialJobsMaintainer.setTrialJob(trialJobDetail.id, Object.assign({}, trialJobDetail)); - // TO DO: to uncomment - assert(trialJobDetail.status === 'WAITING'); - await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, content, trialJobDetail.url); + const jobDetailSnapshot: TrialJobDetail | undefined = this.trialJobsMaintainer.getTrialJob(trialJobDetail.id); + if (jobDetailSnapshot !== undefined) { + await this.dataStore.storeTrialJobEvent( + jobDetailSnapshot.status, jobDetailSnapshot.id, content, jobDetailSnapshot.url); + } else { + assert(false, `undefined jobdetail in job maintainer: ${trialJobDetail.id}`); + } if (this.currSubmittedTrialNum === this.experimentProfile.params.maxTrialNum) { this.trialJobsMaintainer.setNoMoreTrials(); }