From 215400881d6bbb800cb240958f57868fdff5813f Mon Sep 17 00:00:00 2001 From: Daniel Treiman Date: Wed, 4 Oct 2017 13:30:29 -0700 Subject: [PATCH] Patched in Add an argument to ipcluster plugin to specify the number of engines (https://github.com/jtriley/StarCluster/pull/547) --- starcluster/plugins/ipcluster.py | 51 ++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/starcluster/plugins/ipcluster.py b/starcluster/plugins/ipcluster.py index 1939a76a2..ed623a129 100644 --- a/starcluster/plugins/ipcluster.py +++ b/starcluster/plugins/ipcluster.py @@ -71,7 +71,7 @@ def _start_engines(node, user, n_engines=None, kill_existing=False): n_engines = node.num_processors node.ssh.switch_user(user) if kill_existing: - node.ssh.execute("pkill -f ipengineapp", ignore_exit_status=True) + node.ssh.execute("pkill -f IPython.parallel.engine", ignore_exit_status=True) node.ssh.execute("ipcluster engines --n=%i --daemonize" % n_engines) node.ssh.switch_user('root') @@ -91,7 +91,7 @@ class IPCluster(DefaultClusterSetup): """ def __init__(self, enable_notebook=False, notebook_passwd=None, - notebook_directory=None, packer=None, log_level='INFO'): + notebook_directory=None, packer=None, master_engines=None, node_engines=None, log_level='INFO'): super(IPCluster, self).__init__() if isinstance(enable_notebook, basestring): self.enable_notebook = enable_notebook.lower().strip() == 'true' @@ -100,6 +100,8 @@ def __init__(self, enable_notebook=False, notebook_passwd=None, self.notebook_passwd = notebook_passwd or utils.generate_passwd(16) self.notebook_directory = notebook_directory self.log_level = log_level + self.master_engines = master_engines + self.node_engines = node_engines if packer not in (None, 'json', 'pickle', 'msgpack'): log.error("Unsupported packer: %s", packer) self.packer = None @@ -163,7 +165,11 @@ def _write_config(self, master, user, profile_dir): f.close() def _start_cluster(self, master, profile_dir): - n_engines = max(1, master.num_processors - 1) + if self.master_engines is None: + n_engines = max(1, master.num_processors - 1) + else: + n_engines = int(self.master_engines) + print "Setting master engines to '%s'" % self.master_engines log.info("Starting the IPython controller and %i engines on master" % n_engines) # cleanup existing connection files, to prevent their use @@ -215,7 +221,7 @@ def _start_cluster(self, master, profile_dir): self._authorize_port(master, (1000, 65535), "IPython controller") return local_json, n_engines - def _start_notebook(self, master, user, profile_dir): + def _start_notebook(self, master, user, profile_dir, time_to_dead=60.0): log.info("Setting up IPython web notebook for user: %s" % user) user_cert = posixpath.join(profile_dir, '%s.pem' % user) ssl_cert = posixpath.join(profile_dir, '%s.pem' % user) @@ -242,6 +248,7 @@ def _start_notebook(self, master, user, profile_dir): "c.NotebookApp.open_browser = False", "c.NotebookApp.password = u'%s'" % sha1pass, "c.NotebookApp.port = %d" % notebook_port, + "c.NotebookApp.time_to_dead = %d" % time_to_dead, ])) f.close() if self.notebook_directory is not None: @@ -288,12 +295,16 @@ def run(self, nodes, master, user, user_shell, volumes): cfile, n_engines_master = self._start_cluster(master, profile_dir) # Start engines on each of the non-master nodes non_master_nodes = [node for node in nodes if not node.is_master()] + n_engines_non_master = 0 for node in non_master_nodes: + if self.node_engines is None: + n_engines = node.num_processors + else: + n_engines = int(self.node_engines) self.pool.simple_job( - _start_engines, (node, user, node.num_processors), + _start_engines, (node, user, n_engines), jobid=node.alias) - n_engines_non_master = sum(node.num_processors - for node in non_master_nodes) + n_engines_non_master += n_engines if len(non_master_nodes) > 0: log.info("Adding %d engines on %d nodes", n_engines_non_master, len(non_master_nodes)) @@ -310,9 +321,12 @@ def run(self, nodes, master, user, user_shell, volumes): def on_add_node(self, node, nodes, master, user, user_shell, volumes): self._check_ipython_installed(node) - n_engines = node.num_processors + if self.node_engines is None: + n_engines = node.num_processors + else: + n_engines = int(self.node_engines) log.info("Adding %d engines on %s", n_engines, node.alias) - _start_engines(node, user) + _start_engines(node, user, n_engines) def on_remove_node(self, node, nodes, master, user, user_shell, volumes): raise NotImplementedError("on_remove_node method not implemented") @@ -332,7 +346,7 @@ def run(self, nodes, master, user, user_shell, volumes): master.ssh.execute("ipcluster stop", ignore_exit_status=True) time.sleep(2) log.info("Stopping IPython controller on %s", master.alias) - master.ssh.execute("pkill -f ipcontrollerapp", + master.ssh.execute("pkill -f IPython.parallel.controller", ignore_exit_status=True) master.ssh.execute("pkill -f 'ipython notebook'", ignore_exit_status=True) @@ -344,7 +358,7 @@ def run(self, nodes, master, user, user_shell, volumes): def _stop_engines(self, node, user): node.ssh.switch_user(user) - node.ssh.execute("pkill -f ipengineapp", ignore_exit_status=True) + node.ssh.execute("pkill -f IPython.parallel.engine", ignore_exit_status=True) node.ssh.switch_user('root') def on_add_node(self, node, nodes, master, user, user_shell, volumes): @@ -354,7 +368,7 @@ def on_remove_node(self, node, nodes, master, user, user_shell, volumes): raise NotImplementedError("on_remove_node method not implemented") -class IPClusterRestartEngines(DefaultClusterSetup): +class IPClusterRestartEngines(IPCluster): """Plugin to kill and restart all engines of an IPython cluster This plugin can be useful to hard-reset the all the engines, for instance @@ -364,14 +378,19 @@ class IPClusterRestartEngines(DefaultClusterSetup): This plugin is meant to be run manually with: starcluster runplugin plugin_conf_name cluster_name - """ def run(self, nodes, master, user, user_shell, volumes): n_total = 0 for node in nodes: - n_engines = node.num_processors - if node.is_master() and n_engines > 2: - n_engines -= 1 + if node.is_master() and (self.master_engines is not None): + n_engines = int(self.master_engines) + elif self.node_engines is not None: + n_engines = int(self.node_engines) + elif node.is_master(): + # and n_engines > 2: # XXX I'm not sure I understand this logic yet. + n_engines = node.num_processors - 1 + else: + n_engines = node.num_processors self.pool.simple_job( _start_engines, (node, user, n_engines, True), jobid=node.alias)