Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

NNI logging architecture improvement #539

Merged
merged 15 commits into from
Dec 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions src/nni_manager/training_service/common/clusterJobRestServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ import * as assert from 'assert';
import { Request, Response, Router } from 'express';
import * as bodyParser from 'body-parser';
import * as component from '../../common/component';
import * as fs from 'fs'
import * as path from 'path'
import { getBasePort, getExperimentId } from '../../common/experimentStartupInfo';
import { RestServer } from '../../common/restServer'
import { getLogDir } from '../../common/utils';
import { Writable } from 'stream';

/**
* Cluster Job Training service Rest server, provides rest API to support Cluster job metrics update
Expand All @@ -33,6 +37,7 @@ import { RestServer } from '../../common/restServer'
@component.Singleton
export abstract class ClusterJobRestServer extends RestServer{
private readonly API_ROOT_URL: string = '/api/v1/nni-pai';
private readonly NNI_METRICS_PATTERN: string = `NNISDK_MEb'(?<metrics>.*?)'`;

private readonly expId: string = getExperimentId();

Expand Down Expand Up @@ -88,6 +93,38 @@ export abstract class ClusterJobRestServer extends RestServer{
}
});

// Receives one stdout/stderr line of a remote trial.
// - Lines tagged 'trial' that match NNI_METRICS_PATTERN are treated as metrics and
//   handed to handleTrialMetrics(); they are not written to the log file.
// - Every other line is appended to `trial_<trialId>.log` under getLogDir().
router.post(`/stdout/${this.expId}/:trialId`, (req: Request, res: Response) => {
    const trialId: string = req.params.trialId;

    // Express URL-decodes route parameters, so an encoded path separator in the id
    // would otherwise let the request pick a file outside `trial_<id>.log`.
    if (path.basename(trialId) !== trialId) {
        res.status(400);
        res.send('Invalid trial id');

        return;
    }

    const trialLogPath: string = path.join(getLogDir(), `trial_${trialId}.log`);
    try {
        let skipLogging: boolean = false;
        if (req.body.tag === 'trial' && req.body.msg !== undefined) {
            const metricsContent = req.body.msg.match(this.NNI_METRICS_PATTERN);
            if (metricsContent && metricsContent.groups) {
                this.handleTrialMetrics(trialId, [metricsContent.groups['metrics']]);
                skipLogging = true;
            }
        }

        if (!skipLogging) {
            // Construct write stream to write remote trial's log into local file
            const writeStream: Writable = fs.createWriteStream(trialLogPath, {
                flags: 'a+',
                encoding: 'utf8',
                autoClose: true
            });

            // Open/write failures are reported asynchronously through the 'error' event,
            // which the surrounding try/catch cannot see. Without a listener the event
            // would be raised as an uncaught exception.
            writeStream.on('error', (streamErr: Error) => {
                this.log.error(`write trial log ${trialLogPath} error: ${streamErr}`);
            });

            writeStream.write(req.body.msg + '\n');
            writeStream.end();
        }
        res.send();
    }
    catch(err) {
        this.log.error(`json parse stdout data error: ${err}`);
        res.status(500);
        res.send(err.message);
    }
});

return router;
}

Expand Down
1 change: 1 addition & 0 deletions src/nni_manager/training_service/pai/paiTrainingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class PAITrainingService implements TrainingService {
public async run(): Promise<void> {
const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
await restServer.start();

this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`);
while (!this.stopping) {
await this.updatePaiToken();
Expand Down
3 changes: 3 additions & 0 deletions src/sdk/pynni/nni/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ def _load_env_args():
class _LoggerFileWrapper(TextIOBase):
    """File-like object that mirrors every written chunk to two places.

    Each chunk is echoed to the stdout object that was current when the wrapper
    was created, and appended to ``logger_file`` with a timestamp and a
    ``PRINT`` marker.
    """

    def __init__(self, logger_file):
        # Remember the stdout in effect at construction time so output can
        # still be echoed there.
        self.orig_stdout = sys.stdout
        self.file = logger_file

    def write(self, s):
        """Forward ``s`` to both targets; always return ``len(s)``.

        A chunk that is exactly a single newline is dropped, because a newline
        is appended to every forwarded chunk anyway.
        """
        written = len(s)
        if s == '\n':
            return written

        timestamp = datetime.now().strftime(_time_format)

        self.orig_stdout.write(s + '\n')
        self.orig_stdout.flush()

        self.file.write('[{}] PRINT '.format(timestamp) + s + '\n')
        self.file.flush()
        return written
Expand Down
22 changes: 15 additions & 7 deletions src/sdk/pynni/nni/platform/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@
_outputdir = os.environ['NNI_OUTPUT_DIR']
if not os.path.exists(_outputdir):
os.makedirs(_outputdir)
_log_file_path = os.path.join(_outputdir, 'trial.log')
init_logger(_log_file_path)

_nni_platform = os.environ['NNI_PLATFORM']
if _nni_platform != 'pai':
_log_file_path = os.path.join(_outputdir, 'trial.log')
init_logger(_log_file_path)

_multiphase = os.environ.get('MULTI_PHASE')

Expand Down Expand Up @@ -74,11 +77,16 @@ def get_next_parameter():
return params

def send_metric(string):
    """Report one metric record, using the channel that matches the platform.

    string: the already-serialized metric text.

    On the 'pai' platform the metric is printed to stdout with an
    ``NNISDK_ME`` prefix. On every other platform it is appended to the metric
    file as ``ME`` + 6-digit byte length + payload, and the file is touched
    afterwards.

    Raises AssertionError when the encoded metric is 1,000,000 bytes or longer.
    """
    if _nni_platform == 'pai':
        data = (string).encode('utf8')
        assert len(data) < 1000000, 'Metric too long'
        # '%s' applied to a bytes object yields its repr (b'...'), so the
        # printed line looks like NNISDK_MEb'<payload>'.
        # NOTE(review): the receiving side presumably parses exactly this
        # form - do not "clean up" the formatting without changing it too.
        print('NNISDK_ME%s' % (data))
    else:
        data = (string + '\n').encode('utf8')
        assert len(data) < 1000000, 'Metric too long'
        _metric_file.write(b'ME%06d%b' % (len(data), data))
        _metric_file.flush()
        # presumably bumps the mtime so a watcher notices the update - TODO confirm
        subprocess.run(['touch', _metric_file.name], check = True)

def get_sequence_id():
return os.environ['NNI_TRIAL_SEQ_ID']
8 changes: 7 additions & 1 deletion tools/nni_trial_tool/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@

STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr')

UPDATE_METRICS_API = '/update-metrics'
UPDATE_METRICS_API = '/update-metrics'

STDOUT_API = '/stdout'
NNI_SYS_DIR = os.environ['NNI_SYS_DIR']
NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID']
NNI_EXP_ID = os.environ['NNI_EXP_ID']

145 changes: 144 additions & 1 deletion tools/nni_trial_tool/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,23 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import os
import sys
import json
import logging
import logging.handlers
import time
import threading

from datetime import datetime
from enum import Enum, unique
from logging import StreamHandler

from queue import Queue

from .rest_utils import rest_get, rest_post, rest_put, rest_delete
from .constants import NNI_EXP_ID, NNI_TRIAL_JOB_ID, STDOUT_API
from .url_utils import gen_send_stdout_url

@unique
class LogType(Enum):
Expand All @@ -29,7 +44,135 @@ class LogType(Enum):
Error = 'ERROR'
Critical = 'CRITICAL'

@unique
class StdOutputType(Enum):
    """Identifies which standard stream a forwarded log line belongs to."""
    # No trailing comma here: `'stdout',` would make the member value the
    # tuple ('stdout',) instead of the string 'stdout'.
    Stdout = 'stdout'
    Stderr = 'stderr'

def nni_log(log_type, log_message):
    '''Log message into stdout

    log_type: object whose ``value`` attribute is the level text to print
    log_message: text of the message

    Prints exactly one line of the form ``[<current datetime>] <level> <message>``.
    '''
    dt = datetime.now()
    print('[{0}] {1} {2}'.format(dt, log_type.value, log_message))

class NNIRestLogHanlder(StreamHandler):
    """Logging handler that POSTs every formatted record as a JSON document.

    The target URL is built by ``gen_send_stdout_url(host, port)``. The JSON
    body carries the handler's tag, the name of its ``StdOutputType`` member
    and the formatted message.
    """

    def __init__(self, host, port, tag, std_output_type=StdOutputType.Stdout):
        super().__init__()
        self.host, self.port = host, port
        self.tag = tag
        self.std_output_type = std_output_type
        # The interpreter's original streams, captured for reporting failures.
        self.orig_stdout, self.orig_stderr = sys.__stdout__, sys.__stderr__

    def emit(self, record):
        """Send ``record`` to the REST endpoint; report failures on the original stderr."""
        payload = {
            'tag': self.tag,
            'stdOutputType': self.std_output_type.name,
            'msg': self.format(record),
        }

        try:
            # Fourth argument asks rest_post to re-raise instead of printing,
            # so the failure is handled right here.
            rest_post(gen_send_stdout_url(self.host, self.port), json.dumps(payload), 10, True)
        except Exception as post_error:
            self.orig_stderr.write(str(post_error) + '\n')
            self.orig_stderr.flush()

class RemoteLogger(object):
    """
    NNI remote logger

    File-like object (``write``/``flush``) that echoes each line to one of the
    interpreter's original standard streams and also sends it through a
    logger equipped with an NNIRestLogHanlder.
    """
    def __init__(self, syslog_host, syslog_port, tag, std_output_type, log_level=logging.INFO):
        '''
        constructor

        syslog_host, syslog_port: passed to the REST log handler
        tag: label attached to every forwarded line; also part of the logger name
        std_output_type: StdOutputType member selecting the local echo stream
        log_level: level set on the logger and used for every line written
        '''
        self.logger = logging.getLogger('nni_syslog_{}'.format(tag))
        self.log_level = log_level
        self.logger.setLevel(self.log_level)
        # Forward std_output_type so the handler labels records with the stream
        # this logger actually represents (the handler defaults to Stdout).
        handler = NNIRestLogHanlder(syslog_host, syslog_port, tag, std_output_type)
        self.logger.addHandler(handler)
        if std_output_type == StdOutputType.Stdout:
            self.orig_stdout = sys.__stdout__
        else:
            self.orig_stdout = sys.__stderr__

    def get_pipelog_reader(self):
        '''
        Get pipe for remote logger
        '''
        return PipeLogReader(self.logger, logging.INFO)

    def write(self, buf):
        '''
        Write buffer data into logger/stdout

        Trailing whitespace is stripped from the buffer and from each line;
        every line is echoed locally and then logged.
        '''
        for line in buf.rstrip().splitlines():
            text = line.rstrip()
            self.orig_stdout.write(text + '\n')
            self.orig_stdout.flush()
            try:
                self.logger.log(self.log_level, text)
            except Exception:
                # Deliberately best-effort: the line has already been echoed
                # locally, and a logging failure must not break the writer.
                pass

    def flush(self):
        '''
        Flush the local echo stream, so this object can stand in for
        sys.stdout / sys.stderr where callers expect flush() to exist.
        '''
        self.orig_stdout.flush()

class PipeLogReader(threading.Thread):
    """
    The reader thread reads log data from pipe

    Two threads cooperate:
    * this thread (see run) reads lines from the read end of an os.pipe()
      and puts them on a queue;
    * a daemon consumer thread takes lines from the queue, logs them and
      echoes them to the interpreter's original stdout.

    The write end of the pipe is exposed through fileno(), so an instance can
    be passed wherever a file descriptor holder is accepted.
    """
    def __init__(self, logger, log_level=logging.INFO, idle_timeout=5):
        """Setup the object with a logger and a loglevel
        and start the thread

        logger: logger every line is sent to
        log_level: level used for every line
        idle_timeout: seconds (must be > 0) without new data after which
            is_read_completed reports True; also the consumer's start delay
        """
        from queue import Empty

        threading.Thread.__init__(self)
        self.queue = Queue()
        self.logger = logger
        self.daemon = False
        self.log_level = log_level
        self.idle_timeout = idle_timeout
        self.fdRead, self.fdWrite = os.pipe()
        self.pipeReader = os.fdopen(self.fdRead)
        self.orig_stdout = sys.__stdout__
        self._is_read_completed = False
        # Guards _is_read_completed together with queue insertion, so the flag
        # can never be True while an unprocessed line is waiting.
        self._state_lock = threading.Lock()

        def _populateQueue(stream, queue):
            '''
            Collect lines from 'queue' and forward them to the logger and stdout.
            '''
            time.sleep(idle_timeout)
            while True:
                try:
                    line = queue.get(True, idle_timeout)
                except Empty:
                    # Idle period: report completion, but keep consuming.
                    # Exiting here would silently drop everything the writer
                    # produces after its first quiet spell.
                    with self._state_lock:
                        if queue.empty():
                            self._is_read_completed = True
                    continue

                if line is None:
                    # End-of-stream marker put by run(): nothing more will come.
                    with self._state_lock:
                        self._is_read_completed = True
                    break

                try:
                    self.logger.log(self.log_level, line.rstrip())
                    self.orig_stdout.write(line.rstrip() + '\n')
                    self.orig_stdout.flush()
                except Exception:
                    # Best-effort forwarding: one bad line must not stop the consumer.
                    pass

        self.pip_log_reader_thread = threading.Thread(target = _populateQueue,
                                                      args = (self.pipeReader, self.queue))
        self.pip_log_reader_thread.daemon = True
        self.start()
        self.pip_log_reader_thread.start()

    def fileno(self):
        """Return the write file descriptor of the pipe
        """
        return self.fdWrite

    def run(self):
        """Run the thread, logging everything.

        Reads lines until end-of-file on the pipe, then closes the read end
        and tells the consumer that the stream is finished.
        """
        for line in iter(self.pipeReader.readline, ''):
            with self._state_lock:
                # New data arrived: reading is not complete any more.
                self._is_read_completed = False
                self.queue.put(line)
        self.pipeReader.close()
        self.queue.put(None)

    def close(self):
        """Close the write end of the pipe.
        """
        os.close(self.fdWrite)

    @property
    def is_read_completed(self):
        """Return if read is completed

        True when every line received so far has been forwarded and either no
        new data arrived for idle_timeout seconds or the pipe reached
        end-of-file. It turns False again if more data arrives later.
        """
        return self._is_read_completed
7 changes: 2 additions & 5 deletions tools/nni_trial_tool/metrics_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,11 @@
import requests

from datetime import datetime
from .constants import BASE_URL
from .constants import BASE_URL, NNI_EXP_ID, NNI_TRIAL_JOB_ID, NNI_SYS_DIR
from .log_utils import LogType, nni_log
from .rest_utils import rest_get, rest_post, rest_put, rest_delete
from .url_utils import gen_update_metrics_url

NNI_SYS_DIR = os.environ['NNI_SYS_DIR']
NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID']
NNI_EXP_ID = os.environ['NNI_EXP_ID']
LEN_FIELD_SIZE = 6
MAGIC = 'ME'

Expand Down Expand Up @@ -116,7 +113,7 @@ def read_experiment_metrics(nnimanager_ip, nnimanager_port):
result['metrics'] = reader.read_trial_metrics()
if len(result['metrics']) > 0:
nni_log(LogType.Info, 'Result metrics is {}'.format(json.dumps(result)))
response = rest_post(gen_update_metrics_url(BASE_URL.format(nnimanager_ip), nnimanager_port, NNI_EXP_ID, NNI_TRIAL_JOB_ID), json.dumps(result), 10)
response = rest_post(gen_update_metrics_url(nnimanager_ip, nnimanager_port), json.dumps(result), 10)
nni_log(LogType.Info,'Report metrics to NNI manager completed, http response code is {}'.format(response.status_code))
except Exception as e:
#Error logging
Expand Down
4 changes: 3 additions & 1 deletion tools/nni_trial_tool/rest_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ def rest_get(url, timeout):
print('Get exception {0} when sending http get to url {1}'.format(str(e), url))
return None

def rest_post(url, data, timeout, rethrow_exception=False):
    '''Call rest post method

    url: target URL
    data: request body, sent with JSON Accept/Content-Type headers
    timeout: timeout handed to requests.post
    rethrow_exception: when exactly True, any exception is re-raised to the
        caller; otherwise it is printed and None is returned

    Returns the response object on success, None on a swallowed failure.
    '''
    try:
        response = requests.post(url, headers={'Accept': 'application/json', 'Content-Type': 'application/json'},\
                                 data=data, timeout=timeout)
        return response
    except Exception as e:
        if rethrow_exception is True:
            raise
        print('Get exception {0} when sending http post to url {1}'.format(str(e), url))
        return None

Expand Down
23 changes: 15 additions & 8 deletions tools/nni_trial_tool/trial_keeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import logging
import shlex
import re
import sys
import select
from pyhdfs import HdfsClient

from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal
from .log_utils import LogType, nni_log
from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType
from .metrics_reader import read_experiment_metrics

logger = logging.getLogger('trial_keeper')
Expand All @@ -42,6 +44,11 @@ def main_loop(args):

stdout_file = open(STDOUT_FULL_PATH, 'a+')
stderr_file = open(STDERR_FULL_PATH, 'a+')

trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout)
# redirect trial keeper's stdout and stderr to syslog
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout)
sys.stdout = sys.stderr = trial_keeper_syslogger

if args.pai_hdfs_host is not None and args.nni_hdfs_exp_dir is not None:
try:
Expand All @@ -52,15 +59,15 @@ def main_loop(args):
copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)

# Notice: We don't specify env, which means the subprocess will inherit the current environment; that is the expected behavior
process = Popen(args.trial_command, shell = True, stdout = stdout_file, stderr = stderr_file)
log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader()
process = Popen(args.trial_command, shell = True, stdout = log_pipe_stdout, stderr = log_pipe_stdout)
nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, shlex.split(args.trial_command)))

while True:
retCode = process.poll()
## Read experiment metrics, to avoid missing metrics
read_experiment_metrics(args.nnimanager_ip, args.nnimanager_port)

if retCode is not None:
#read_experiment_metrics(args.nnimanager_ip, args.nnimanager_port)
if retCode is not None and log_pipe_stdout.is_read_completed == True:
nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
if args.pai_hdfs_output_dir is not None:
# Copy local directory to hdfs for OpenPAI
Expand Down Expand Up @@ -102,8 +109,8 @@ def trial_keeper_help_info(*args):
main_loop(args)
except SystemExit as se:
nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
sys.exit(se.code)
os._exit(se.code)
except Exception as e:
nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
sys.exit(1)
os._exit(1)

Loading