From 0d830038510e47d703a4bc2521b1e71bcb055290 Mon Sep 17 00:00:00 2001 From: Mich Date: Fri, 29 Aug 2014 14:48:25 -0400 Subject: [PATCH 1/9] Fix the nodes filter within wait_for_running_instances --- starcluster/cluster.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/starcluster/cluster.py b/starcluster/cluster.py index bbbaa127c..667e45692 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -775,13 +775,16 @@ def nodes(self): log.debug('returning self._nodes = %s' % self._nodes) return self._nodes - def get_nodes_or_raise(self): - nodes = self.nodes - if not nodes: + def get_nodes_or_raise(self, nodes=None): + _nodes = self.nodes + if not _nodes: filters = {'instance.group-name': self._security_group} terminated_nodes = self.ec2.get_all_instances(filters=filters) raise exception.NoClusterNodesFound(terminated_nodes) - return nodes + if nodes: + nodes_ids = [n.id for n in nodes] + _nodes = filter(lambda n: n.id in nodes_ids, _nodes) + return _nodes def get_node(self, identifier, nodes=None): """ @@ -1397,19 +1400,19 @@ def wait_for_running_instances(self, nodes=None, Wait until all cluster nodes are in a 'running' state """ log.info("Waiting for all nodes to be in a 'running' state...") - nodes = nodes or self.get_nodes_or_raise() + _nodes = nodes or self.get_nodes_or_raise() pbar = self.progress_bar.reset() - pbar.maxval = len(nodes) + pbar.maxval = len(_nodes) pbar.update(0) now = datetime.datetime.utcnow() timeout = now + datetime.timedelta(minutes=kill_pending_after_mins) while not pbar.finished: - running_nodes = [n for n in nodes if n.state == "running"] - pbar.maxval = len(nodes) + running_nodes = [n for n in _nodes if n.state == "running"] + pbar.maxval = len(_nodes) pbar.update(len(running_nodes)) if not pbar.finished: if datetime.datetime.utcnow() > timeout: - pending = [n for n in nodes if n not in running_nodes] + pending = [n for n in _nodes if n not in running_nodes] log.warn("%d nodes
have been pending for >= %d mins " "- terminating" % (len(pending), kill_pending_after_mins)) @@ -1417,7 +1420,7 @@ def wait_for_running_instances(self, nodes=None, node.terminate() else: time.sleep(self.refresh_interval) - nodes = self.get_nodes_or_raise() + _nodes = self.get_nodes_or_raise(nodes) pbar.reset() def wait_for_ssh(self, nodes=None): From ca9eb49f61f5e38403c10fe6a9de70b7d0637a66 Mon Sep 17 00:00:00 2001 From: Mich Date: Fri, 29 Aug 2014 16:47:25 -0400 Subject: [PATCH 2/9] Fix wait_for_active_spots_filter filter --- starcluster/cluster.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/starcluster/cluster.py b/starcluster/cluster.py index bbbaa127c..bfebb5565 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -880,11 +880,14 @@ def spot_requests(self): filters['launch.group-id'] = group_id return self.ec2.get_all_spot_requests(filters=filters) - def get_spot_requests_or_raise(self): - spots = self.spot_requests - if not spots: + def get_spot_requests_or_raise(self, spots): + _spots = self.spot_requests + if not _spots: raise exception.NoClusterSpotRequests - return spots + if spots: + spots_ids = [s.id for s in spots] + _spots = filter(lambda s: s.id in spots_ids, _spots) + return _spots def create_node(self, alias, image_id=None, instance_type=None, zone=None, placement_group=None, spot_bid=None, force_flat=False): @@ -1386,7 +1389,7 @@ def wait_for_active_spots(self, spots=None): pbar.update(len(active_spots)) if not pbar.finished: time.sleep(self.refresh_interval) - spots = self.get_spot_requests_or_raise() + spots = self.get_spot_requests_or_raise(spots) pbar.reset() self.ec2.wait_for_propagation( instances=[s.instance_id for s in spots]) From 7649339055045a2ff80e726f6eb94b9045e6aba9 Mon Sep 17 00:00:00 2001 From: Mich Date: Tue, 2 Sep 2014 10:57:35 -0400 Subject: [PATCH 3/9] Streaming add nodes rather than wait for all --- starcluster/cluster.py | 68 +++++++++++++++++++++++++++++++++++------- 
starcluster/utils.py | 11 +++++++ 2 files changed, 69 insertions(+), 10 deletions(-) diff --git a/starcluster/cluster.py b/starcluster/cluster.py index f7a0332b5..0e87f5024 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -1021,7 +1021,13 @@ def add_nodes(self, num_nodes, aliases=None, image_id=None, if self._make_alias(master=True) in aliases: raise exception.ClusterValidationError( "worker nodes cannot have master as an alias") - if not no_create: + if no_create: + self.wait_for_cluster(msg="Waiting for node(s) to come up...") + log.debug("Adding node(s): %s" % aliases) + for alias in aliases: + node = self.get_node(alias) + self.run_plugins(method_name="on_add_node", node=node) + else: if self.subnet: ip_count = self.subnet.available_ip_address_count if ip_count < len(aliases): @@ -1039,13 +1045,53 @@ def add_nodes(self, num_nodes, aliases=None, image_id=None, spot_bid=spot_bid) if spot_bid or self.spot_bid: self.ec2.wait_for_propagation(spot_requests=resp) + self.streaming_add(spots=resp) else: self.ec2.wait_for_propagation(instances=resp[0].instances) - self.wait_for_cluster(msg="Waiting for node(s) to come up...") - log.debug("Adding node(s): %s" % aliases) - for alias in aliases: - node = self.get_node(alias) - self.run_plugins(method_name="on_add_node", node=node) + self.streaming_add(instances=resp[0].instances) + + def streaming_add(self, spots=[], instances=[]): + """ + As soon as a new node is ready, run the add plugins commands over it. 
+ """ + assert bool(spots) != bool(instances), \ + "You must define either spots or instances" + + interval = self.refresh_interval + log.info("Waiting for one of the new nodes to be up " + "(updating every {}s)".format(interval)) + while True: + ready_instances = [] + if spots: + spots = self.get_spot_requests_or_raise(spots) + instance_ids = [] + + spots = utils.filter_move( + lambda s: s.state != 'active' or s.instance_id is None, + spots, instance_ids, lambda s: s.instance_id) + if spots: + log.info("Still waiting for spots: " + str(spots)) + if instance_ids: + log.info("Instance ids:" + str(instance_ids)) + instances += \ + self.ec2.get_all_instances(instance_ids=instance_ids) + if instances: + instances = self.get_nodes_or_raise(nodes=instances) + instances = utils.filter_move( + lambda i: i.state != 'running' or not i.is_up(), + instances, ready_instances) + if instances: + log.info("Still waiting for instances: " + str(instances)) + for ready_instance in ready_instances: + log.info("Adding node: %s" % ready_instance.alias) + up_nodes = filter(lambda n: n.is_up(), self.nodes) + self.run_plugins(method_name="on_add_node", + node=ready_instance, nodes=up_nodes) + if spots or instances: + time.sleep(interval) + else: + break + def remove_node(self, node=None, terminate=True, force=False): """ @@ -1678,7 +1724,7 @@ def _setup_cluster(self): self.run_plugins() def run_plugins(self, plugins=None, method_name="run", node=None, - reverse=False): + reverse=False, nodes=None): """ Run all plugins specified in this Cluster object's self.plugins list Uses plugins list instead of self.plugins if specified. 
@@ -1693,9 +1739,11 @@ def run_plugins(self, plugins=None, method_name="run", node=None, if reverse: plugs.reverse() for plug in plugs: - self.run_plugin(plug, method_name=method_name, node=node) + self.run_plugin(plug, method_name=method_name, node=node, + nodes=nodes) - def run_plugin(self, plugin, name='', method_name='run', node=None): + def run_plugin(self, plugin, name='', method_name='run', node=None, + nodes=None): """ Run a StarCluster plugin. @@ -1713,7 +1761,7 @@ def run_plugin(self, plugin, name='', method_name='run', node=None): log.warn("Plugin %s has no %s method...skipping" % (plugin_name, method_name)) return - args = [self.nodes, self.master_node, self.cluster_user, + args = [nodes or self.nodes, self.master_node, self.cluster_user, self.cluster_shell, self.volumes] if node: args.insert(0, node) diff --git a/starcluster/utils.py b/starcluster/utils.py index ec6f0bfff..1f7e31624 100644 --- a/starcluster/utils.py +++ b/starcluster/utils.py @@ -649,3 +649,14 @@ def get_spinner(msg): log.info(msg, extra=dict(__nonewline__=True)) s.start() return s + +def filter_move(keep_fct, in_, out, extract_fct=None): + def _filter(item): + if keep_fct(item): + return True + if extract_fct: + out.append(extract_fct(item)) + else: + out.append(item) + return False + return filter(_filter, in_) From 46a70bd0ccc288ed1f163b5d4219e19092118601 Mon Sep 17 00:00:00 2001 From: Mich Date: Tue, 2 Sep 2014 11:05:01 -0400 Subject: [PATCH 4/9] Fixed pep8 check --- starcluster/cluster.py | 3 +-- starcluster/utils.py | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/starcluster/cluster.py b/starcluster/cluster.py index 0e87f5024..350f71cae 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -1059,7 +1059,7 @@ def streaming_add(self, spots=[], instances=[]): interval = self.refresh_interval log.info("Waiting for one of the new nodes to be up " - "(updating every {}s)".format(interval)) + "(updating every {}s)".format(interval)) while True: 
ready_instances = [] if spots: @@ -1092,7 +1092,6 @@ def streaming_add(self, spots=[], instances=[]): else: break - def remove_node(self, node=None, terminate=True, force=False): """ Remove a single node from this cluster diff --git a/starcluster/utils.py b/starcluster/utils.py index 1f7e31624..1dda4f2c0 100644 --- a/starcluster/utils.py +++ b/starcluster/utils.py @@ -650,6 +650,7 @@ def get_spinner(msg): s.start() return s + def filter_move(keep_fct, in_, out, extract_fct=None): def _filter(item): if keep_fct(item): From 5a2148dc64b2060ed9898cf7a3ffea15648b63c1 Mon Sep 17 00:00:00 2001 From: Mich Date: Fri, 5 Sep 2014 14:17:24 -0400 Subject: [PATCH 5/9] Streaming propagation as well --- starcluster/awsutils.py | 26 ++++++++++++++++++++++++++ starcluster/cluster.py | 39 ++++++++++++++++++++++++++++++++------- 2 files changed, 58 insertions(+), 7 deletions(-) diff --git a/starcluster/awsutils.py b/starcluster/awsutils.py index 904be21af..e39cbc2a1 100644 --- a/starcluster/awsutils.py +++ b/starcluster/awsutils.py @@ -597,6 +597,32 @@ def wait_for_propagation(self, instances=None, spot_requests=None, instance_ids, self.get_all_instances, 'instance-id', 'instances', max_retries=max_retries, interval=interval) + def _check_for_propagation(self, obj_ids, fetch_func, id_filter, obj_name): + filters = {id_filter: obj_ids} + reqs_ids = [] + reqs = fetch_func(filters=filters) + reqs_ids = [req.id for req in reqs] + found = [oid for oid in obj_ids if oid in reqs_ids] + return found + + def check_for_propagation(self, instance_ids=None, spot_ids=None): + """ + Check propagated instances. Returns a tuple where the first item is + a list of the found instances and the second a list of the found + spot requests. 
+ """ + found_instance_ids = [] + found_spot_ids = [] + if spot_ids: + found_instance_ids = self._check_for_propagation( + spot_ids, self.get_all_spot_requests, + 'spot-instance-request-id', 'spot requests') + if instance_ids: + found_spot_ids = self._check_for_propagation( + instance_ids, self.get_all_instances, 'instance-id', + 'instances') + return found_instance_ids, found_spot_ids + def run_instances(self, image_id, instance_type='m1.small', min_count=1, max_count=1, key_name=None, security_groups=None, placement=None, user_data=None, placement_group=None, diff --git a/starcluster/cluster.py b/starcluster/cluster.py index 350f71cae..8f5828408 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -1044,10 +1044,8 @@ def add_nodes(self, num_nodes, aliases=None, image_id=None, placement_group=placement_group, spot_bid=spot_bid) if spot_bid or self.spot_bid: - self.ec2.wait_for_propagation(spot_requests=resp) self.streaming_add(spots=resp) else: - self.ec2.wait_for_propagation(instances=resp[0].instances) self.streaming_add(instances=resp[0].instances) def streaming_add(self, spots=[], instances=[]): @@ -1060,21 +1058,47 @@ def streaming_add(self, spots=[], instances=[]): interval = self.refresh_interval log.info("Waiting for one of the new nodes to be up " "(updating every {}s)".format(interval)) + + unpropagated_spots = spots + spots = [] + unpropagated_instances = instances + instances = [] while True: ready_instances = [] + if unpropagated_spots: + propagated_spot_ids, _ = self.ec2.check_for_propagation( + spot_ids=[s.id for s in unpropagated_spots]) + unpropagated_spots = utils.filter_move( + lambda s: s.id not in propagated_spot_ids, + unpropagated_spots, spots) + if unpropagated_spots: + log.info("Still waiting for unpropagated spots:" + + str(unpropagated_spots)) + if spots: - spots = self.get_spot_requests_or_raise(spots) instance_ids = [] - + spots = self.get_spot_requests_or_raise(spots) spots = utils.filter_move( lambda s: s.state != 
'active' or s.instance_id is None, spots, instance_ids, lambda s: s.instance_id) - if spots: - log.info("Still waiting for spots: " + str(spots)) if instance_ids: log.info("Instance ids:" + str(instance_ids)) + # Those one are already propagated instances += \ self.ec2.get_all_instances(instance_ids=instance_ids) + if spots: + log.info("Still waiting for spots: " + str(spots)) + + if unpropagated_instances: + _, propagated_instance_ids = self.ec2.check_for_propagation( + instance_ids=[s.id for s in unpropagated_instances]) + unpropagated_instances = utils.filter_move( + lambda i: i.id not in propagated_instance_ids, + unpropagated_instances, instances) + if unpropagated_instances: + log.info("Still waiting for unpropagated instances: " + + str(unpropagated_instances)) + if instances: instances = self.get_nodes_or_raise(nodes=instances) instances = utils.filter_move( @@ -1087,7 +1111,8 @@ def streaming_add(self, spots=[], instances=[]): up_nodes = filter(lambda n: n.is_up(), self.nodes) self.run_plugins(method_name="on_add_node", node=ready_instance, nodes=up_nodes) - if spots or instances: + if any([unpropagated_spots, spots, + unpropagated_instances, instances]): time.sleep(interval) else: break From 36317a01232947ccf2de536ce9390daf0f3209c1 Mon Sep 17 00:00:00 2001 From: Mich Date: Fri, 5 Sep 2014 16:02:39 -0400 Subject: [PATCH 6/9] streaming_add_nodes parallel ssh is_up check --- starcluster/cluster.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/starcluster/cluster.py b/starcluster/cluster.py index 8f5828408..df1370b34 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -1101,9 +1101,12 @@ def streaming_add(self, spots=[], instances=[]): if instances: instances = self.get_nodes_or_raise(nodes=instances) - instances = utils.filter_move( - lambda i: i.state != 'running' or not i.is_up(), - instances, ready_instances) + ssh_up = self.pool.map(lambda i: i.is_up(), instances) + zip_instances = utils.filter_move( + lambda 
i: i[0].state != 'running' or not i[1], + zip(instances, ssh_up), ready_instances, + lambda i: i[0]) + instances = [i[0] for i in zip_instances] if instances: log.info("Still waiting for instances: " + str(instances)) for ready_instance in ready_instances: From f16db6bb8667f46f87d536d6dc63ffd8682b998d Mon Sep 17 00:00:00 2001 From: Mich Date: Fri, 5 Sep 2014 16:06:27 -0400 Subject: [PATCH 7/9] Fixed pep8 checks --- starcluster/cluster.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/starcluster/cluster.py b/starcluster/cluster.py index df1370b34..762e707c0 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -1073,7 +1073,7 @@ def streaming_add(self, spots=[], instances=[]): unpropagated_spots, spots) if unpropagated_spots: log.info("Still waiting for unpropagated spots:" - + str(unpropagated_spots)) + + str(unpropagated_spots)) if spots: instance_ids = [] @@ -1097,7 +1097,7 @@ def streaming_add(self, spots=[], instances=[]): unpropagated_instances, instances) if unpropagated_instances: log.info("Still waiting for unpropagated instances: " - + str(unpropagated_instances)) + + str(unpropagated_instances)) if instances: instances = self.get_nodes_or_raise(nodes=instances) From 2f78028b5e9d31dc5e3fbaf368990a43aed2e126 Mon Sep 17 00:00:00 2001 From: Mich Date: Sat, 6 Sep 2014 11:34:36 -0400 Subject: [PATCH 8/9] don't sleep before cycling if nodes were added --- starcluster/cluster.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/starcluster/cluster.py b/starcluster/cluster.py index 762e707c0..0032a0692 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -1116,6 +1116,10 @@ def streaming_add(self, spots=[], instances=[]): node=ready_instance, nodes=up_nodes) if any([unpropagated_spots, spots, unpropagated_instances, instances]): + if ready_instances: + # Nodes were added, that took + # time so we should loop again now + continue time.sleep(interval) else: break From 1665a781c0eb0ed7214de2ed3b79ec09e0667dac 
Mon Sep 17 00:00:00 2001 From: Mich Date: Tue, 9 Sep 2014 10:25:06 -0400 Subject: [PATCH 9/9] No sleep if instances ssh needs to be checked as this takes time --- starcluster/cluster.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/starcluster/cluster.py b/starcluster/cluster.py index 0032a0692..2b8b4124f 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -1116,8 +1116,9 @@ def streaming_add(self, spots=[], instances=[]): node=ready_instance, nodes=up_nodes) if any([unpropagated_spots, spots, unpropagated_instances, instances]): - if ready_instances: - # Nodes were added, that took + if instances or ready_instances: + # instances means we wait on ssh is_up, no need to sleep + # ready_instances means nodes were added, that took # time so we should loop again now continue time.sleep(interval)