Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

fix-pylint #1664

Merged
merged 1 commit into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tools/nni_trial_tool/hdfsClientUtility.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import os
import posixpath
from pyhdfs import HdfsClient
from .log_utils import LogType, nni_log

def copyHdfsDirectoryToLocal(hdfsDirectory, localDirectory, hdfsClient):
Expand Down Expand Up @@ -79,7 +78,8 @@ def copyDirectoryToHdfs(localDirectory, hdfsDirectory, hdfsClient):
try:
result = result and copyDirectoryToHdfs(file_path, hdfs_directory, hdfsClient)
except Exception as exception:
nni_log(LogType.Error, 'Copy local directory {0} to hdfs directory {1} error: {2}'.format(file_path, hdfs_directory, str(exception)))
nni_log(LogType.Error,
'Copy local directory {0} to hdfs directory {1} error: {2}'.format(file_path, hdfs_directory, str(exception)))
result = False
else:
hdfs_file_path = os.path.join(hdfsDirectory, file)
Expand Down
6 changes: 2 additions & 4 deletions tools/nni_trial_tool/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@

from queue import Queue

from .rest_utils import rest_get, rest_post, rest_put, rest_delete
from .constants import NNI_EXP_ID, NNI_TRIAL_JOB_ID, STDOUT_API
from .rest_utils import rest_post
from .url_utils import gen_send_stdout_url

@unique
Expand Down Expand Up @@ -154,8 +153,7 @@ def _populateQueue(stream, queue):
self._is_read_completed = True
break

self.pip_log_reader_thread = threading.Thread(target = _populateQueue,
args = (self.pipeReader, self.queue))
self.pip_log_reader_thread = threading.Thread(target=_populateQueue, args=(self.pipeReader, self.queue))
self.pip_log_reader_thread.daemon = True
self.start()
self.pip_log_reader_thread.start()
Expand Down
1 change: 0 additions & 1 deletion tools/nni_trial_tool/rest_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


import time
import requests

def rest_get(url, timeout):
Expand Down
14 changes: 8 additions & 6 deletions tools/nni_trial_tool/test/test_hdfsClientUtility.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import os
import shutil
import random
import string
import unittest
import json
import sys
from pyhdfs import HdfsClient
from tools.nni_trial_tool.hdfsClientUtility import copyFileToHdfs, copyDirectoryToHdfs
sys.path.append("..")
from trial.hdfsClientUtility import copyFileToHdfs, copyDirectoryToHdfs
import os
import shutil
import random
import string


class HDFSClientUtilityTest(unittest.TestCase):
'''Unit test for hdfsClientUtility.py'''
Expand Down Expand Up @@ -82,7 +83,8 @@ def test_copy_directory_run(self):
with open('./{0}/{1}'.format(directory_name, file_name), 'w') as file:
file.write(file_content)

result = copyDirectoryToHdfs('./{}'.format(directory_name), '/{0}/{1}'.format(self.hdfs_config['userName'], directory_name), self.hdfs_client)
result = copyDirectoryToHdfs('./{}'.format(directory_name),
'/{0}/{1}'.format(self.hdfs_config['userName'], directory_name), self.hdfs_client)
self.assertTrue(result)

directory_list = self.hdfs_client.listdir('/{0}'.format(self.hdfs_config['userName']))
Expand Down
61 changes: 39 additions & 22 deletions tools/nni_trial_tool/trial_keeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,31 @@
# ============================================================================================================================== #

import argparse
import sys
import os
from subprocess import Popen, PIPE
from subprocess import Popen
import time
import logging
import shlex
import re
import sys
import select
import json
import threading
from pyhdfs import HdfsClient
import pkg_resources
from .rest_utils import rest_post, rest_get
from .url_utils import gen_send_stdout_url, gen_send_version_url, gen_parameter_meta_url
from .url_utils import gen_send_version_url, gen_parameter_meta_url

from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, \
from .constants import LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, \
MULTI_PHASE, NNI_TRIAL_JOB_ID, NNI_SYS_DIR, NNI_EXP_ID
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal, copyHdfsFileToLocal
from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType
from .log_utils import LogType, nni_log, RemoteLogger, StdOutputType

logger = logging.getLogger('trial_keeper')
regular = re.compile('v?(?P<version>[0-9](\.[0-9]){0,1}).*')

_hdfs_client = None


def get_hdfs_client(args):
global _hdfs_client

