From 2ba38a2c0a59d4a4c297bacac156b787f2a135f3 Mon Sep 17 00:00:00 2001 From: chicm-ms <38930155+chicm-ms@users.noreply.github.com> Date: Mon, 11 Mar 2019 11:43:49 +0800 Subject: [PATCH 01/19] Fix integration test dependencies (#822) * Install dependencies for PAI/k8s IT * updates --- test/pipelines-it-kubeflow.yml | 10 ++++++++++ test/pipelines-it-pai.yml | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/test/pipelines-it-kubeflow.yml b/test/pipelines-it-kubeflow.yml index 5314dd7ce3..b7f86a968a 100644 --- a/test/pipelines-it-kubeflow.yml +++ b/test/pipelines-it-kubeflow.yml @@ -38,6 +38,16 @@ jobs: source install.sh displayName: 'Install nni toolkit via source code' + - script: | + python3 -m pip install scikit-learn==0.20.0 --user + python3 -m pip install torch==0.4.1 --user + python3 -m pip install torchvision==0.2.1 --user + python3 -m pip install keras==2.1.6 --user + python3 -m pip install tensorflow-gpu==1.10.0 --user + sudo apt-get install swig -y + nnictl package install --name=SMAC + displayName: 'Install dependencies for integration tests in Kubeflow mode' + - script: | if [ $(build_docker_img) = 'true' ] then diff --git a/test/pipelines-it-pai.yml b/test/pipelines-it-pai.yml index 7a58eb6cba..5c8f7a9dcc 100644 --- a/test/pipelines-it-pai.yml +++ b/test/pipelines-it-pai.yml @@ -38,6 +38,16 @@ jobs: source install.sh displayName: 'Install nni toolkit via source code' + - script: | + python3 -m pip install scikit-learn==0.20.0 --user + python3 -m pip install torch==0.4.1 --user + python3 -m pip install torchvision==0.2.1 --user + python3 -m pip install keras==2.1.6 --user + python3 -m pip install tensorflow-gpu==1.10.0 --user + sudo apt-get install swig -y + nnictl package install --name=SMAC + displayName: 'Install dependencies for integration tests in PAI mode' + - script: | if [ $(build_docker_img) = 'true' ] then From 7846d93e82a9d5d3395d920a8b7abd77007874ed Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Tue, 12 Mar 2019 10:09:48 +0800 Subject: [PATCH 02/19] seperate tuner assessor msg queue --- src/nni_manager/common/manager.ts | 1 + src/nni_manager/core/nnimanager.ts | 8 ++- .../rest_server/restValidationSchemas.ts | 3 +- src/sdk/pynni/nni/msg_dispatcher.py | 40 +++++++++-- src/sdk/pynni/nni/msg_dispatcher_base.py | 72 ++++++++++++++----- tools/nni_cmd/config_schema.py | 1 + 6 files changed, 100 insertions(+), 25 deletions(-) diff --git a/src/nni_manager/common/manager.ts b/src/nni_manager/common/manager.ts index 5d05eeb5db..435b8c7254 100644 --- a/src/nni_manager/common/manager.ts +++ b/src/nni_manager/common/manager.ts @@ -45,6 +45,7 @@ interface ExperimentParams { classFileName?: string; checkpointDir: string; gpuNum?: number; + includeIntermeidateResults?: boolean; }; assessor?: { className: string; diff --git a/src/nni_manager/core/nnimanager.ts b/src/nni_manager/core/nnimanager.ts index c96302cd92..c1f24af4b2 100644 --- a/src/nni_manager/core/nnimanager.ts +++ b/src/nni_manager/core/nnimanager.ts @@ -273,11 +273,17 @@ class NNIManager implements Manager { newCwd = cwd; } // TO DO: add CUDA_VISIBLE_DEVICES + let includeIntermeidateResultsEnv: boolean | undefined = false; + if (this.experimentProfile.params.tuner !== undefined) { + includeIntermeidateResultsEnv = this.experimentProfile.params.tuner.includeIntermeidateResults; + } + let nniEnv = { NNI_MODE: mode, NNI_CHECKPOINT_DIRECTORY: dataDirectory, NNI_LOG_DIRECTORY: getLogDir(), - NNI_LOG_LEVEL: getLogLevel() + NNI_LOG_LEVEL: getLogLevel(), + NNI_INCLUDE_INTERMEDIATE_RESULTS: 
includeIntermeidateResultsEnv }; let newEnv = Object.assign({}, process.env, nniEnv); const tunerProc: ChildProcess = spawn(command, [], { diff --git a/src/nni_manager/rest_server/restValidationSchemas.ts b/src/nni_manager/rest_server/restValidationSchemas.ts index 8eedfe3444..02b2e346bc 100644 --- a/src/nni_manager/rest_server/restValidationSchemas.ts +++ b/src/nni_manager/rest_server/restValidationSchemas.ts @@ -158,7 +158,8 @@ export namespace ValidationSchemas { className: joi.string(), classArgs: joi.any(), gpuNum: joi.number().min(0), - checkpointDir: joi.string().allow('') + checkpointDir: joi.string().allow(''), + includeIntermeidateResults: joi.boolean() }), assessor: joi.object({ builtinAssessorName: joi.string().valid('Medianstop', 'Curvefitting'), diff --git a/src/sdk/pynni/nni/msg_dispatcher.py b/src/sdk/pynni/nni/msg_dispatcher.py index 325befc7d1..765d696d1b 100644 --- a/src/sdk/pynni/nni/msg_dispatcher.py +++ b/src/sdk/pynni/nni/msg_dispatcher.py @@ -18,14 +18,15 @@ # OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # ================================================================================================== +import os import logging from collections import defaultdict import json_tricks -import threading from .protocol import CommandType, send from .msg_dispatcher_base import MsgDispatcherBase from .assessor import AssessResult +from .common import multi_thread_enabled _logger = logging.getLogger(__name__) @@ -126,12 +127,7 @@ def handle_report_metric_data(self, data): - 'type': report type, support {'FINAL', 'PERIODICAL'} """ if data['type'] == 'FINAL': - id_ = data['parameter_id'] - value = data['value'] - if id_ in _customized_parameter_ids: - self.tuner.receive_customized_trial_result(id_, _trial_params[id_], value) - else: - self.tuner.receive_trial_result(id_, _trial_params[id_], value) + self._handle_final_metric_data(data) elif data['type'] == 'PERIODICAL': if self.assessor is not None: self._handle_intermediate_metric_data(data) @@ -157,7 +153,21 @@ def handle_trial_end(self, data): self.assessor.trial_end(trial_job_id, data['event'] == 'SUCCEEDED') return True + def _handle_final_metric_data(self, data): + ''' + Call tuner to process final results + ''' + id_ = data['parameter_id'] + value = data['value'] + if id_ in _customized_parameter_ids: + self.tuner.receive_customized_trial_result(id_, _trial_params[id_], value) + else: + self.tuner.receive_trial_result(id_, _trial_params[id_], value) + def _handle_intermediate_metric_data(self, data): + ''' + Call assessor to process intermediate results + ''' if data['type'] != 'PERIODICAL': return True if self.assessor is None: @@ -187,5 +197,21 @@ def _handle_intermediate_metric_data(self, data): if result is AssessResult.Bad: _logger.debug('BAD, kill %s', trial_job_id) send(CommandType.KillTrialJob, json_tricks.dumps(trial_job_id)) + # notify tuner + _logger.debug('env var: NNI_INCLUDE_INTERMEDIATE_RESULTS: [%s]', os.environ.get('NNI_INCLUDE_INTERMEDIATE_RESULTS')) + if os.environ.get('NNI_INCLUDE_INTERMEDIATE_RESULTS') == 'true': + self._earlystop_notify_tuner(data) else: _logger.debug('GOOD') + + def _earlystop_notify_tuner(self, data): + ''' + Send last intermediate result as final result to tuner in case the + trial is early stopped. 
+ ''' + _logger.debug('Early stop notify tuner data: [%s]', data) + data['type'] = 'FINAL' + if multi_thread_enabled(): + self._handle_final_metric_data(data) + else: + self.enqueue_command(CommandType.ReportMetricData, data) diff --git a/src/sdk/pynni/nni/msg_dispatcher_base.py b/src/sdk/pynni/nni/msg_dispatcher_base.py index 337c3c6f11..66cc879059 100644 --- a/src/sdk/pynni/nni/msg_dispatcher_base.py +++ b/src/sdk/pynni/nni/msg_dispatcher_base.py @@ -19,14 +19,13 @@ # ================================================================================================== #import json_tricks -import logging import os -from queue import Queue -import sys - +import threading +import logging from multiprocessing.dummy import Pool as ThreadPool - +from queue import Queue, Empty import json_tricks + from .common import init_logger, multi_thread_enabled from .recoverable import Recoverable from .protocol import CommandType, receive @@ -39,6 +38,14 @@ def __init__(self): if multi_thread_enabled(): self.pool = ThreadPool() self.thread_results = [] + else: + self.stopping = False + self.default_command_queue = Queue() + self.assessor_command_queue = Queue() + self.default_worker = threading.Thread(target=self.command_queue_worker, args=(self.default_command_queue,)) + self.assessor_worker = threading.Thread(target=self.command_queue_worker, args=(self.assessor_command_queue,)) + self.default_worker.start() + self.assessor_worker.start() def run(self): """Run the tuner. @@ -51,40 +58,73 @@ def run(self): while True: _logger.debug('waiting receive_message') command, data = receive() + if data: + data = json_tricks.loads(data) + if command is None or command is CommandType.Terminate: break if multi_thread_enabled(): - result = self.pool.map_async(self.handle_request_thread, [(command, data)]) + result = self.pool.map_async(self.process_command_thread, [(command, data)]) self.thread_results.append(result) if any([thread_result.ready() and not thread_result.successful() for thread_result in self.thread_results]): _logger.debug('Caught thread exception') break else: - self.handle_request((command, data)) + if command == CommandType.ReportMetricData and data['type'] == 'PERIODICAL': + self.enqueue_assessor_command(command, data) + else: + self.enqueue_command(command, data) + self.stopping = True if multi_thread_enabled(): self.pool.close() self.pool.join() + else: + self.default_worker.join() + self.assessor_worker.join() _logger.info('Terminated by NNI manager') - def handle_request_thread(self, request): + def command_queue_worker(self, command_queue): + ''' + Process commands in command queues. + ''' + while not self.stopping: + try: + # set timeout to ensure self.stopping is checked periodically + command, data = command_queue.get(timeout=5) + self.process_command(command, data) + except Empty: + pass + + def enqueue_command(self, command, data): + ''' + Enqueue command into default queue + ''' + self.default_command_queue.put((command, data)) + + def enqueue_assessor_command(self, command, data): + ''' + Enqueue command into a seperate command queue for assessor + ''' + self.assessor_command_queue.put((command, data)) + + def process_command_thread(self, request): + ''' + Worker thread to process a command. 
+ ''' + command, data = request if multi_thread_enabled(): try: - self.handle_request(request) + self.process_command(command, data) except Exception as e: _logger.exception(str(e)) raise else: pass - def handle_request(self, request): - command, data = request - - _logger.debug('handle request: command: [{}], data: [{}]'.format(command, data)) - - if data: - data = json_tricks.loads(data) + def process_command(self, command, data): + _logger.debug('process_command: command: [{}], data: [{}]'.format(command, data)) command_handlers = { # Tunner commands: diff --git a/tools/nni_cmd/config_schema.py b/tools/nni_cmd/config_schema.py index 0f519a8926..249fcb2c53 100644 --- a/tools/nni_cmd/config_schema.py +++ b/tools/nni_cmd/config_schema.py @@ -57,6 +57,7 @@ Optional('classArgs'): { 'optimize_mode': Or('maximize', 'minimize') }, + Optional('includeIntermeidateResults'): bool, Optional('gpuNum'): And(int, lambda x: 0 <= x <= 99999), },{ 'builtinTunerName': Or('BatchTuner', 'GridSearch'), From 74584e1d58aa3e910732861aae15854caa926925 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Wed, 20 Mar 2019 16:25:44 +0800 Subject: [PATCH 03/19] add warning message --- src/sdk/pynni/nni/msg_dispatcher_base.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/sdk/pynni/nni/msg_dispatcher_base.py b/src/sdk/pynni/nni/msg_dispatcher_base.py index 66cc879059..ae2fc3fbfc 100644 --- a/src/sdk/pynni/nni/msg_dispatcher_base.py +++ b/src/sdk/pynni/nni/msg_dispatcher_base.py @@ -33,6 +33,8 @@ init_logger('dispatcher.log') _logger = logging.getLogger(__name__) +QUEUE_LEN_WARNING_MARK = 20 + class MsgDispatcherBase(Recoverable): def __init__(self): if multi_thread_enabled(): @@ -103,12 +105,20 @@ def enqueue_command(self, command, data): ''' self.default_command_queue.put((command, data)) + qsize = self.default_command_queue.qsize() + if qsize >= QUEUE_LEN_WARNING_MARK: + _logger.warning('default queue length: %d', qsize) + def enqueue_assessor_command(self, command, data): ''' Enqueue command into a seperate command queue for assessor ''' self.assessor_command_queue.put((command, data)) + qsize = self.assessor_command_queue.qsize() + if qsize >= QUEUE_LEN_WARNING_MARK: + _logger.warning('assessor queue length: %d', qsize) + def process_command_thread(self, request): ''' Worker thread to process a command. From 8af3f8b11c22f43e89f59a553b57401c01d2ca56 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Wed, 20 Mar 2019 17:31:08 +0800 Subject: [PATCH 04/19] updates --- docs/en_US/ExperimentConfig.md | 7 ++++++- src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/docs/en_US/ExperimentConfig.md b/docs/en_US/ExperimentConfig.md index f640a26d08..ed08782394 100644 --- a/docs/en_US/ExperimentConfig.md +++ b/docs/en_US/ExperimentConfig.md @@ -227,12 +227,17 @@ machineList: * __classArgs__ __classArgs__ specifies the arguments of tuner algorithm. - * __gpuNum__ + + * __gpuNum__ __gpuNum__ specifies the gpu number to run the tuner process. The value of this field should be a positive number. Note: users could only specify one way to set tuner, for example, set {tunerName, optimizationMode} or {tunerCommand, tunerCwd}, and could not set them both. + * __includeIntermeidateResults__ + + If __includeIntermeidateResults__ is true, the last intermediate results of the trials early stopped by assessor are sent to tuner as final results. The default value of __includeIntermeidateResults__ is false. 
+ * __assessor__ * Description diff --git a/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py b/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py index 39b5c20039..cc67fd48e3 100644 --- a/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py +++ b/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py @@ -75,7 +75,7 @@ def _pack_parameter(parameter_id, params, customized=False, trial_job_id=None, p class MultiPhaseMsgDispatcher(MsgDispatcherBase): def __init__(self, tuner, assessor=None): - super() + super(MultiPhaseMsgDispatcher, self).__init__() self.tuner = tuner self.assessor = assessor if assessor is None: From ffa66233f505a7a97ab3eecb12a9b8bdddf5a5e0 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Wed, 20 Mar 2019 17:45:44 +0800 Subject: [PATCH 05/19] updates --- src/sdk/pynni/nni/msg_dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sdk/pynni/nni/msg_dispatcher.py b/src/sdk/pynni/nni/msg_dispatcher.py index 765d696d1b..d43acb4898 100644 --- a/src/sdk/pynni/nni/msg_dispatcher.py +++ b/src/sdk/pynni/nni/msg_dispatcher.py @@ -71,7 +71,7 @@ def _pack_parameter(parameter_id, params, customized=False): class MsgDispatcher(MsgDispatcherBase): def __init__(self, tuner, assessor=None): - super().__init__() + super(MsgDispatcher, self).__init__() self.tuner = tuner self.assessor = assessor if assessor is None: From 0d94a8e665dc9c4c7c7431b4a015e0d9dadac97a Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 21 Mar 2019 11:47:48 +0800 Subject: [PATCH 06/19] updates --- src/sdk/pynni/nni/msg_dispatcher_base.py | 3 ++- src/sdk/pynni/nni/protocol.py | 9 +++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/sdk/pynni/nni/msg_dispatcher_base.py b/src/sdk/pynni/nni/msg_dispatcher_base.py index ae2fc3fbfc..a3bab2faa2 100644 --- a/src/sdk/pynni/nni/msg_dispatcher_base.py +++ b/src/sdk/pynni/nni/msg_dispatcher_base.py @@ -53,12 +53,12 @@ def run(self): """Run the tuner. This function will never return unless raise. """ + _logger.info('Start dispatcher') mode = os.getenv('NNI_MODE') if mode == 'resume': self.load_checkpoint() while True: - _logger.debug('waiting receive_message') command, data = receive() if data: data = json_tricks.loads(data) @@ -77,6 +77,7 @@ def run(self): else: self.enqueue_command(command, data) + _logger.info('Dispatcher exiting...') self.stopping = True if multi_thread_enabled(): self.pool.close() diff --git a/src/sdk/pynni/nni/protocol.py b/src/sdk/pynni/nni/protocol.py index 3d9b143ee6..c9638bf025 100644 --- a/src/sdk/pynni/nni/protocol.py +++ b/src/sdk/pynni/nni/protocol.py @@ -42,11 +42,10 @@ class CommandType(Enum): NoMoreTrialJobs = b'NO' KillTrialJob = b'KI' - +_lock = threading.Lock() try: _in_file = open(3, 'rb') _out_file = open(4, 'wb') - _lock = threading.Lock() except OSError: _msg = 'IPC pipeline not exists, maybe you are importing tuner/assessor from trial code?' 
import logging @@ -60,8 +59,7 @@ def send(command, data): """ global _lock try: - if multi_thread_enabled(): - _lock.acquire() + _lock.acquire() data = data.encode('utf8') assert len(data) < 1000000, 'Command too long' msg = b'%b%06d%b' % (command.value, len(data), data) @@ -69,8 +67,7 @@ def send(command, data): _out_file.write(msg) _out_file.flush() finally: - if multi_thread_enabled(): - _lock.release() + _lock.release() def receive(): From d005790c9ec98fb59d05c49c252105954314fdf0 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 21 Mar 2019 13:51:26 +0800 Subject: [PATCH 07/19] updates --- src/nni_manager/core/test/ipcInterfaceTerminate.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nni_manager/core/test/ipcInterfaceTerminate.test.ts b/src/nni_manager/core/test/ipcInterfaceTerminate.test.ts index ca599d6a0c..33032405ec 100644 --- a/src/nni_manager/core/test/ipcInterfaceTerminate.test.ts +++ b/src/nni_manager/core/test/ipcInterfaceTerminate.test.ts @@ -106,7 +106,7 @@ describe('core/ipcInterface.terminate', (): void => { assert.ok(!procError); deferred.resolve(); }, - 2000); + 5000); return deferred.promise; }); From 5321ed369ad20b3492855963234acbe5a56f73a1 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 21 Mar 2019 16:00:07 +0800 Subject: [PATCH 08/19] updates --- src/sdk/pynni/tests/test_assessor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sdk/pynni/tests/test_assessor.py b/src/sdk/pynni/tests/test_assessor.py index 6898e7d3b4..856ab60bb1 100644 --- a/src/sdk/pynni/tests/test_assessor.py +++ b/src/sdk/pynni/tests/test_assessor.py @@ -79,7 +79,6 @@ def test_assessor(self): self.assertIs(type(e), AssertionError) self.assertEqual(e.args[0], 'Unsupported command: CommandType.NewTrialJob') - self.assertEqual(_trials, ['A', 'B', 'A']) self.assertEqual(_end_trials, [('A', False), ('B', True)]) _reverse_io() From 7f2ab19b3844195e2421e179335444eb49e39ab5 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 21 Mar 2019 16:40:09 +0800 Subject: [PATCH 09/19] updates --- src/sdk/pynni/tests/test_assessor.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/sdk/pynni/tests/test_assessor.py b/src/sdk/pynni/tests/test_assessor.py index 856ab60bb1..56886377a7 100644 --- a/src/sdk/pynni/tests/test_assessor.py +++ b/src/sdk/pynni/tests/test_assessor.py @@ -79,14 +79,5 @@ def test_assessor(self): self.assertIs(type(e), AssertionError) self.assertEqual(e.args[0], 'Unsupported command: CommandType.NewTrialJob') - self.assertEqual(_end_trials, [('A', False), ('B', True)]) - - _reverse_io() - command, data = receive() - self.assertIs(command, CommandType.KillTrialJob) - self.assertEqual(data, '"A"') - self.assertEqual(len(_out_buf.read()), 0) - - if __name__ == '__main__': main() \ No newline at end of file From f2a2f2efca9028afc91c34af871f45510c5dd6d9 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 21 Mar 2019 16:58:14 +0800 Subject: [PATCH 10/19] updates --- src/sdk/pynni/nni/msg_dispatcher_base.py | 2 +- test/tuner_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sdk/pynni/nni/msg_dispatcher_base.py b/src/sdk/pynni/nni/msg_dispatcher_base.py index a3bab2faa2..0d67b9bbc7 100644 --- a/src/sdk/pynni/nni/msg_dispatcher_base.py +++ b/src/sdk/pynni/nni/msg_dispatcher_base.py @@ -95,7 +95,7 @@ def command_queue_worker(self, command_queue): while not self.stopping: try: # set timeout to ensure self.stopping is checked periodically - command, data = command_queue.get(timeout=5) + command, data = 
command_queue.get(timeout=3) self.process_command(command, data) except Empty: pass diff --git a/test/tuner_test.py b/test/tuner_test.py index 4a6d4527d0..943bbd4988 100644 --- a/test/tuner_test.py +++ b/test/tuner_test.py @@ -77,7 +77,7 @@ def run(dispatch_type): for dispatcher_name in dipsatcher_list: try: # sleep 5 seconds here, to make sure previous stopped exp has enough time to exit to avoid port conflict - time.sleep(5) + time.sleep(6) test_builtin_dispatcher(dispatch_type, dispatcher_name) print(GREEN + 'Test %s %s: TEST PASS' % (dispatcher_name, dispatch_type) + CLEAR) except Exception as error: From 556f8ee5dc6f5066f45912fd7101eb20209c1e4d Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 21 Mar 2019 18:05:54 +0800 Subject: [PATCH 11/19] updates --- src/sdk/pynni/nni/msg_dispatcher_base.py | 5 ++++- src/sdk/pynni/tests/test_assessor.py | 11 +++++++++++ src/sdk/pynni/tests/test_tuner.py | 1 + 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/sdk/pynni/nni/msg_dispatcher_base.py b/src/sdk/pynni/nni/msg_dispatcher_base.py index 0d67b9bbc7..2fe6327fb8 100644 --- a/src/sdk/pynni/nni/msg_dispatcher_base.py +++ b/src/sdk/pynni/nni/msg_dispatcher_base.py @@ -34,6 +34,7 @@ _logger = logging.getLogger(__name__) QUEUE_LEN_WARNING_MARK = 20 +_worker_fast_exit_on_terminate = True class MsgDispatcherBase(Recoverable): def __init__(self): @@ -92,13 +93,15 @@ def command_queue_worker(self, command_queue): ''' Process commands in command queues. ''' - while not self.stopping: + while True: try: # set timeout to ensure self.stopping is checked periodically command, data = command_queue.get(timeout=3) self.process_command(command, data) except Empty: pass + if self.stopping and (_worker_fast_exit_on_terminate or command_queue.empty()): + break def enqueue_command(self, command, data): ''' diff --git a/src/sdk/pynni/tests/test_assessor.py b/src/sdk/pynni/tests/test_assessor.py index 56886377a7..281d9e1ac1 100644 --- a/src/sdk/pynni/tests/test_assessor.py +++ b/src/sdk/pynni/tests/test_assessor.py @@ -73,11 +73,22 @@ def test_assessor(self): assessor = NaiveAssessor() dispatcher = MsgDispatcher(None, assessor) + nni.msg_dispatcher_base._worker_fast_exit_on_terminate = False try: dispatcher.run() except Exception as e: self.assertIs(type(e), AssertionError) self.assertEqual(e.args[0], 'Unsupported command: CommandType.NewTrialJob') + self.assertEqual(_trials, ['A', 'B', 'A']) + self.assertEqual(_end_trials, [('A', False), ('B', True)]) + + _reverse_io() + command, data = receive() + self.assertIs(command, CommandType.KillTrialJob) + self.assertEqual(data, '"A"') + self.assertEqual(len(_out_buf.read()), 0) + + if __name__ == '__main__': main() \ No newline at end of file diff --git a/src/sdk/pynni/tests/test_tuner.py b/src/sdk/pynni/tests/test_tuner.py index 8dc9aea810..ca1eec15ca 100644 --- a/src/sdk/pynni/tests/test_tuner.py +++ b/src/sdk/pynni/tests/test_tuner.py @@ -88,6 +88,7 @@ def test_tuner(self): tuner = NaiveTuner() dispatcher = MsgDispatcher(tuner) + nni.msg_dispatcher_base._worker_fast_exit_on_terminate = False try: dispatcher.run() except Exception as e: From bd5084a4e30a66ccf2918f7deb0b5880a6c6afc0 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 21 Mar 2019 18:22:26 +0800 Subject: [PATCH 12/19] updates --- src/sdk/pynni/nni/msg_dispatcher_base.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/sdk/pynni/nni/msg_dispatcher_base.py b/src/sdk/pynni/nni/msg_dispatcher_base.py index 2fe6327fb8..aa331830c2 100644 --- 
a/src/sdk/pynni/nni/msg_dispatcher_base.py +++ b/src/sdk/pynni/nni/msg_dispatcher_base.py @@ -97,7 +97,11 @@ def command_queue_worker(self, command_queue): try: # set timeout to ensure self.stopping is checked periodically command, data = command_queue.get(timeout=3) - self.process_command(command, data) + try: + self.process_command(command, data) + except Exception as e: + _logger.exception(e) + raise except Empty: pass if self.stopping and (_worker_fast_exit_on_terminate or command_queue.empty()): From b17b15f27bc32844dc57701e2d907b9b6dafc160 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 21 Mar 2019 18:30:48 +0800 Subject: [PATCH 13/19] updates --- src/sdk/pynni/tests/test_assessor.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/sdk/pynni/tests/test_assessor.py b/src/sdk/pynni/tests/test_assessor.py index 281d9e1ac1..7c5f7cea30 100644 --- a/src/sdk/pynni/tests/test_assessor.py +++ b/src/sdk/pynni/tests/test_assessor.py @@ -68,17 +68,12 @@ def test_assessor(self): send(CommandType.ReportMetricData, '{"trial_job_id":"A","type":"PERIODICAL","sequence":1,"value":3}') send(CommandType.TrialEnd, '{"trial_job_id":"A","event":"SYS_CANCELED"}') send(CommandType.TrialEnd, '{"trial_job_id":"B","event":"SUCCEEDED"}') - send(CommandType.NewTrialJob, 'null') _restore_io() assessor = NaiveAssessor() dispatcher = MsgDispatcher(None, assessor) nni.msg_dispatcher_base._worker_fast_exit_on_terminate = False - try: - dispatcher.run() - except Exception as e: - self.assertIs(type(e), AssertionError) - self.assertEqual(e.args[0], 'Unsupported command: CommandType.NewTrialJob') + dispatcher.run() self.assertEqual(_trials, ['A', 'B', 'A']) self.assertEqual(_end_trials, [('A', False), ('B', True)]) @@ -91,4 +86,4 @@ def test_assessor(self): if __name__ == '__main__': - main() \ No newline at end of file + main() From f56c765db90726f8554e260ce2f6fc390fd3d337 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 21 Mar 2019 18:51:30 +0800 Subject: [PATCH 14/19] updates --- src/sdk/pynni/nni/msg_dispatcher_base.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/src/sdk/pynni/nni/msg_dispatcher_base.py b/src/sdk/pynni/nni/msg_dispatcher_base.py index aa331830c2..d2990a5f78 100644 --- a/src/sdk/pynni/nni/msg_dispatcher_base.py +++ b/src/sdk/pynni/nni/msg_dispatcher_base.py @@ -73,10 +73,7 @@ def run(self): _logger.debug('Caught thread exception') break else: - if command == CommandType.ReportMetricData and data['type'] == 'PERIODICAL': - self.enqueue_assessor_command(command, data) - else: - self.enqueue_command(command, data) + self.enqueue_command(command, data) _logger.info('Dispatcher exiting...') self.stopping = True @@ -109,20 +106,17 @@ def command_queue_worker(self, command_queue): def enqueue_command(self, command, data): ''' - Enqueue command into default queue + Enqueue command into command queues ''' - self.default_command_queue.put((command, data)) + if command == CommandType.TrialEnd or (command == CommandType.ReportMetricData and data['type'] == 'PERIODICAL'): + self.assessor_command_queue.put((command, data)) + else: + self.default_command_queue.put((command, data)) qsize = self.default_command_queue.qsize() if qsize >= QUEUE_LEN_WARNING_MARK: _logger.warning('default queue length: %d', qsize) - def enqueue_assessor_command(self, command, data): - ''' - Enqueue command into a seperate command queue for assessor - ''' - self.assessor_command_queue.put((command, data)) - qsize = self.assessor_command_queue.qsize() if 
qsize >= QUEUE_LEN_WARNING_MARK: _logger.warning('assessor queue length: %d', qsize) From 2ca2fc6714e7bcec58100b4626e6d30159f05091 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 21 Mar 2019 19:17:29 +0800 Subject: [PATCH 15/19] updates --- src/sdk/pynni/nni/msg_dispatcher_base.py | 4 +++- src/sdk/pynni/tests/test_assessor.py | 5 +++++ src/sdk/pynni/tests/test_tuner.py | 10 +++++----- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/sdk/pynni/nni/msg_dispatcher_base.py b/src/sdk/pynni/nni/msg_dispatcher_base.py index d2990a5f78..1f0de810cb 100644 --- a/src/sdk/pynni/nni/msg_dispatcher_base.py +++ b/src/sdk/pynni/nni/msg_dispatcher_base.py @@ -49,6 +49,7 @@ def __init__(self): self.assessor_worker = threading.Thread(target=self.command_queue_worker, args=(self.assessor_command_queue,)) self.default_worker.start() self.assessor_worker.start() + self.worker_exceptions = [] def run(self): """Run the tuner. @@ -98,7 +99,8 @@ def command_queue_worker(self, command_queue): self.process_command(command, data) except Exception as e: _logger.exception(e) - raise + self.worker_exceptions.append(e) + break except Empty: pass if self.stopping and (_worker_fast_exit_on_terminate or command_queue.empty()): diff --git a/src/sdk/pynni/tests/test_assessor.py b/src/sdk/pynni/tests/test_assessor.py index 7c5f7cea30..9f992377cd 100644 --- a/src/sdk/pynni/tests/test_assessor.py +++ b/src/sdk/pynni/tests/test_assessor.py @@ -68,12 +68,17 @@ def test_assessor(self): send(CommandType.ReportMetricData, '{"trial_job_id":"A","type":"PERIODICAL","sequence":1,"value":3}') send(CommandType.TrialEnd, '{"trial_job_id":"A","event":"SYS_CANCELED"}') send(CommandType.TrialEnd, '{"trial_job_id":"B","event":"SUCCEEDED"}') + send(CommandType.NewTrialJob, 'null') _restore_io() assessor = NaiveAssessor() dispatcher = MsgDispatcher(None, assessor) nni.msg_dispatcher_base._worker_fast_exit_on_terminate = False + dispatcher.run() + e = dispatcher.worker_exceptions[0] + self.assertIs(type(e), AssertionError) + self.assertEqual(e.args[0], 'Unsupported command: CommandType.NewTrialJob') self.assertEqual(_trials, ['A', 'B', 'A']) self.assertEqual(_end_trials, [('A', False), ('B', True)]) diff --git a/src/sdk/pynni/tests/test_tuner.py b/src/sdk/pynni/tests/test_tuner.py index ca1eec15ca..44dd45a418 100644 --- a/src/sdk/pynni/tests/test_tuner.py +++ b/src/sdk/pynni/tests/test_tuner.py @@ -89,11 +89,11 @@ def test_tuner(self): tuner = NaiveTuner() dispatcher = MsgDispatcher(tuner) nni.msg_dispatcher_base._worker_fast_exit_on_terminate = False - try: - dispatcher.run() - except Exception as e: - self.assertIs(type(e), AssertionError) - self.assertEqual(e.args[0], 'Unsupported command: CommandType.KillTrialJob') + + dispatcher.run() + e = dispatcher.worker_exceptions[0] + self.assertIs(type(e), AssertionError) + self.assertEqual(e.args[0], 'Unsupported command: CommandType.KillTrialJob') _reverse_io() # now we are receiving from Tuner's outgoing stream self._assert_params(0, 2, [ ], None) From fc1ece04cdd10a77666c16546eb25d4f9695a66b Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 22 Mar 2019 10:31:52 +0800 Subject: [PATCH 16/19] updates --- src/sdk/pynni/nni/msg_dispatcher.py | 20 ++++++++------------ src/sdk/pynni/nni/msg_dispatcher_base.py | 15 ++++++--------- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/src/sdk/pynni/nni/msg_dispatcher.py b/src/sdk/pynni/nni/msg_dispatcher.py index d43acb4898..e30e3bf4e6 100644 --- a/src/sdk/pynni/nni/msg_dispatcher.py +++ 
b/src/sdk/pynni/nni/msg_dispatcher.py @@ -88,9 +88,8 @@ def save_checkpoint(self): self.assessor.save_checkpoint() def handle_initialize(self, data): - ''' - data is search space - ''' + """Data is search space + """ self.tuner.update_search_space(data) send(CommandType.Initialized, '') return True @@ -154,9 +153,8 @@ def handle_trial_end(self, data): return True def _handle_final_metric_data(self, data): - ''' - Call tuner to process final results - ''' + """Call tuner to process final results + """ id_ = data['parameter_id'] value = data['value'] if id_ in _customized_parameter_ids: @@ -165,9 +163,8 @@ def _handle_final_metric_data(self, data): self.tuner.receive_trial_result(id_, _trial_params[id_], value) def _handle_intermediate_metric_data(self, data): - ''' - Call assessor to process intermediate results - ''' + """Call assessor to process intermediate results + """ if data['type'] != 'PERIODICAL': return True if self.assessor is None: @@ -205,10 +202,9 @@ def _handle_intermediate_metric_data(self, data): _logger.debug('GOOD') def _earlystop_notify_tuner(self, data): - ''' - Send last intermediate result as final result to tuner in case the + """Send last intermediate result as final result to tuner in case the trial is early stopped. - ''' + """ _logger.debug('Early stop notify tuner data: [%s]', data) data['type'] = 'FINAL' if multi_thread_enabled(): diff --git a/src/sdk/pynni/nni/msg_dispatcher_base.py b/src/sdk/pynni/nni/msg_dispatcher_base.py index 1f0de810cb..60c2c93fda 100644 --- a/src/sdk/pynni/nni/msg_dispatcher_base.py +++ b/src/sdk/pynni/nni/msg_dispatcher_base.py @@ -88,9 +88,8 @@ def run(self): _logger.info('Terminated by NNI manager') def command_queue_worker(self, command_queue): - ''' - Process commands in command queues. - ''' + """Process commands in command queues. + """ while True: try: # set timeout to ensure self.stopping is checked periodically @@ -107,9 +106,8 @@ def command_queue_worker(self, command_queue): break def enqueue_command(self, command, data): - ''' - Enqueue command into command queues - ''' + """Enqueue command into command queues + """ if command == CommandType.TrialEnd or (command == CommandType.ReportMetricData and data['type'] == 'PERIODICAL'): self.assessor_command_queue.put((command, data)) else: @@ -124,9 +122,8 @@ def enqueue_command(self, command, data): _logger.warning('assessor queue length: %d', qsize) def process_command_thread(self, request): - ''' - Worker thread to process a command. - ''' + """Worker thread to process a command. 
+ """ command, data = request if multi_thread_enabled(): try: From 8a21c9742a7ada9ce06580025b8863241a70597e Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 22 Mar 2019 10:46:52 +0800 Subject: [PATCH 17/19] updates --- test/tuner_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/tuner_test.py b/test/tuner_test.py index 943bbd4988..9a785f000d 100644 --- a/test/tuner_test.py +++ b/test/tuner_test.py @@ -76,7 +76,7 @@ def run(dispatch_type): dipsatcher_list = TUNER_LIST if dispatch_type == 'Tuner' else ASSESSOR_LIST for dispatcher_name in dipsatcher_list: try: - # sleep 5 seconds here, to make sure previous stopped exp has enough time to exit to avoid port conflict + # Sleep here to make sure previous stopped exp has enough time to exit to avoid port conflict time.sleep(6) test_builtin_dispatcher(dispatch_type, dispatcher_name) print(GREEN + 'Test %s %s: TEST PASS' % (dispatcher_name, dispatch_type) + CLEAR) From 96fceb0372e88bb575bb9a2416ff4be3b8633c99 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 22 Mar 2019 11:32:20 +0800 Subject: [PATCH 18/19] updates --- docs/en_US/ExperimentConfig.md | 4 ++-- src/nni_manager/common/manager.ts | 2 +- src/nni_manager/core/nnimanager.ts | 6 +++--- src/nni_manager/rest_server/restValidationSchemas.ts | 2 +- tools/nni_cmd/config_schema.py | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/en_US/ExperimentConfig.md b/docs/en_US/ExperimentConfig.md index ed08782394..7bd6d7a518 100644 --- a/docs/en_US/ExperimentConfig.md +++ b/docs/en_US/ExperimentConfig.md @@ -234,9 +234,9 @@ machineList: Note: users could only specify one way to set tuner, for example, set {tunerName, optimizationMode} or {tunerCommand, tunerCwd}, and could not set them both. - * __includeIntermeidateResults__ + * __includeIntermediateResults__ - If __includeIntermeidateResults__ is true, the last intermediate results of the trials early stopped by assessor are sent to tuner as final results. The default value of __includeIntermeidateResults__ is false. + If __includeIntermediateResults__ is true, the last intermediate results of the trials early stopped by assessor are sent to tuner as final results. The default value of __includeIntermediateResults__ is false. 
* __assessor__ diff --git a/src/nni_manager/common/manager.ts b/src/nni_manager/common/manager.ts index 435b8c7254..11c253ab7a 100644 --- a/src/nni_manager/common/manager.ts +++ b/src/nni_manager/common/manager.ts @@ -45,7 +45,7 @@ interface ExperimentParams { classFileName?: string; checkpointDir: string; gpuNum?: number; - includeIntermeidateResults?: boolean; + includeIntermediateResults?: boolean; }; assessor?: { className: string; diff --git a/src/nni_manager/core/nnimanager.ts b/src/nni_manager/core/nnimanager.ts index c1f24af4b2..1ac94b97b4 100644 --- a/src/nni_manager/core/nnimanager.ts +++ b/src/nni_manager/core/nnimanager.ts @@ -273,9 +273,9 @@ class NNIManager implements Manager { newCwd = cwd; } // TO DO: add CUDA_VISIBLE_DEVICES - let includeIntermeidateResultsEnv: boolean | undefined = false; + let includeIntermediateResultsEnv: boolean | undefined = false; if (this.experimentProfile.params.tuner !== undefined) { - includeIntermeidateResultsEnv = this.experimentProfile.params.tuner.includeIntermeidateResults; + includeIntermediateResultsEnv = this.experimentProfile.params.tuner.includeIntermediateResults; } let nniEnv = { @@ -283,7 +283,7 @@ class NNIManager implements Manager { NNI_CHECKPOINT_DIRECTORY: dataDirectory, NNI_LOG_DIRECTORY: getLogDir(), NNI_LOG_LEVEL: getLogLevel(), - NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermeidateResultsEnv + NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResultsEnv }; let newEnv = Object.assign({}, process.env, nniEnv); const tunerProc: ChildProcess = spawn(command, [], { diff --git a/src/nni_manager/rest_server/restValidationSchemas.ts b/src/nni_manager/rest_server/restValidationSchemas.ts index 02b2e346bc..a4eba14438 100644 --- a/src/nni_manager/rest_server/restValidationSchemas.ts +++ b/src/nni_manager/rest_server/restValidationSchemas.ts @@ -159,7 +159,7 @@ export namespace ValidationSchemas { classArgs: joi.any(), gpuNum: joi.number().min(0), checkpointDir: joi.string().allow(''), - includeIntermeidateResults: joi.boolean() + includeIntermediateResults: joi.boolean() }), assessor: joi.object({ builtinAssessorName: joi.string().valid('Medianstop', 'Curvefitting'), diff --git a/tools/nni_cmd/config_schema.py b/tools/nni_cmd/config_schema.py index 249fcb2c53..07574541c0 100644 --- a/tools/nni_cmd/config_schema.py +++ b/tools/nni_cmd/config_schema.py @@ -57,7 +57,7 @@ Optional('classArgs'): { 'optimize_mode': Or('maximize', 'minimize') }, - Optional('includeIntermeidateResults'): bool, + Optional('includeIntermediateResults'): bool, Optional('gpuNum'): And(int, lambda x: 0 <= x <= 99999), },{ 'builtinTunerName': Or('BatchTuner', 'GridSearch'), From cc425e5e2b6a7d3e7952b48131830625c7887ef8 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 22 Mar 2019 20:05:39 +0800 Subject: [PATCH 19/19] updates --- docs/en_US/ExperimentConfig.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en_US/ExperimentConfig.md b/docs/en_US/ExperimentConfig.md index 7bd6d7a518..0df147b7fc 100644 --- a/docs/en_US/ExperimentConfig.md +++ b/docs/en_US/ExperimentConfig.md @@ -236,7 +236,7 @@ machineList: * __includeIntermediateResults__ - If __includeIntermediateResults__ is true, the last intermediate results of the trials early stopped by assessor are sent to tuner as final results. The default value of __includeIntermediateResults__ is false. + If __includeIntermediateResults__ is true, the last intermediate result of the trial that is early stopped by assessor is sent to tuner as final result. 
The default value of __includeIntermediateResults__ is false. * __assessor__
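
Taken together, patches 02 through 16 converge on a small dispatch pattern: one command queue for tuner-bound commands, a separate queue for assessor-bound commands (`TrialEnd` and `PERIODICAL` metric reports), worker threads that poll with a timeout so the `stopping` flag is re-checked periodically, and worker-side exception capture that the updated unit tests inspect through `worker_exceptions`. A related option, `includeIntermediateResults: true` under `tuner:` in the experiment YAML, makes the dispatcher forward the last intermediate result of an early-stopped trial to the tuner as its final result, via the `NNI_INCLUDE_INTERMEDIATE_RESULTS` environment variable set by the NNI manager. The sketch below condenses the queueing pattern into a standalone script for readers who want it in one place; it is an illustration under simplified assumptions, not the NNI implementation. `FakeCommand` and `TwoQueueDispatcher` are hypothetical stand-in names; the real code lives in `src/sdk/pynni/nni/msg_dispatcher_base.py` as shown in the diffs above.

```python
# Minimal sketch of the two-queue dispatch pattern introduced in this series.
# Names here (FakeCommand, TwoQueueDispatcher) are illustrative stand-ins only.
import threading
from enum import Enum
from queue import Queue, Empty

QUEUE_LEN_WARNING_MARK = 20  # mirrors the warning threshold added in patch 03

class FakeCommand(Enum):  # stand-in for nni.protocol.CommandType
    ReportMetricData = 'ME'
    TrialEnd = 'EN'
    NewTrialJob = 'TR'

class TwoQueueDispatcher:
    def __init__(self, process_command):
        self.process_command = process_command
        self.stopping = False
        self.worker_exceptions = []
        self.default_queue = Queue()
        self.assessor_queue = Queue()
        self.workers = [
            threading.Thread(target=self._worker, args=(q,))
            for q in (self.default_queue, self.assessor_queue)
        ]
        for worker in self.workers:
            worker.start()

    def enqueue(self, command, data):
        # Periodical metric reports and trial-end events go to the assessor
        # queue so a slow assessor cannot delay tuner traffic (and vice versa),
        # mirroring the routing added to enqueue_command() in patch 14.
        if command is FakeCommand.TrialEnd or (
                command is FakeCommand.ReportMetricData
                and data.get('type') == 'PERIODICAL'):
            queue = self.assessor_queue
        else:
            queue = self.default_queue
        queue.put((command, data))
        if queue.qsize() >= QUEUE_LEN_WARNING_MARK:
            print('warning: queue length', queue.qsize())

    def _worker(self, queue):
        while not self.stopping:
            try:
                # The timeout guarantees self.stopping is re-checked regularly.
                command, data = queue.get(timeout=3)
            except Empty:
                continue
            try:
                self.process_command(command, data)
            except Exception as exc:
                # Record the failure instead of letting the thread die silently,
                # as the tests do through dispatcher.worker_exceptions.
                self.worker_exceptions.append(exc)
                break

    def shutdown(self):
        self.stopping = True
        for worker in self.workers:
            worker.join()

if __name__ == '__main__':
    dispatcher = TwoQueueDispatcher(lambda cmd, data: print('processed', cmd, data))
    dispatcher.enqueue(FakeCommand.NewTrialJob, {'parameter_id': 0})
    dispatcher.enqueue(FakeCommand.ReportMetricData, {'type': 'PERIODICAL', 'value': 0.9})
    dispatcher.shutdown()
```

The split into two queues is what allows the protocol-level lock in patch 06 to be taken unconditionally: tuner and assessor handlers now run on separate threads even without `multiThread` mode, so `send()` must always be serialized.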