diff --git a/setup.py b/setup.py index bf0caa0e4..03c09df7d 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ console_scripts = ['starcluster = starcluster.cli:main'] extra = dict(test_suite="starcluster.tests", tests_require="nose", - install_requires=["paramiko>=1.11.0", "boto>=2.10.0", + install_requires=["paramiko>=1.11.0", "boto>=2.13.2", "workerpool>=0.9.2", "Jinja2>=2.7", "decorator>=3.4.0", "iptools>=0.6.1", "optcomplete>=1.2-devel", diff --git a/starcluster/awsutils.py b/starcluster/awsutils.py index bb4cc88dd..1e8a78d57 100644 --- a/starcluster/awsutils.py +++ b/starcluster/awsutils.py @@ -443,7 +443,8 @@ def request_instances(self, image_id, price=None, instance_type='m1.small', launch_group=None, availability_zone_group=None, placement=None, user_data=None, placement_group=None, - block_device_map=None, subnet_id=None): + block_device_map=None, subnet_id=None, + iam_profile=None): """ Convenience method for running spot or flat-rate instances """ @@ -483,6 +484,7 @@ def request_instances(self, image_id, price=None, instance_type='m1.small', placement=placement, placement_group=placement_group, user_data=user_data, + iam_profile=iam_profile, block_device_map=block_device_map) if price: @@ -504,7 +506,9 @@ def request_spot_instances(self, price, image_id, instance_type='m1.small', availability_zone_group=None, security_group_ids=None, subnet_id=None, placement=None, placement_group=None, - user_data=None, block_device_map=None): + block_device_map=None, + iam_profile=None): + kwargs = locals() kwargs.pop('self') return self.conn.request_spot_instances(**kwargs) @@ -567,7 +571,9 @@ def wait_for_propagation(self, instances=None, spot_requests=None, def run_instances(self, image_id, instance_type='m1.small', min_count=1, max_count=1, key_name=None, security_groups=None, placement=None, user_data=None, placement_group=None, - block_device_map=None, subnet_id=None): + block_device_map=None, subnet_id=None, + iam_profile = None): + kwargs = dict( instance_type=instance_type, min_count=min_count, @@ -576,8 +582,9 @@ def run_instances(self, image_id, instance_type='m1.small', min_count=1, subnet_id=subnet_id, placement=placement, user_data=user_data, - placement_group=placement_group, - block_device_map=block_device_map + block_device_map=block_device_map, + instance_profile_name=iam_profile, + placement_group=placement_group ) if subnet_id: kwargs.update( diff --git a/starcluster/cluster.py b/starcluster/cluster.py index 379de0f79..8054ceae8 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -51,7 +51,8 @@ def __repr__(self): return "" % self.ec2.region.name def get_cluster(self, cluster_name, group=None, load_receipt=True, - load_plugins=True, load_volumes=True, require_keys=True): + load_plugins=True, load_volumes=True, + load_iam_profile=True, require_keys=True): """ Returns a Cluster object representing an active cluster """ @@ -64,7 +65,8 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True, cluster_group=group) if load_receipt: cl.load_receipt(load_plugins=load_plugins, - load_volumes=load_volumes) + load_volumes=load_volumes, + load_iam_profile=load_iam_profile) try: cl.keyname = cl.keyname or cl.master_node.key_name key_location = self.cfg.get_key(cl.keyname).get('key_location') @@ -79,14 +81,15 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True, except exception.SecurityGroupDoesNotExist: raise exception.ClusterDoesNotExist(cluster_name) - def get_clusters(self, load_receipt=True, load_plugins=True): + def get_clusters(self, load_receipt=True, load_plugins=True, load_iam_profile=True): """ Returns a list of all active clusters """ cluster_groups = self.get_cluster_security_groups() clusters = [self.get_cluster(g.name, group=g, load_receipt=load_receipt, - load_plugins=load_plugins) + load_plugins=load_plugins, + load_iam_profile=load_iam_profile) for g in cluster_groups] return clusters @@ -169,24 +172,30 @@ def _get_cluster_name(self, cluster_name): def add_node(self, cluster_name, alias=None, no_create=False, image_id=None, instance_type=None, zone=None, - placement_group=None, spot_bid=None): + placement_group=None, spot_bid=None, + iam_profile=None): cl = self.get_cluster(cluster_name) return cl.add_node(alias=alias, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, spot_bid=spot_bid, - no_create=no_create) + no_create=no_create, + iam_profile=iam_profile) def add_nodes(self, cluster_name, num_nodes, aliases=None, no_create=False, image_id=None, instance_type=None, zone=None, - placement_group=None, spot_bid=None): + placement_group=None, spot_bid=None, + iam_profile=None): """ Add one or more nodes to cluster """ + print __file__,191 + print iam_profile cl = self.get_cluster(cluster_name) return cl.add_nodes(num_nodes, aliases=aliases, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, spot_bid=spot_bid, - no_create=no_create) + no_create=no_create, + iam_profile=iam_profile) def remove_node(self, cluster_name, alias, terminate=True, force=False): """ @@ -281,7 +290,8 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False): tag = self.get_tag_from_sg(scg.name) try: cl = self.get_cluster(tag, group=scg, load_plugins=False, - load_volumes=False, require_keys=False) + load_volumes=False, require_keys=False, + load_iam_profile=True) except exception.IncompatibleCluster as e: sep = '*' * 60 log.error('\n'.join([sep, e.msg, sep]), @@ -307,6 +317,8 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False): print 'Uptime: %s' % uptime print 'Zone: %s' % getattr(n, 'placement', 'N/A') print 'Keypair: %s' % getattr(n, 'key_name', 'N/A') + ipn = cl.iam_profile if cl.iam_profile else 'N/A' + print 'IAM instance profile: %s' % ipn ebs_vols = [] for node in nodes: devices = node.attached_vols @@ -369,7 +381,6 @@ def run_plugin(self, plugin_name, cluster_tag): class Cluster(object): - def __init__(self, ec2_conn=None, spot_bid=None, @@ -400,6 +411,7 @@ def __init__(self, disable_cloudinit=False, vpc_id=None, subnet_id=None, + iam_profile=None, **kwargs): # validation if vpc_id or subnet_id: @@ -427,9 +439,11 @@ def __init__(self, self.cluster_size = cluster_size or 0 self.volumes = self.load_volumes(volumes) self.plugins = self.load_plugins(plugins) + self.userdata_scripts = userdata_scripts or [] self.dns_prefix = dns_prefix and cluster_tag - + + self._cluster_group = None self._placement_group = None self._zone = None @@ -568,13 +582,13 @@ def __str__(self): cfg = self.__getstate__() return pprint.pformat(cfg) - def load_receipt(self, load_plugins=True, load_volumes=True): + def load_receipt(self, load_plugins=True, load_volumes=True, load_iam_profile=True): """ Load the original settings used to launch this cluster into this Cluster object. Settings are loaded from cluster group tags and the master node's user data. """ - if not (load_plugins or load_volumes): + if not (load_plugins or load_volumes or load_iam_profile): return True try: tags = self.cluster_group.tags @@ -609,6 +623,8 @@ def load_receipt(self, load_plugins=True, load_volumes=True): self.plugins = self.load_plugins(master.get_plugins()) if load_volumes: self.volumes = master.get_volumes() + if load_iam_profile: + self.iam_profile = master.get_iam_profile() except exception.PluginError: log.error("An error occurred while loading plugins: ", exc_info=True) @@ -817,11 +833,12 @@ def get_spot_requests_or_raise(self): return spots def create_node(self, alias, image_id=None, instance_type=None, zone=None, - placement_group=None, spot_bid=None, force_flat=False): + placement_group=None, spot_bid=None, force_flat=False, iam_profile=None): return self.create_nodes([alias], image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, - spot_bid=spot_bid, force_flat=force_flat)[0] + spot_bid=spot_bid, force_flat=force_flat, + iam_profile=iam_profile)[0] def _get_cluster_userdata(self, aliases): alias_file = utils.string_to_file('\n'.join(['#ignored'] + aliases), @@ -843,7 +860,7 @@ def _get_cluster_userdata(self, aliases): def create_nodes(self, aliases, image_id=None, instance_type=None, zone=None, placement_group=None, spot_bid=None, - force_flat=False): + force_flat=False, iam_profile=None): """ Convenience method for requesting instances with this cluster's settings. All settings (kwargs) except force_flat default to cluster @@ -868,6 +885,7 @@ def create_nodes(self, aliases, image_id=None, instance_type=None, image_id = image_id or self.node_image_id count = len(aliases) if not spot_bid else 1 user_data = self._get_cluster_userdata(aliases) + iam_profile = iam_profile or self.iam_profile kwargs = dict(price=spot_bid, instance_type=instance_type, min_count=count, max_count=count, count=count, key_name=self.keyname, security_groups=[cluster_sg], @@ -876,7 +894,9 @@ def create_nodes(self, aliases, image_id=None, instance_type=None, placement=zone or getattr(self.zone, 'name', None), user_data=user_data, placement_group=placement_group, - subnet_id=self.subnet_id) + subnet_id=self.subnet_id, + iam_profile=iam_profile + ) resvs = [] if spot_bid: security_group_id = self.cluster_group.id @@ -907,7 +927,7 @@ def _get_next_node_num(self): def add_node(self, alias=None, no_create=False, image_id=None, instance_type=None, zone=None, placement_group=None, - spot_bid=None): + spot_bid=None, iam_profile=None): """ Add a single node to this cluster """ @@ -917,11 +937,12 @@ def add_node(self, alias=None, no_create=False, image_id=None, return self.add_nodes(1, aliases=aliases, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, - spot_bid=spot_bid, no_create=no_create) + spot_bid=spot_bid, no_create=no_create, + iam_profile=iam_profile) def add_nodes(self, num_nodes, aliases=None, image_id=None, instance_type=None, zone=None, placement_group=None, - spot_bid=None, no_create=False): + spot_bid=None, no_create=False, iam_profile=None): """ Add new nodes to this cluster @@ -948,7 +969,8 @@ def add_nodes(self, num_nodes, aliases=None, image_id=None, resp = self.create_nodes(aliases, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, - spot_bid=spot_bid) + spot_bid=spot_bid, + iam_profile=iam_profile) if spot_bid or self.spot_bid: self.ec2.wait_for_propagation(spot_requests=resp) else: diff --git a/starcluster/node.py b/starcluster/node.py index 9574535c3..53b60d4b7 100644 --- a/starcluster/node.py +++ b/starcluster/node.py @@ -186,6 +186,14 @@ def get_volumes(self): payload = volstxt.split('\n', 2)[2] return utils.decode_uncompress_load(payload) + def get_iam_profile(self): + if self.instance.instance_profile: + arn = self.instance.instance_profile['arn'] + match = re.match(r'arn:aws:iam::\d{12}:instance-profile/(\S+)', arn) + return match.group(1) + else: + return None + def _remove_all_tags(self): tags = self.tags.keys()[:] for t in tags: @@ -236,6 +244,10 @@ def memory(self): "free -m | grep -i mem | awk '{print $2}'")[0]) return self._memory + @property + def instance_profile(self): + return self.instance.instance_profile + @property def ip_address(self): return self.instance.ip_address diff --git a/starcluster/static.py b/starcluster/static.py index a607c5c9d..fddb249ed 100644 --- a/starcluster/static.py +++ b/starcluster/static.py @@ -255,4 +255,5 @@ def create_sc_config_dirs(): 'force_spot_master': (bool, False, False, None, None), 'disable_cloudinit': (bool, False, False, None, None), 'dns_prefix': (bool, False, False, None, None), + 'iam_profile': (str, False, None, None, None), }