Expand All @@ -62,15 +61,18 @@ def get_hdfs_client(args):
if hdfs_host is not None and args.nni_hdfs_exp_dir is not None:
try:
if args.webhdfs_path:
_hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, webhdfs_path=args.webhdfs_path, timeout=5)
_hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name,
webhdfs_path=args.webhdfs_path, timeout=5)
else:
# backward compatibility
_hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
_hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name,
timeout=5)
except Exception as e:
nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
raise e
return _hdfs_client


def main_loop(args):
'''main loop logic for trial keeper'''

Expand All @@ -79,9 +81,11 @@ def main_loop(args):

stdout_file = open(STDOUT_FULL_PATH, 'a+')
stderr_file = open(STDERR_FULL_PATH, 'a+')
trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout, args.log_collection)
trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper',
StdOutputType.Stdout, args.log_collection)
# redirect trial keeper's stdout and stderr to syslog
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, args.log_collection)
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout,
args.log_collection)
sys.stdout = sys.stderr = trial_keeper_syslogger
hdfs_output_dir = None

Expand All @@ -97,8 +101,10 @@ def main_loop(args):

# Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior
log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader()
process = Popen(args.trial_command, shell = True, stdout = log_pipe_stdout, stderr = log_pipe_stdout)
nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, shlex.split(args.trial_command)))
process = Popen(args.trial_command, shell=True, stdout=log_pipe_stdout, stderr=log_pipe_stdout)
nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid,
shlex.split(
args.trial_command)))

while True:
retCode = process.poll()
Expand All @@ -110,9 +116,11 @@ def main_loop(args):
nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
try:
if copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client):
nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir))
nni_log(LogType.Info,
'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir))
else:
nni_log(LogType.Info, 'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir))
nni_log(LogType.Info,
'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir))
except Exception as e:
nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
raise e
Expand All @@ -123,14 +131,16 @@ def main_loop(args):

time.sleep(2)


def trial_keeper_help_info(*args):
print('please run --help to see guidance')


def check_version(args):
try:
trial_keeper_version = pkg_resources.get_distribution('nni').version
except pkg_resources.ResolutionError as err:
#package nni does not exist, try nni-tool package
# package nni does not exist, try nni-tool package
nni_log(LogType.Error, 'Package nni does not exist!')
os._exit(1)
if not args.nni_manager_version:
Expand All @@ -145,21 +155,26 @@ def check_version(args):
log_entry = {}
if trial_keeper_version != nni_manager_version:
nni_log(LogType.Error, 'Version does not match!')
error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format(nni_manager_version, trial_keeper_version)
error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format(
nni_manager_version, trial_keeper_version)
log_entry['tag'] = 'VCFail'
log_entry['msg'] = error_message
rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10, False)
rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10,
False)
os._exit(1)
else:
nni_log(LogType.Info, 'Version match!')
log_entry['tag'] = 'VCSuccess'
rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10, False)
rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10,
False)
except AttributeError as err:
nni_log(LogType.Error, err)


def is_multi_phase():
return MULTI_PHASE and (MULTI_PHASE in ['True', 'true'])


def download_parameter(meta_list, args):
"""
Download parameter file to local working directory.
Expand All @@ -171,7 +186,8 @@ def download_parameter(meta_list, args):
]
"""
nni_log(LogType.Debug, str(meta_list))
nni_log(LogType.Debug, 'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID))
nni_log(LogType.Debug,
'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID))
nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR)))
for meta in meta_list:
if meta['experimentId'] == NNI_EXP_ID and meta['trialId'] == NNI_TRIAL_JOB_ID:
Expand All @@ -180,6 +196,7 @@ def download_parameter(meta_list, args):
hdfs_client = get_hdfs_client(args)
copyHdfsFileToLocal(meta['filePath'], param_fp, hdfs_client, override=False)


def fetch_parameter_file(args):
class FetchThread(threading.Thread):
def __init__(self, args):
Expand All @@ -203,16 +220,17 @@ def run(self):
fetch_file_thread = FetchThread(args)
fetch_file_thread.start()


if __name__ == '__main__':
'''NNI Trial Keeper main function'''
PARSER = argparse.ArgumentParser()
PARSER.set_defaults(func=trial_keeper_help_info)
PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP')
PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port')
PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs') # backward compatibility
PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs') # backward compatibility
PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs')
PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs') # backward compatibility
PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs') # backward compatibility
PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs')
PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
Expand All @@ -233,4 +251,3 @@ def run(self):
except Exception as e:
nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
os._exit(1)