Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Fix bug: some trial jobs hyper params not stored and error handling updates #225

Merged
merged 11 commits into from
Oct 18, 2018
5 changes: 4 additions & 1 deletion src/nni_manager/common/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ export namespace NNIErrorNames {
}

export class NNIError extends Error {
constructor (name: string, message: string) {
constructor (name: string, message: string, err?: Error) {
super(message);
this.name = name;
if (err !== undefined) {
this.stack = err.stack;
}
}
}

Expand Down
12 changes: 9 additions & 3 deletions src/nni_manager/core/ipcInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import * as assert from 'assert';
import { ChildProcess } from 'child_process';
import { EventEmitter } from 'events';
import { Readable, Writable } from 'stream';
import { NNIError } from '../common/errors';
import { getLogger, Logger } from '../common/log';
import * as CommandType from './commands';

Expand Down Expand Up @@ -98,9 +99,14 @@ class IpcInterface {
public sendCommand(commandType: string, content: string = ''): void {
this.logger.debug(`ipcInterface command type: [${commandType}], content:[${content}]`);
assert.ok(this.acceptCommandTypes.has(commandType));
const data: Buffer = encodeCommand(commandType, content);
if (!this.outgoingStream.write(data)) {
this.logger.error('Commands jammed in buffer!');

try {
const data: Buffer = encodeCommand(commandType, content);
if (!this.outgoingStream.write(data)) {
this.logger.error('Commands jammed in buffer!');
}
} catch (err) {
throw new NNIError('Dispatcher Error', `Dispatcher Error: ${err.message}`, err);
}
}

Expand Down
36 changes: 24 additions & 12 deletions src/nni_manager/core/nniDataStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { Database, DataStore, MetricData, MetricDataRecord, MetricType,
TrialJobEvent, TrialJobEventRecord, TrialJobInfo } from '../common/datastore';
import { isNewExperiment } from '../common/experimentStartupInfo';
import { getExperimentId } from '../common/experimentStartupInfo';
import { NNIError } from '../common/errors';
import { getExperimentId, isNewExperiment } from '../common/experimentStartupInfo';
import { getLogger, Logger } from '../common/log';
import { ExperimentProfile, TrialJobStatistics } from '../common/manager';
import { TrialJobStatus } from '../common/trainingService';
Expand Down Expand Up @@ -72,7 +72,11 @@ class NNIDataStore implements DataStore {
}

public async storeExperimentProfile(experimentProfile: ExperimentProfile): Promise<void> {
await this.db.storeExperimentProfile(experimentProfile);
try {
await this.db.storeExperimentProfile(experimentProfile);
} catch (err) {
throw new NNIError('Datastore error', `Datastore error: ${err.message}`, err);
}
}

public getExperimentProfile(experimentId: string): Promise<ExperimentProfile> {
Expand All @@ -82,7 +86,11 @@ class NNIDataStore implements DataStore {
public storeTrialJobEvent(event: TrialJobEvent, trialJobId: string, data?: string, logPath?: string): Promise<void> {
this.log.debug(`storeTrialJobEvent: event: ${event}, data: ${data}, logpath: ${logPath}`);

return this.db.storeTrialJobEvent(event, trialJobId, data, logPath);
return this.db.storeTrialJobEvent(event, trialJobId, data, logPath).catch(
(err: Error) => {
throw new NNIError('Datastore error', `Datastore error: ${err.message}`, err);
}
);
}

public async getTrialJobStatistics(): Promise<any[]> {
Expand Down Expand Up @@ -128,14 +136,18 @@ class NNIDataStore implements DataStore {
return;
}
assert(trialJobId === metrics.trial_job_id);
await this.db.storeMetricData(trialJobId, JSON.stringify({
trialJobId: metrics.trial_job_id,
parameterId: metrics.parameter_id,
type: metrics.type,
sequence: metrics.sequence,
data: metrics.value,
timestamp: Date.now()
}));
try {
await this.db.storeMetricData(trialJobId, JSON.stringify({
trialJobId: metrics.trial_job_id,
parameterId: metrics.parameter_id,
type: metrics.type,
sequence: metrics.sequence,
data: metrics.value,
timestamp: Date.now()
}));
} catch (err) {
throw new NNIError('Datastore error', `Datastore error: ${err.message}`, err);
}
}

public getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
Expand Down
34 changes: 22 additions & 12 deletions src/nni_manager/core/nnimanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { ChildProcess, spawn } from 'child_process';
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
import { NNIError } from '../common/errors';
import { getExperimentId } from '../common/experimentStartupInfo';
import { getLogger, Logger } from '../common/log';
import {
Expand Down Expand Up @@ -122,7 +123,7 @@ class NNIManager implements Manager {
}

const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.multiPhase);
console.log(`dispatcher command: ${dispatcherCommand}`);
this.log.debug(`dispatcher command: ${dispatcherCommand}`);
this.setupTuner(
//expParams.tuner.tunerCommand,
dispatcherCommand,
Expand All @@ -136,6 +137,7 @@ class NNIManager implements Manager {
this.run().catch((err: Error) => {
this.criticalError(err);
});

return this.experimentProfile.id;
}

Expand All @@ -146,12 +148,12 @@ class NNIManager implements Manager {
const expParams: ExperimentParams = this.experimentProfile.params;

// Set up multiphase config
if(expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
}

const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.multiPhase);
console.log(`dispatcher command: ${dispatcherCommand}`);
this.log.debug(`dispatcher command: ${dispatcherCommand}`);
this.setupTuner(
dispatcherCommand,
undefined,
Expand Down Expand Up @@ -369,8 +371,12 @@ class NNIManager implements Manager {

await Promise.all([
this.periodicallyUpdateExecDuration(),
this.trainingService.run(),
this.trialJobsMaintainer.run()]);
this.trainingService.run().catch((err: Error) => {
throw new NNIError('Training service error', `Training service error: ${err.message}`, err);
}),
this.trialJobsMaintainer.run().catch((err: Error) => {
throw new NNIError('Job maintainer error', `Job maintainer error: ${err.message}`, err);
})]);
}

private addEventListeners(): void {
Expand All @@ -380,26 +386,26 @@ class NNIManager implements Manager {
}
this.trainingService.addTrialJobMetricListener((metric: TrialJobMetric) => {
this.onTrialJobMetrics(metric).catch((err: Error) => {
this.criticalError(err);
this.criticalError(new NNIError('Job metrics error', `Job metrics error: ${err.message}`, err));
});
});

this.trialJobsMaintainer.on(async (event: TrialJobMaintainerEvent, trialJobDetail: TrialJobDetail) => {
this.onTrialJobEvent(event, trialJobDetail).catch((err: Error) => {
this.criticalError(err);
this.criticalError(new NNIError('Trial job event error', `Trial job event error: ${err.message}`, err));
});
});

this.dispatcher.onCommand((commandType: string, content: string) => {
this.onTunerCommand(commandType, content).catch((err: Error) => {
this.criticalError(err);
this.criticalError(new NNIError('Tuner command event error', `Tuner command event error: ${err.message}`, err));
});
});
}

private sendInitTunerCommands(): void {
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup');
throw new Error('Dispatcher error: tuner has not been setup');
}
// TO DO: we should send INITIALIZE command to tuner if user's tuner needs to run init method in tuner
this.log.debug(`Send tuner command: update search space: ${this.experimentProfile.params.searchSpace}`);
Expand Down Expand Up @@ -479,9 +485,13 @@ class NNIManager implements Manager {
};
const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(trialJobAppForm);
this.trialJobsMaintainer.setTrialJob(trialJobDetail.id, Object.assign({}, trialJobDetail));
// TO DO: to uncomment
assert(trialJobDetail.status === 'WAITING');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why removing this assert? In which case does trainingservice return non 'WAITING' status?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the submitted job is happened to be scheduled immediately after submit, the status of trialJobDetail is not waiting, now change to use a snapshot jobdetail object created by job maintainer.

await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, content, trialJobDetail.url);
const jobDetailSnapshot: TrialJobDetail | undefined = this.trialJobsMaintainer.getTrialJob(trialJobDetail.id);
if (jobDetailSnapshot !== undefined) {
await this.dataStore.storeTrialJobEvent(
jobDetailSnapshot.status, jobDetailSnapshot.id, content, jobDetailSnapshot.url);
} else {
assert(false, `undefined jobdetail in job maintainer: ${trialJobDetail.id}`);
}
if (this.currSubmittedTrialNum === this.experimentProfile.params.maxTrialNum) {
this.trialJobsMaintainer.setNoMoreTrials();
}
Expand Down