Add an argument to ipcluster plugin to specify the number of engines (jtriley#547)
dantreiman committed Oct 4, 2017
1 parent a2e7e2f commit 2154008
Showing 1 changed file with 35 additions and 16 deletions.
51 changes: 35 additions & 16 deletions starcluster/plugins/ipcluster.py
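
In practice, the new master_engines and node_engines keyword arguments would be supplied from the StarCluster config file, which passes a plugin section's settings to the plugin's __init__ as keyword arguments. A minimal sketch of such a section, assuming that usual mapping (the MASTER_ENGINES and NODE_ENGINES keys are illustrative names derived from the new parameters, not taken from this commit):

    [plugin ipcluster]
    SETUP_CLASS = starcluster.plugins.ipcluster.IPCluster
    ENABLE_NOTEBOOK = True
    # Assumed: forwarded to the new master_engines / node_engines kwargs
    MASTER_ENGINES = 2
    NODE_ENGINES = 4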
@@ -71,7 +71,7 @@ def _start_engines(node, user, n_engines=None, kill_existing=False):
         n_engines = node.num_processors
     node.ssh.switch_user(user)
     if kill_existing:
-        node.ssh.execute("pkill -f ipengineapp", ignore_exit_status=True)
+        node.ssh.execute("pkill -f IPython.parallel.engine", ignore_exit_status=True)
     node.ssh.execute("ipcluster engines --n=%i --daemonize" % n_engines)
     node.ssh.switch_user('root')

@@ -91,7 +91,7 @@ class IPCluster(DefaultClusterSetup):
     """
     def __init__(self, enable_notebook=False, notebook_passwd=None,
-                 notebook_directory=None, packer=None, log_level='INFO'):
+                 notebook_directory=None, packer=None, master_engines=None, node_engines=None, log_level='INFO'):
         super(IPCluster, self).__init__()
         if isinstance(enable_notebook, basestring):
             self.enable_notebook = enable_notebook.lower().strip() == 'true'
@@ -100,6 +100,8 @@ def __init__(self, enable_notebook=False, notebook_passwd=None,
         self.notebook_passwd = notebook_passwd or utils.generate_passwd(16)
         self.notebook_directory = notebook_directory
         self.log_level = log_level
+        self.master_engines = master_engines
+        self.node_engines = node_engines
         if packer not in (None, 'json', 'pickle', 'msgpack'):
             log.error("Unsupported packer: %s", packer)
             self.packer = None
@@ -163,7 +165,11 @@ def _write_config(self, master, user, profile_dir):
         f.close()
 
     def _start_cluster(self, master, profile_dir):
-        n_engines = max(1, master.num_processors - 1)
+        if self.master_engines is None:
+            n_engines = max(1, master.num_processors - 1)
+        else:
+            n_engines = int(self.master_engines)
+            print "Setting master engines to '%s'" % self.master_engines
         log.info("Starting the IPython controller and %i engines on master"
                  % n_engines)
         # cleanup existing connection files, to prevent their use
@@ -215,7 +221,7 @@ def _start_cluster(self, master, profile_dir):
         self._authorize_port(master, (1000, 65535), "IPython controller")
         return local_json, n_engines
 
-    def _start_notebook(self, master, user, profile_dir):
+    def _start_notebook(self, master, user, profile_dir, time_to_dead=60.0):
         log.info("Setting up IPython web notebook for user: %s" % user)
         user_cert = posixpath.join(profile_dir, '%s.pem' % user)
         ssl_cert = posixpath.join(profile_dir, '%s.pem' % user)
@@ -242,6 +248,7 @@ def _start_notebook(self, master, user, profile_dir):
             "c.NotebookApp.open_browser = False",
             "c.NotebookApp.password = u'%s'" % sha1pass,
             "c.NotebookApp.port = %d" % notebook_port,
+            "c.NotebookApp.time_to_dead = %d" % time_to_dead,
         ]))
         f.close()
         if self.notebook_directory is not None:
@@ -288,12 +295,16 @@ def run(self, nodes, master, user, user_shell, volumes):
         cfile, n_engines_master = self._start_cluster(master, profile_dir)
         # Start engines on each of the non-master nodes
         non_master_nodes = [node for node in nodes if not node.is_master()]
+        n_engines_non_master = 0
         for node in non_master_nodes:
+            if self.node_engines is None:
+                n_engines = node.num_processors
+            else:
+                n_engines = int(self.node_engines)
             self.pool.simple_job(
-                _start_engines, (node, user, node.num_processors),
+                _start_engines, (node, user, n_engines),
                 jobid=node.alias)
-        n_engines_non_master = sum(node.num_processors
-                                   for node in non_master_nodes)
+            n_engines_non_master += n_engines
         if len(non_master_nodes) > 0:
             log.info("Adding %d engines on %d nodes",
                      n_engines_non_master, len(non_master_nodes))
@@ -310,9 +321,12 @@ def run(self, nodes, master, user, user_shell, volumes):
 
     def on_add_node(self, node, nodes, master, user, user_shell, volumes):
         self._check_ipython_installed(node)
-        n_engines = node.num_processors
+        if self.node_engines is None:
+            n_engines = node.num_processors
+        else:
+            n_engines = int(self.node_engines)
         log.info("Adding %d engines on %s", n_engines, node.alias)
-        _start_engines(node, user)
+        _start_engines(node, user, n_engines)
 
     def on_remove_node(self, node, nodes, master, user, user_shell, volumes):
         raise NotImplementedError("on_remove_node method not implemented")
@@ -332,7 +346,7 @@ def run(self, nodes, master, user, user_shell, volumes):
         master.ssh.execute("ipcluster stop", ignore_exit_status=True)
         time.sleep(2)
         log.info("Stopping IPython controller on %s", master.alias)
-        master.ssh.execute("pkill -f ipcontrollerapp",
+        master.ssh.execute("pkill -f IPython.parallel.controller",
                            ignore_exit_status=True)
         master.ssh.execute("pkill -f 'ipython notebook'",
                            ignore_exit_status=True)
@@ -344,7 +358,7 @@ def run(self, nodes, master, user, user_shell, volumes):
 
     def _stop_engines(self, node, user):
         node.ssh.switch_user(user)
-        node.ssh.execute("pkill -f ipengineapp", ignore_exit_status=True)
+        node.ssh.execute("pkill -f IPython.parallel.engine", ignore_exit_status=True)
         node.ssh.switch_user('root')
 
     def on_add_node(self, node, nodes, master, user, user_shell, volumes):
@@ -354,7 +368,7 @@ def on_remove_node(self, node, nodes, master, user, user_shell, volumes):
         raise NotImplementedError("on_remove_node method not implemented")
 
 
-class IPClusterRestartEngines(DefaultClusterSetup):
+class IPClusterRestartEngines(IPCluster):
     """Plugin to kill and restart all engines of an IPython cluster
 
     This plugin can be useful to hard-reset the all the engines, for instance
@@ -364,14 +378,19 @@ class IPClusterRestartEngines(DefaultClusterSetup):
     This plugin is meant to be run manually with:
 
         starcluster runplugin plugin_conf_name cluster_name
 
     """
     def run(self, nodes, master, user, user_shell, volumes):
         n_total = 0
         for node in nodes:
-            n_engines = node.num_processors
-            if node.is_master() and n_engines > 2:
-                n_engines -= 1
+            if node.is_master() and (self.master_engines is not None):
+                n_engines = int(self.master_engines)
+            elif self.node_engines is not None:
+                n_engines = int(self.node_engines)
+            elif node.is_master():
+                # and n_engines > 2: # XXX I'm not sure I understand this logic yet.
+                n_engines = node.num_processors - 1
+            else:
+                n_engines = node.num_processors
             self.pool.simple_job(
                 _start_engines, (node, user, n_engines, True),
                 jobid=node.alias)
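
Because IPClusterRestartEngines now inherits from IPCluster, the same engine-count settings should be accepted when restarting engines manually via the runplugin usage quoted in the docstring above. A hedged sketch (the "ipclusterrestart" section name and the engine-count keys are illustrative assumptions, not taken from this commit):

    [plugin ipclusterrestart]
    SETUP_CLASS = starcluster.plugins.ipcluster.IPClusterRestartEngines
    # Assumed: forwarded to IPCluster.__init__ via the new subclassing
    MASTER_ENGINES = 2
    NODE_ENGINES = 4

    $ starcluster runplugin ipclusterrestart mycluster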
