From 55e304e82d950cbe308af95ebaface39917efafb Mon Sep 17 00:00:00 2001 From: "John C. Earls" Date: Sat, 7 Dec 2013 16:24:07 -0800 Subject: [PATCH 1/2] Added ec2 instance profiles for IAM --- starcluster/awsutils.py | 17 +++++++---- starcluster/cluster.py | 64 +++++++++++++++++++++++++++-------------- starcluster/node.py | 12 ++++++++ starcluster/static.py | 1 + 4 files changed, 68 insertions(+), 26 deletions(-) diff --git a/starcluster/awsutils.py b/starcluster/awsutils.py index ccc96a15c..617d0b48e 100644 --- a/starcluster/awsutils.py +++ b/starcluster/awsutils.py @@ -436,7 +436,8 @@ def request_instances(self, image_id, price=None, instance_type='m1.small', launch_group=None, availability_zone_group=None, placement=None, user_data=None, placement_group=None, - block_device_map=None, subnet_id=None): + block_device_map=None, subnet_id=None, + instance_profile_name=None): """ Convenience method for running spot or flat-rate instances """ @@ -476,6 +477,7 @@ def request_instances(self, image_id, price=None, instance_type='m1.small', placement=placement, placement_group=placement_group, user_data=user_data, + instance_profile_name=instance_profile_name, block_device_map=block_device_map) if price: @@ -497,7 +499,9 @@ def request_spot_instances(self, price, image_id, instance_type='m1.small', availability_zone_group=None, security_group_ids=None, subnet_id=None, placement=None, placement_group=None, - user_data=None, block_device_map=None): + block_device_map=None, + instance_profile_name=None): + kwargs = locals() kwargs.pop('self') return self.conn.request_spot_instances(**kwargs) @@ -560,7 +564,9 @@ def wait_for_propagation(self, instances=None, spot_requests=None, def run_instances(self, image_id, instance_type='m1.small', min_count=1, max_count=1, key_name=None, security_groups=None, placement=None, user_data=None, placement_group=None, - block_device_map=None, subnet_id=None): + block_device_map=None, subnet_id=None, + instance_profile_name = None): + kwargs = dict( instance_type=instance_type, min_count=min_count, @@ -569,8 +575,9 @@ def run_instances(self, image_id, instance_type='m1.small', min_count=1, subnet_id=subnet_id, placement=placement, user_data=user_data, - placement_group=placement_group, - block_device_map=block_device_map + block_device_map=block_device_map, + instance_profile_name=instance_profile_name, + placement_group=placement_group ) if subnet_id: kwargs.update( diff --git a/starcluster/cluster.py b/starcluster/cluster.py index 4c0fb9a7d..8fade641d 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -51,7 +51,8 @@ def __repr__(self): return "" % self.ec2.region.name def get_cluster(self, cluster_name, group=None, load_receipt=True, - load_plugins=True, load_volumes=True, require_keys=True): + load_plugins=True, load_volumes=True, + load_instance_profile_name=True, require_keys=True): """ Returns a Cluster object representing an active cluster """ @@ -64,7 +65,8 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True, cluster_group=group) if load_receipt: cl.load_receipt(load_plugins=load_plugins, - load_volumes=load_volumes) + load_volumes=load_volumes, + load_instance_profile_name=load_instance_profile_name) try: cl.keyname = cl.keyname or cl.master_node.key_name key_location = self.cfg.get_key(cl.keyname).get('key_location') @@ -79,14 +81,15 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True, except exception.SecurityGroupDoesNotExist: raise exception.ClusterDoesNotExist(cluster_name) - def get_clusters(self, load_receipt=True, load_plugins=True): + def get_clusters(self, load_receipt=True, load_plugins=True, load_instance_profile_name=True): """ Returns a list of all active clusters """ cluster_groups = self.get_cluster_security_groups() clusters = [self.get_cluster(g.name, group=g, load_receipt=load_receipt, - load_plugins=load_plugins) + load_plugins=load_plugins, + load_instance_profile_name=load_instance_profile_name) for g in cluster_groups] return clusters @@ -169,24 +172,30 @@ def _get_cluster_name(self, cluster_name): def add_node(self, cluster_name, alias=None, no_create=False, image_id=None, instance_type=None, zone=None, - placement_group=None, spot_bid=None): + placement_group=None, spot_bid=None, + instance_profile_name=None): cl = self.get_cluster(cluster_name) return cl.add_node(alias=alias, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, spot_bid=spot_bid, - no_create=no_create) + no_create=no_create, + instance_profile_name=instance_profile_name) def add_nodes(self, cluster_name, num_nodes, aliases=None, no_create=False, image_id=None, instance_type=None, zone=None, - placement_group=None, spot_bid=None): + placement_group=None, spot_bid=None, + instance_profile_name=None): """ Add one or more nodes to cluster """ + print __file__,191 + print instance_profile_name cl = self.get_cluster(cluster_name) return cl.add_nodes(num_nodes, aliases=aliases, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, spot_bid=spot_bid, - no_create=no_create) + no_create=no_create, + instance_profile_name=instance_profile_name) def remove_node(self, cluster_name, alias, terminate=True, force=False): """ @@ -281,7 +290,8 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False): tag = self.get_tag_from_sg(scg.name) try: cl = self.get_cluster(tag, group=scg, load_plugins=False, - load_volumes=False, require_keys=False) + load_volumes=False, require_keys=False, + load_instance_profile_name=True) except exception.IncompatibleCluster as e: sep = '*' * 60 log.error('\n'.join([sep, e.msg, sep]), @@ -307,6 +317,8 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False): print 'Uptime: %s' % uptime print 'Zone: %s' % getattr(n, 'placement', 'N/A') print 'Keypair: %s' % getattr(n, 'key_name', 'N/A') + ipn = cl.instance_profile_name if cl.instance_profile_name else 'N/A' + print 'IAM instance profile: %s' % ipn ebs_vols = [] for node in nodes: devices = node.attached_vols @@ -369,7 +381,6 @@ def run_plugin(self, plugin_name, cluster_tag): class Cluster(object): - def __init__(self, ec2_conn=None, spot_bid=None, @@ -400,6 +411,7 @@ def __init__(self, disable_cloudinit=False, vpc_id=None, subnet_id=None, + instance_profile_name=None, **kwargs): # validation if vpc_id or subnet_id: @@ -427,9 +439,11 @@ def __init__(self, self.cluster_size = cluster_size or 0 self.volumes = self.load_volumes(volumes) self.plugins = self.load_plugins(plugins) + self.userdata_scripts = userdata_scripts or [] self.dns_prefix = dns_prefix and cluster_tag - + + self._cluster_group = None self._placement_group = None self._zone = None @@ -568,13 +582,13 @@ def __str__(self): cfg = self.__getstate__() return pprint.pformat(cfg) - def load_receipt(self, load_plugins=True, load_volumes=True): + def load_receipt(self, load_plugins=True, load_volumes=True, load_instance_profile_name=True): """ Load the original settings used to launch this cluster into this Cluster object. Settings are loaded from cluster group tags and the master node's user data. """ - if not (load_plugins or load_volumes): + if not (load_plugins or load_volumes or load_instance_profile_name): return True try: tags = self.cluster_group.tags @@ -609,6 +623,8 @@ def load_receipt(self, load_plugins=True, load_volumes=True): self.plugins = self.load_plugins(master.get_plugins()) if load_volumes: self.volumes = master.get_volumes() + if load_instance_profile_name: + self.instance_profile_name = master.get_instance_profile_name() except exception.PluginError: log.error("An error occurred while loading plugins: ", exc_info=True) @@ -817,11 +833,12 @@ def get_spot_requests_or_raise(self): return spots def create_node(self, alias, image_id=None, instance_type=None, zone=None, - placement_group=None, spot_bid=None, force_flat=False): + placement_group=None, spot_bid=None, force_flat=False, instance_profile_name=None): return self.create_nodes([alias], image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, - spot_bid=spot_bid, force_flat=force_flat)[0] + spot_bid=spot_bid, force_flat=force_flat, + instance_profile_name=instance_profile_name)[0] def _get_cluster_userdata(self, aliases): alias_file = utils.string_to_file('\n'.join(['#ignored'] + aliases), @@ -843,7 +860,7 @@ def _get_cluster_userdata(self, aliases): def create_nodes(self, aliases, image_id=None, instance_type=None, zone=None, placement_group=None, spot_bid=None, - force_flat=False): + force_flat=False, instance_profile_name=None): """ Convenience method for requesting instances with this cluster's settings. All settings (kwargs) except force_flat default to cluster @@ -868,6 +885,7 @@ def create_nodes(self, aliases, image_id=None, instance_type=None, image_id = image_id or self.node_image_id count = len(aliases) if not spot_bid else 1 user_data = self._get_cluster_userdata(aliases) + instance_profile_name = instance_profile_name or self.instance_profile_name kwargs = dict(price=spot_bid, instance_type=instance_type, min_count=count, max_count=count, count=count, key_name=self.keyname, security_groups=[cluster_sg], @@ -876,7 +894,9 @@ def create_nodes(self, aliases, image_id=None, instance_type=None, placement=zone or getattr(self.zone, 'name', None), user_data=user_data, placement_group=placement_group, - subnet_id=self.subnet_id) + subnet_id=self.subnet_id, + instance_profile_name=instance_profile_name + ) resvs = [] if spot_bid: security_group_id = self.cluster_group.id @@ -907,7 +927,7 @@ def _get_next_node_num(self): def add_node(self, alias=None, no_create=False, image_id=None, instance_type=None, zone=None, placement_group=None, - spot_bid=None): + spot_bid=None, instance_profile_name=None): """ Add a single node to this cluster """ @@ -917,11 +937,12 @@ def add_node(self, alias=None, no_create=False, image_id=None, return self.add_nodes(1, aliases=aliases, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, - spot_bid=spot_bid, no_create=no_create) + spot_bid=spot_bid, no_create=no_create, + instance_profile_name=instance_profile_name) def add_nodes(self, num_nodes, aliases=None, image_id=None, instance_type=None, zone=None, placement_group=None, - spot_bid=None, no_create=False): + spot_bid=None, no_create=False, instance_profile_name=None): """ Add new nodes to this cluster @@ -948,7 +969,8 @@ def add_nodes(self, num_nodes, aliases=None, image_id=None, resp = self.create_nodes(aliases, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, - spot_bid=spot_bid) + spot_bid=spot_bid, + instance_profile_name=instance_profile_name) if spot_bid or self.spot_bid: self.ec2.wait_for_propagation(spot_requests=resp) else: diff --git a/starcluster/node.py b/starcluster/node.py index 9574535c3..698ac9f48 100644 --- a/starcluster/node.py +++ b/starcluster/node.py @@ -186,6 +186,14 @@ def get_volumes(self): payload = volstxt.split('\n', 2)[2] return utils.decode_uncompress_load(payload) + def get_instance_profile_name(self): + if self.instance.instance_profile: + arn = self.instance.instance_profile['arn'] + match = re.match(r'arn:aws:iam::\d{12}:instance-profile/(\S+)', arn) + return match.group(1) + else: + return None + def _remove_all_tags(self): tags = self.tags.keys()[:] for t in tags: @@ -236,6 +244,10 @@ def memory(self): "free -m | grep -i mem | awk '{print $2}'")[0]) return self._memory + @property + def instance_profile(self): + return self.instance.instance_profile + @property def ip_address(self): return self.instance.ip_address diff --git a/starcluster/static.py b/starcluster/static.py index a607c5c9d..a8e460455 100644 --- a/starcluster/static.py +++ b/starcluster/static.py @@ -255,4 +255,5 @@ def create_sc_config_dirs(): 'force_spot_master': (bool, False, False, None, None), 'disable_cloudinit': (bool, False, False, None, None), 'dns_prefix': (bool, False, False, None, None), + 'instance_profile_name': (str, False, None, None, None), } From 7eaa3761a124db8498eaa03994d662e5ddc6a27e Mon Sep 17 00:00:00 2001 From: "John C. Earls" Date: Sat, 7 Dec 2013 16:50:13 -0800 Subject: [PATCH 2/2] Shortened instance_profile_name to iam_profile Also upped boto requirements. --- setup.py | 2 +- starcluster/awsutils.py | 10 ++++----- starcluster/cluster.py | 50 ++++++++++++++++++++--------------------- starcluster/node.py | 2 +- starcluster/static.py | 2 +- 5 files changed, 33 insertions(+), 33 deletions(-) diff --git a/setup.py b/setup.py index bf0caa0e4..03c09df7d 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ console_scripts = ['starcluster = starcluster.cli:main'] extra = dict(test_suite="starcluster.tests", tests_require="nose", - install_requires=["paramiko>=1.11.0", "boto>=2.10.0", + install_requires=["paramiko>=1.11.0", "boto>=2.13.2", "workerpool>=0.9.2", "Jinja2>=2.7", "decorator>=3.4.0", "iptools>=0.6.1", "optcomplete>=1.2-devel", diff --git a/starcluster/awsutils.py b/starcluster/awsutils.py index 617d0b48e..be61725c9 100644 --- a/starcluster/awsutils.py +++ b/starcluster/awsutils.py @@ -437,7 +437,7 @@ def request_instances(self, image_id, price=None, instance_type='m1.small', availability_zone_group=None, placement=None, user_data=None, placement_group=None, block_device_map=None, subnet_id=None, - instance_profile_name=None): + iam_profile=None): """ Convenience method for running spot or flat-rate instances """ @@ -477,7 +477,7 @@ def request_instances(self, image_id, price=None, instance_type='m1.small', placement=placement, placement_group=placement_group, user_data=user_data, - instance_profile_name=instance_profile_name, + iam_profile=iam_profile, block_device_map=block_device_map) if price: @@ -500,7 +500,7 @@ def request_spot_instances(self, price, image_id, instance_type='m1.small', security_group_ids=None, subnet_id=None, placement=None, placement_group=None, block_device_map=None, - instance_profile_name=None): + iam_profile=None): kwargs = locals() kwargs.pop('self') @@ -565,7 +565,7 @@ def run_instances(self, image_id, instance_type='m1.small', min_count=1, max_count=1, key_name=None, security_groups=None, placement=None, user_data=None, placement_group=None, block_device_map=None, subnet_id=None, - instance_profile_name = None): + iam_profile = None): kwargs = dict( instance_type=instance_type, @@ -576,7 +576,7 @@ def run_instances(self, image_id, instance_type='m1.small', min_count=1, placement=placement, user_data=user_data, block_device_map=block_device_map, - instance_profile_name=instance_profile_name, + instance_profile_name=iam_profile, placement_group=placement_group ) if subnet_id: diff --git a/starcluster/cluster.py b/starcluster/cluster.py index 8fade641d..df6b5103d 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -52,7 +52,7 @@ def __repr__(self): def get_cluster(self, cluster_name, group=None, load_receipt=True, load_plugins=True, load_volumes=True, - load_instance_profile_name=True, require_keys=True): + load_iam_profile=True, require_keys=True): """ Returns a Cluster object representing an active cluster """ @@ -66,7 +66,7 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True, if load_receipt: cl.load_receipt(load_plugins=load_plugins, load_volumes=load_volumes, - load_instance_profile_name=load_instance_profile_name) + load_iam_profile=load_iam_profile) try: cl.keyname = cl.keyname or cl.master_node.key_name key_location = self.cfg.get_key(cl.keyname).get('key_location') @@ -81,7 +81,7 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True, except exception.SecurityGroupDoesNotExist: raise exception.ClusterDoesNotExist(cluster_name) - def get_clusters(self, load_receipt=True, load_plugins=True, load_instance_profile_name=True): + def get_clusters(self, load_receipt=True, load_plugins=True, load_iam_profile=True): """ Returns a list of all active clusters """ @@ -89,7 +89,7 @@ def get_clusters(self, load_receipt=True, load_plugins=True, load_instance_profi clusters = [self.get_cluster(g.name, group=g, load_receipt=load_receipt, load_plugins=load_plugins, - load_instance_profile_name=load_instance_profile_name) + load_iam_profile=load_iam_profile) for g in cluster_groups] return clusters @@ -173,29 +173,29 @@ def _get_cluster_name(self, cluster_name): def add_node(self, cluster_name, alias=None, no_create=False, image_id=None, instance_type=None, zone=None, placement_group=None, spot_bid=None, - instance_profile_name=None): + iam_profile=None): cl = self.get_cluster(cluster_name) return cl.add_node(alias=alias, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, spot_bid=spot_bid, no_create=no_create, - instance_profile_name=instance_profile_name) + iam_profile=iam_profile) def add_nodes(self, cluster_name, num_nodes, aliases=None, no_create=False, image_id=None, instance_type=None, zone=None, placement_group=None, spot_bid=None, - instance_profile_name=None): + iam_profile=None): """ Add one or more nodes to cluster """ print __file__,191 - print instance_profile_name + print iam_profile cl = self.get_cluster(cluster_name) return cl.add_nodes(num_nodes, aliases=aliases, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, spot_bid=spot_bid, no_create=no_create, - instance_profile_name=instance_profile_name) + iam_profile=iam_profile) def remove_node(self, cluster_name, alias, terminate=True, force=False): """ @@ -291,7 +291,7 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False): try: cl = self.get_cluster(tag, group=scg, load_plugins=False, load_volumes=False, require_keys=False, - load_instance_profile_name=True) + load_iam_profile=True) except exception.IncompatibleCluster as e: sep = '*' * 60 log.error('\n'.join([sep, e.msg, sep]), @@ -317,7 +317,7 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False): print 'Uptime: %s' % uptime print 'Zone: %s' % getattr(n, 'placement', 'N/A') print 'Keypair: %s' % getattr(n, 'key_name', 'N/A') - ipn = cl.instance_profile_name if cl.instance_profile_name else 'N/A' + ipn = cl.iam_profile if cl.iam_profile else 'N/A' print 'IAM instance profile: %s' % ipn ebs_vols = [] for node in nodes: @@ -411,7 +411,7 @@ def __init__(self, disable_cloudinit=False, vpc_id=None, subnet_id=None, - instance_profile_name=None, + iam_profile=None, **kwargs): # validation if vpc_id or subnet_id: @@ -582,13 +582,13 @@ def __str__(self): cfg = self.__getstate__() return pprint.pformat(cfg) - def load_receipt(self, load_plugins=True, load_volumes=True, load_instance_profile_name=True): + def load_receipt(self, load_plugins=True, load_volumes=True, load_iam_profile=True): """ Load the original settings used to launch this cluster into this Cluster object. Settings are loaded from cluster group tags and the master node's user data. """ - if not (load_plugins or load_volumes or load_instance_profile_name): + if not (load_plugins or load_volumes or load_iam_profile): return True try: tags = self.cluster_group.tags @@ -623,8 +623,8 @@ def load_receipt(self, load_plugins=True, load_volumes=True, load_instance_profi self.plugins = self.load_plugins(master.get_plugins()) if load_volumes: self.volumes = master.get_volumes() - if load_instance_profile_name: - self.instance_profile_name = master.get_instance_profile_name() + if load_iam_profile: + self.iam_profile = master.get_iam_profile() except exception.PluginError: log.error("An error occurred while loading plugins: ", exc_info=True) @@ -833,12 +833,12 @@ def get_spot_requests_or_raise(self): return spots def create_node(self, alias, image_id=None, instance_type=None, zone=None, - placement_group=None, spot_bid=None, force_flat=False, instance_profile_name=None): + placement_group=None, spot_bid=None, force_flat=False, iam_profile=None): return self.create_nodes([alias], image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, spot_bid=spot_bid, force_flat=force_flat, - instance_profile_name=instance_profile_name)[0] + iam_profile=iam_profile)[0] def _get_cluster_userdata(self, aliases): alias_file = utils.string_to_file('\n'.join(['#ignored'] + aliases), @@ -860,7 +860,7 @@ def _get_cluster_userdata(self, aliases): def create_nodes(self, aliases, image_id=None, instance_type=None, zone=None, placement_group=None, spot_bid=None, - force_flat=False, instance_profile_name=None): + force_flat=False, iam_profile=None): """ Convenience method for requesting instances with this cluster's settings. All settings (kwargs) except force_flat default to cluster @@ -885,7 +885,7 @@ def create_nodes(self, aliases, image_id=None, instance_type=None, image_id = image_id or self.node_image_id count = len(aliases) if not spot_bid else 1 user_data = self._get_cluster_userdata(aliases) - instance_profile_name = instance_profile_name or self.instance_profile_name + iam_profile = iam_profile or self.iam_profile kwargs = dict(price=spot_bid, instance_type=instance_type, min_count=count, max_count=count, count=count, key_name=self.keyname, security_groups=[cluster_sg], @@ -895,7 +895,7 @@ def create_nodes(self, aliases, image_id=None, instance_type=None, user_data=user_data, placement_group=placement_group, subnet_id=self.subnet_id, - instance_profile_name=instance_profile_name + iam_profile=iam_profile ) resvs = [] if spot_bid: @@ -927,7 +927,7 @@ def _get_next_node_num(self): def add_node(self, alias=None, no_create=False, image_id=None, instance_type=None, zone=None, placement_group=None, - spot_bid=None, instance_profile_name=None): + spot_bid=None, iam_profile=None): """ Add a single node to this cluster """ @@ -938,11 +938,11 @@ def add_node(self, alias=None, no_create=False, image_id=None, instance_type=instance_type, zone=zone, placement_group=placement_group, spot_bid=spot_bid, no_create=no_create, - instance_profile_name=instance_profile_name) + iam_profile=iam_profile) def add_nodes(self, num_nodes, aliases=None, image_id=None, instance_type=None, zone=None, placement_group=None, - spot_bid=None, no_create=False, instance_profile_name=None): + spot_bid=None, no_create=False, iam_profile=None): """ Add new nodes to this cluster @@ -970,7 +970,7 @@ def add_nodes(self, num_nodes, aliases=None, image_id=None, instance_type=instance_type, zone=zone, placement_group=placement_group, spot_bid=spot_bid, - instance_profile_name=instance_profile_name) + iam_profile=iam_profile) if spot_bid or self.spot_bid: self.ec2.wait_for_propagation(spot_requests=resp) else: diff --git a/starcluster/node.py b/starcluster/node.py index 698ac9f48..53b60d4b7 100644 --- a/starcluster/node.py +++ b/starcluster/node.py @@ -186,7 +186,7 @@ def get_volumes(self): payload = volstxt.split('\n', 2)[2] return utils.decode_uncompress_load(payload) - def get_instance_profile_name(self): + def get_iam_profile(self): if self.instance.instance_profile: arn = self.instance.instance_profile['arn'] match = re.match(r'arn:aws:iam::\d{12}:instance-profile/(\S+)', arn) diff --git a/starcluster/static.py b/starcluster/static.py index a8e460455..fddb249ed 100644 --- a/starcluster/static.py +++ b/starcluster/static.py @@ -255,5 +255,5 @@ def create_sc_config_dirs(): 'force_spot_master': (bool, False, False, None, None), 'disable_cloudinit': (bool, False, False, None, None), 'dns_prefix': (bool, False, False, None, None), - 'instance_profile_name': (str, False, None, None, None), + 'iam_profile': (str, False, None, None, None), }