diff --git a/src/nni_manager/training_service/common/clusterJobRestServer.ts b/src/nni_manager/training_service/common/clusterJobRestServer.ts index 057d57ae9e..797eb59827 100644 --- a/src/nni_manager/training_service/common/clusterJobRestServer.ts +++ b/src/nni_manager/training_service/common/clusterJobRestServer.ts @@ -23,8 +23,12 @@ import * as assert from 'assert'; import { Request, Response, Router } from 'express'; import * as bodyParser from 'body-parser'; import * as component from '../../common/component'; +import * as fs from 'fs' +import * as path from 'path' import { getBasePort, getExperimentId } from '../../common/experimentStartupInfo'; import { RestServer } from '../../common/restServer' +import { getLogDir } from '../../common/utils'; +import { Writable } from 'stream'; /** * Cluster Job Training service Rest server, provides rest API to support Cluster job metrics update @@ -33,6 +37,7 @@ import { RestServer } from '../../common/restServer' @component.Singleton export abstract class ClusterJobRestServer extends RestServer{ private readonly API_ROOT_URL: string = '/api/v1/nni-pai'; + private readonly NNI_METRICS_PATTERN: string = `NNISDK_MEb'(?.*?)'`; private readonly expId: string = getExperimentId(); @@ -88,6 +93,38 @@ export abstract class ClusterJobRestServer extends RestServer{ } }); + router.post(`/stdout/${this.expId}/:trialId`, (req: Request, res: Response) => { + const trialLogPath: string = path.join(getLogDir(), `trial_${req.params.trialId}.log`); + try { + let skipLogging: boolean = false; + if(req.body.tag === 'trial' && req.body.msg !== undefined) { + const metricsContent = req.body.msg.match(this.NNI_METRICS_PATTERN); + if(metricsContent && metricsContent.groups) { + this.handleTrialMetrics(req.params.trialId, [metricsContent.groups['metrics']]); + skipLogging = true; + } + } + + if(!skipLogging){ + // Construct write stream to write remote trial's log into local file + const writeStream: Writable = fs.createWriteStream(trialLogPath, { + flags: 'a+', + encoding: 'utf8', + autoClose: true + }); + + writeStream.write(req.body.msg + '\n'); + writeStream.end(); + } + res.send(); + } + catch(err) { + this.log.error(`json parse stdout data error: ${err}`); + res.status(500); + res.send(err.message); + } + }); + return router; } diff --git a/src/nni_manager/training_service/pai/paiTrainingService.ts b/src/nni_manager/training_service/pai/paiTrainingService.ts index f415876a63..c1ef8ccb60 100644 --- a/src/nni_manager/training_service/pai/paiTrainingService.ts +++ b/src/nni_manager/training_service/pai/paiTrainingService.ts @@ -92,6 +92,7 @@ class PAITrainingService implements TrainingService { public async run(): Promise { const restServer: PAIJobRestServer = component.get(PAIJobRestServer); await restServer.start(); + this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`); while (!this.stopping) { await this.updatePaiToken(); diff --git a/src/sdk/pynni/nni/common.py b/src/sdk/pynni/nni/common.py index cb21efda64..d71241a7f5 100644 --- a/src/sdk/pynni/nni/common.py +++ b/src/sdk/pynni/nni/common.py @@ -44,10 +44,13 @@ def _load_env_args(): class _LoggerFileWrapper(TextIOBase): def __init__(self, logger_file): self.file = logger_file + self.orig_stdout = sys.stdout def write(self, s): if s != '\n': time = datetime.now().strftime(_time_format) + self.orig_stdout.write(s + '\n') + self.orig_stdout.flush() self.file.write('[{}] PRINT '.format(time) + s + '\n') self.file.flush() return len(s) diff --git a/src/sdk/pynni/nni/platform/local.py b/src/sdk/pynni/nni/platform/local.py index cf4df736d7..d2303b0aa9 100644 --- a/src/sdk/pynni/nni/platform/local.py +++ b/src/sdk/pynni/nni/platform/local.py @@ -34,8 +34,11 @@ _outputdir = os.environ['NNI_OUTPUT_DIR'] if not os.path.exists(_outputdir): os.makedirs(_outputdir) -_log_file_path = os.path.join(_outputdir, 'trial.log') -init_logger(_log_file_path) + +_nni_platform = os.environ['NNI_PLATFORM'] +if _nni_platform != 'pai': + _log_file_path = os.path.join(_outputdir, 'trial.log') + init_logger(_log_file_path) _multiphase = os.environ.get('MULTI_PHASE') @@ -74,11 +77,16 @@ def get_next_parameter(): return params def send_metric(string): - data = (string + '\n').encode('utf8') - assert len(data) < 1000000, 'Metric too long' - _metric_file.write(b'ME%06d%b' % (len(data), data)) - _metric_file.flush() - subprocess.run(['touch', _metric_file.name], check = True) + if _nni_platform == 'pai': + data = (string).encode('utf8') + assert len(data) < 1000000, 'Metric too long' + print('NNISDK_ME%s' % (data)) + else: + data = (string + '\n').encode('utf8') + assert len(data) < 1000000, 'Metric too long' + _metric_file.write(b'ME%06d%b' % (len(data), data)) + _metric_file.flush() + subprocess.run(['touch', _metric_file.name], check = True) def get_sequence_id(): return os.environ['NNI_TRIAL_SEQ_ID'] \ No newline at end of file diff --git a/tools/nni_trial_tool/constants.py b/tools/nni_trial_tool/constants.py index 3ae30a3a33..c1f18b8fea 100644 --- a/tools/nni_trial_tool/constants.py +++ b/tools/nni_trial_tool/constants.py @@ -34,4 +34,10 @@ STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr') -UPDATE_METRICS_API = '/update-metrics' \ No newline at end of file +UPDATE_METRICS_API = '/update-metrics' + +STDOUT_API = '/stdout' +NNI_SYS_DIR = os.environ['NNI_SYS_DIR'] +NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID'] +NNI_EXP_ID = os.environ['NNI_EXP_ID'] + diff --git a/tools/nni_trial_tool/log_utils.py b/tools/nni_trial_tool/log_utils.py index 55b1b7ed99..e4e63731d7 100644 --- a/tools/nni_trial_tool/log_utils.py +++ b/tools/nni_trial_tool/log_utils.py @@ -18,8 +18,23 @@ # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +import os +import sys +import json +import logging +import logging.handlers +import time +import threading + from datetime import datetime from enum import Enum, unique +from logging import StreamHandler + +from queue import Queue + +from .rest_utils import rest_get, rest_post, rest_put, rest_delete +from .constants import NNI_EXP_ID, NNI_TRIAL_JOB_ID, STDOUT_API +from .url_utils import gen_send_stdout_url @unique class LogType(Enum): @@ -29,7 +44,135 @@ class LogType(Enum): Error = 'ERROR' Critical = 'CRITICAL' +@unique +class StdOutputType(Enum): + Stdout = 'stdout', + Stderr = 'stderr' + def nni_log(log_type, log_message): '''Log message into stdout''' dt = datetime.now() - print('[{0}] {1} {2}'.format(dt, log_type.value, log_message)) \ No newline at end of file + print('[{0}] {1} {2}'.format(dt, log_type.value, log_message)) + +class NNIRestLogHanlder(StreamHandler): + def __init__(self, host, port, tag, std_output_type=StdOutputType.Stdout): + StreamHandler.__init__(self) + self.host = host + self.port = port + self.tag = tag + self.std_output_type = std_output_type + self.orig_stdout = sys.__stdout__ + self.orig_stderr = sys.__stderr__ + + def emit(self, record): + log_entry = {} + log_entry['tag'] = self.tag + log_entry['stdOutputType'] = self.std_output_type.name + log_entry['msg'] = self.format(record) + + try: + response = rest_post(gen_send_stdout_url(self.host, self.port), json.dumps(log_entry), 10, True) + except Exception as e: + self.orig_stderr.write(str(e) + '\n') + self.orig_stderr.flush() + +class RemoteLogger(object): + """ + NNI remote logger + """ + def __init__(self, syslog_host, syslog_port, tag, std_output_type, log_level=logging.INFO): + ''' + constructor + ''' + self.logger = logging.getLogger('nni_syslog_{}'.format(tag)) + self.log_level = log_level + self.logger.setLevel(self.log_level) + handler = NNIRestLogHanlder(syslog_host, syslog_port, tag) + self.logger.addHandler(handler) + if std_output_type == StdOutputType.Stdout: + self.orig_stdout = sys.__stdout__ + else: + self.orig_stdout = sys.__stderr__ + + def get_pipelog_reader(self): + ''' + Get pipe for remote logger + ''' + return PipeLogReader(self.logger, logging.INFO) + + def write(self, buf): + ''' + Write buffer data into logger/stdout + ''' + for line in buf.rstrip().splitlines(): + self.orig_stdout.write(line.rstrip() + '\n') + self.orig_stdout.flush() + try: + self.logger.log(self.log_level, line.rstrip()) + except Exception as e: + pass + +class PipeLogReader(threading.Thread): + """ + The reader thread reads log data from pipe + """ + def __init__(self, logger, log_level=logging.INFO): + """Setup the object with a logger and a loglevel + and start the thread + """ + threading.Thread.__init__(self) + self.queue = Queue() + self.logger = logger + self.daemon = False + self.log_level = log_level + self.fdRead, self.fdWrite = os.pipe() + self.pipeReader = os.fdopen(self.fdRead) + self.orig_stdout = sys.__stdout__ + self._is_read_completed = False + + def _populateQueue(stream, queue): + ''' + Collect lines from 'stream' and put them in 'quque'. + ''' + time.sleep(5) + while True: + try: + line = self.queue.get(True, 5) + try: + self.logger.log(self.log_level, line.rstrip()) + self.orig_stdout.write(line.rstrip() + '\n') + self.orig_stdout.flush() + except Exception as e: + pass + except Exception as e: + self._is_read_completed = True + break + + self.pip_log_reader_thread = threading.Thread(target = _populateQueue, + args = (self.pipeReader, self.queue)) + self.pip_log_reader_thread.daemon = True + self.start() + self.pip_log_reader_thread.start() + + def fileno(self): + """Return the write file descriptor of the pipe + """ + return self.fdWrite + + def run(self): + """Run the thread, logging everything. + """ + for line in iter(self.pipeReader.readline, ''): + self.queue.put(line) + self.pipeReader.close() + + def close(self): + """Close the write end of the pipe. + """ + os.close(self.fdWrite) + + @property + def is_read_completed(self): + """Return if read is completed + """ + return self._is_read_completed \ No newline at end of file diff --git a/tools/nni_trial_tool/metrics_reader.py b/tools/nni_trial_tool/metrics_reader.py index 6827dbd033..9d7f24b96d 100644 --- a/tools/nni_trial_tool/metrics_reader.py +++ b/tools/nni_trial_tool/metrics_reader.py @@ -25,14 +25,11 @@ import requests from datetime import datetime -from .constants import BASE_URL +from .constants import BASE_URL, NNI_EXP_ID, NNI_TRIAL_JOB_ID, NNI_SYS_DIR from .log_utils import LogType, nni_log from .rest_utils import rest_get, rest_post, rest_put, rest_delete from .url_utils import gen_update_metrics_url -NNI_SYS_DIR = os.environ['NNI_SYS_DIR'] -NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID'] -NNI_EXP_ID = os.environ['NNI_EXP_ID'] LEN_FIELD_SIZE = 6 MAGIC = 'ME' @@ -116,7 +113,7 @@ def read_experiment_metrics(nnimanager_ip, nnimanager_port): result['metrics'] = reader.read_trial_metrics() if len(result['metrics']) > 0: nni_log(LogType.Info, 'Result metrics is {}'.format(json.dumps(result))) - response = rest_post(gen_update_metrics_url(BASE_URL.format(nnimanager_ip), nnimanager_port, NNI_EXP_ID, NNI_TRIAL_JOB_ID), json.dumps(result), 10) + response = rest_post(gen_update_metrics_url(nnimanager_ip, nnimanager_port), json.dumps(result), 10) nni_log(LogType.Info,'Report metrics to NNI manager completed, http response code is {}'.format(response.status_code)) except Exception as e: #Error logging diff --git a/tools/nni_trial_tool/rest_utils.py b/tools/nni_trial_tool/rest_utils.py index d6abf0905e..71eb353614 100644 --- a/tools/nni_trial_tool/rest_utils.py +++ b/tools/nni_trial_tool/rest_utils.py @@ -31,13 +31,15 @@ def rest_get(url, timeout): print('Get exception {0} when sending http get to url {1}'.format(str(e), url)) return None -def rest_post(url, data, timeout): +def rest_post(url, data, timeout, rethrow_exception=False): '''Call rest post method''' try: response = requests.post(url, headers={'Accept': 'application/json', 'Content-Type': 'application/json'},\ data=data, timeout=timeout) return response except Exception as e: + if rethrow_exception is True: + raise print('Get exception {0} when sending http post to url {1}'.format(str(e), url)) return None diff --git a/tools/nni_trial_tool/trial_keeper.py b/tools/nni_trial_tool/trial_keeper.py index c4adeac434..29753585b9 100644 --- a/tools/nni_trial_tool/trial_keeper.py +++ b/tools/nni_trial_tool/trial_keeper.py @@ -25,11 +25,13 @@ import logging import shlex import re +import sys +import select from pyhdfs import HdfsClient from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal -from .log_utils import LogType, nni_log +from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType from .metrics_reader import read_experiment_metrics logger = logging.getLogger('trial_keeper') @@ -42,6 +44,11 @@ def main_loop(args): stdout_file = open(STDOUT_FULL_PATH, 'a+') stderr_file = open(STDERR_FULL_PATH, 'a+') + + trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout) + # redirect trial keeper's stdout and stderr to syslog + trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout) + sys.stdout = sys.stderr = trial_keeper_syslogger if args.pai_hdfs_host is not None and args.nni_hdfs_exp_dir is not None: try: @@ -52,15 +59,15 @@ def main_loop(args): copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client) # Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior - process = Popen(args.trial_command, shell = True, stdout = stdout_file, stderr = stderr_file) + log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader() + process = Popen(args.trial_command, shell = True, stdout = log_pipe_stdout, stderr = log_pipe_stdout) nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, shlex.split(args.trial_command))) - + while True: retCode = process.poll() ## Read experiment metrics, to avoid missing metrics - read_experiment_metrics(args.nnimanager_ip, args.nnimanager_port) - - if retCode is not None: + #read_experiment_metrics(args.nnimanager_ip, args.nnimanager_port) + if retCode is not None and log_pipe_stdout.is_read_completed == True: nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode)) if args.pai_hdfs_output_dir is not None: # Copy local directory to hdfs for OpenPAI @@ -102,8 +109,8 @@ def trial_keeper_help_info(*args): main_loop(args) except SystemExit as se: nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code)) - sys.exit(se.code) + os._exit(se.code) except Exception as e: nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e))) - sys.exit(1) + os._exit(1) diff --git a/tools/nni_trial_tool/url_utils.py b/tools/nni_trial_tool/url_utils.py index 69ce14ecb2..d167098d35 100644 --- a/tools/nni_trial_tool/url_utils.py +++ b/tools/nni_trial_tool/url_utils.py @@ -18,8 +18,12 @@ # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -from .constants import API_ROOT_URL, UPDATE_METRICS_API +from .constants import API_ROOT_URL, BASE_URL, UPDATE_METRICS_API, STDOUT_API, NNI_TRIAL_JOB_ID, NNI_EXP_ID -def gen_update_metrics_url(base_url, port, exp_id, trial_job_id): +def gen_update_metrics_url(ip, port): '''Generate update trial metrics url''' - return '{0}:{1}{2}{3}/{4}/:{5}'.format(base_url, port, API_ROOT_URL, UPDATE_METRICS_API, exp_id, trial_job_id) \ No newline at end of file + return '{0}:{1}{2}{3}/{4}/{5}'.format(BASE_URL.format(ip), port, API_ROOT_URL, UPDATE_METRICS_API, NNI_EXP_ID, NNI_TRIAL_JOB_ID) + +def gen_send_stdout_url(ip, port): + '''Generate send stdout url''' + return '{0}:{1}{2}{3}/{4}/{5}'.format(BASE_URL.format(ip), port, API_ROOT_URL, STDOUT_API, NNI_EXP_ID, NNI_TRIAL_JOB_ID) \ No newline at end of file