Skip to content

Commit

Permalink
Merge branch 'profile-feature' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
John C. Earls committed Dec 8, 2013
2 parents bdf1475 + 7eaa376 commit e70ad7a
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 27 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
console_scripts = ['starcluster = starcluster.cli:main']
extra = dict(test_suite="starcluster.tests",
tests_require="nose",
install_requires=["paramiko>=1.11.0", "boto>=2.10.0",
install_requires=["paramiko>=1.11.0", "boto>=2.13.2",
"workerpool>=0.9.2", "Jinja2>=2.7",
"decorator>=3.4.0", "iptools>=0.6.1",
"optcomplete>=1.2-devel",
Expand Down
17 changes: 12 additions & 5 deletions starcluster/awsutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,8 @@ def request_instances(self, image_id, price=None, instance_type='m1.small',
launch_group=None,
availability_zone_group=None, placement=None,
user_data=None, placement_group=None,
block_device_map=None, subnet_id=None):
block_device_map=None, subnet_id=None,
iam_profile=None):
"""
Convenience method for running spot or flat-rate instances
"""
Expand Down Expand Up @@ -483,6 +484,7 @@ def request_instances(self, image_id, price=None, instance_type='m1.small',
placement=placement,
placement_group=placement_group,
user_data=user_data,
iam_profile=iam_profile,
block_device_map=block_device_map)

if price:
Expand All @@ -504,7 +506,9 @@ def request_spot_instances(self, price, image_id, instance_type='m1.small',
availability_zone_group=None,
security_group_ids=None, subnet_id=None,
placement=None, placement_group=None,
user_data=None, block_device_map=None):
block_device_map=None,
iam_profile=None):

kwargs = locals()
kwargs.pop('self')
return self.conn.request_spot_instances(**kwargs)
Expand Down Expand Up @@ -567,7 +571,9 @@ def wait_for_propagation(self, instances=None, spot_requests=None,
def run_instances(self, image_id, instance_type='m1.small', min_count=1,
max_count=1, key_name=None, security_groups=None,
placement=None, user_data=None, placement_group=None,
block_device_map=None, subnet_id=None):
block_device_map=None, subnet_id=None,
iam_profile = None):

kwargs = dict(
instance_type=instance_type,
min_count=min_count,
Expand All @@ -576,8 +582,9 @@ def run_instances(self, image_id, instance_type='m1.small', min_count=1,
subnet_id=subnet_id,
placement=placement,
user_data=user_data,
placement_group=placement_group,
block_device_map=block_device_map
block_device_map=block_device_map,
instance_profile_name=iam_profile,
placement_group=placement_group
)
if subnet_id:
kwargs.update(
Expand Down
64 changes: 43 additions & 21 deletions starcluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def __repr__(self):
return "<ClusterManager: %s>" % self.ec2.region.name

def get_cluster(self, cluster_name, group=None, load_receipt=True,
load_plugins=True, load_volumes=True, require_keys=True):
load_plugins=True, load_volumes=True,
load_iam_profile=True, require_keys=True):
"""
Returns a Cluster object representing an active cluster
"""
Expand All @@ -64,7 +65,8 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True,
cluster_group=group)
if load_receipt:
cl.load_receipt(load_plugins=load_plugins,
load_volumes=load_volumes)
load_volumes=load_volumes,
load_iam_profile=load_iam_profile)
try:
cl.keyname = cl.keyname or cl.master_node.key_name
key_location = self.cfg.get_key(cl.keyname).get('key_location')
Expand All @@ -79,14 +81,15 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True,
except exception.SecurityGroupDoesNotExist:
raise exception.ClusterDoesNotExist(cluster_name)

def get_clusters(self, load_receipt=True, load_plugins=True):
def get_clusters(self, load_receipt=True, load_plugins=True, load_iam_profile=True):
"""
Returns a list of all active clusters
"""
cluster_groups = self.get_cluster_security_groups()
clusters = [self.get_cluster(g.name, group=g,
load_receipt=load_receipt,
load_plugins=load_plugins)
load_plugins=load_plugins,
load_iam_profile=load_iam_profile)
for g in cluster_groups]
return clusters

Expand Down Expand Up @@ -169,24 +172,30 @@ def _get_cluster_name(self, cluster_name):

def add_node(self, cluster_name, alias=None, no_create=False,
image_id=None, instance_type=None, zone=None,
placement_group=None, spot_bid=None):
placement_group=None, spot_bid=None,
iam_profile=None):
cl = self.get_cluster(cluster_name)
return cl.add_node(alias=alias, image_id=image_id,
instance_type=instance_type, zone=zone,
placement_group=placement_group, spot_bid=spot_bid,
no_create=no_create)
no_create=no_create,
iam_profile=iam_profile)

def add_nodes(self, cluster_name, num_nodes, aliases=None, no_create=False,
image_id=None, instance_type=None, zone=None,
placement_group=None, spot_bid=None):
placement_group=None, spot_bid=None,
iam_profile=None):
"""
Add one or more nodes to cluster
"""
print __file__,191
print iam_profile
cl = self.get_cluster(cluster_name)
return cl.add_nodes(num_nodes, aliases=aliases, image_id=image_id,
instance_type=instance_type, zone=zone,
placement_group=placement_group, spot_bid=spot_bid,
no_create=no_create)
no_create=no_create,
iam_profile=iam_profile)

def remove_node(self, cluster_name, alias, terminate=True, force=False):
"""
Expand Down Expand Up @@ -281,7 +290,8 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False):
tag = self.get_tag_from_sg(scg.name)
try:
cl = self.get_cluster(tag, group=scg, load_plugins=False,
load_volumes=False, require_keys=False)
load_volumes=False, require_keys=False,
load_iam_profile=True)
except exception.IncompatibleCluster as e:
sep = '*' * 60
log.error('\n'.join([sep, e.msg, sep]),
Expand All @@ -307,6 +317,8 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False):
print 'Uptime: %s' % uptime
print 'Zone: %s' % getattr(n, 'placement', 'N/A')
print 'Keypair: %s' % getattr(n, 'key_name', 'N/A')
ipn = cl.iam_profile if cl.iam_profile else 'N/A'
print 'IAM instance profile: %s' % ipn
ebs_vols = []
for node in nodes:
devices = node.attached_vols
Expand Down Expand Up @@ -369,7 +381,6 @@ def run_plugin(self, plugin_name, cluster_tag):


class Cluster(object):

def __init__(self,
ec2_conn=None,
spot_bid=None,
Expand Down Expand Up @@ -400,6 +411,7 @@ def __init__(self,
disable_cloudinit=False,
vpc_id=None,
subnet_id=None,
iam_profile=None,
**kwargs):
# validation
if vpc_id or subnet_id:
Expand Down Expand Up @@ -427,9 +439,11 @@ def __init__(self,
self.cluster_size = cluster_size or 0
self.volumes = self.load_volumes(volumes)
self.plugins = self.load_plugins(plugins)

self.userdata_scripts = userdata_scripts or []
self.dns_prefix = dns_prefix and cluster_tag



self._cluster_group = None
self._placement_group = None
self._zone = None
Expand Down Expand Up @@ -568,13 +582,13 @@ def __str__(self):
cfg = self.__getstate__()
return pprint.pformat(cfg)

def load_receipt(self, load_plugins=True, load_volumes=True):
def load_receipt(self, load_plugins=True, load_volumes=True, load_iam_profile=True):
"""
Load the original settings used to launch this cluster into this
Cluster object. Settings are loaded from cluster group tags and the
master node's user data.
"""
if not (load_plugins or load_volumes):
if not (load_plugins or load_volumes or load_iam_profile):
return True
try:
tags = self.cluster_group.tags
Expand Down Expand Up @@ -609,6 +623,8 @@ def load_receipt(self, load_plugins=True, load_volumes=True):
self.plugins = self.load_plugins(master.get_plugins())
if load_volumes:
self.volumes = master.get_volumes()
if load_iam_profile:
self.iam_profile = master.get_iam_profile()
except exception.PluginError:
log.error("An error occurred while loading plugins: ",
exc_info=True)
Expand Down Expand Up @@ -817,11 +833,12 @@ def get_spot_requests_or_raise(self):
return spots

def create_node(self, alias, image_id=None, instance_type=None, zone=None,
placement_group=None, spot_bid=None, force_flat=False):
placement_group=None, spot_bid=None, force_flat=False, iam_profile=None):
return self.create_nodes([alias], image_id=image_id,
instance_type=instance_type, zone=zone,
placement_group=placement_group,
spot_bid=spot_bid, force_flat=force_flat)[0]
spot_bid=spot_bid, force_flat=force_flat,
iam_profile=iam_profile)[0]

def _get_cluster_userdata(self, aliases):
alias_file = utils.string_to_file('\n'.join(['#ignored'] + aliases),
Expand All @@ -843,7 +860,7 @@ def _get_cluster_userdata(self, aliases):

def create_nodes(self, aliases, image_id=None, instance_type=None,
zone=None, placement_group=None, spot_bid=None,
force_flat=False):
force_flat=False, iam_profile=None):
"""
Convenience method for requesting instances with this cluster's
settings. All settings (kwargs) except force_flat default to cluster
Expand All @@ -868,6 +885,7 @@ def create_nodes(self, aliases, image_id=None, instance_type=None,
image_id = image_id or self.node_image_id
count = len(aliases) if not spot_bid else 1
user_data = self._get_cluster_userdata(aliases)
iam_profile = iam_profile or self.iam_profile
kwargs = dict(price=spot_bid, instance_type=instance_type,
min_count=count, max_count=count, count=count,
key_name=self.keyname, security_groups=[cluster_sg],
Expand All @@ -876,7 +894,9 @@ def create_nodes(self, aliases, image_id=None, instance_type=None,
placement=zone or getattr(self.zone, 'name', None),
user_data=user_data,
placement_group=placement_group,
subnet_id=self.subnet_id)
subnet_id=self.subnet_id,
iam_profile=iam_profile
)
resvs = []
if spot_bid:
security_group_id = self.cluster_group.id
Expand Down Expand Up @@ -907,7 +927,7 @@ def _get_next_node_num(self):

def add_node(self, alias=None, no_create=False, image_id=None,
instance_type=None, zone=None, placement_group=None,
spot_bid=None):
spot_bid=None, iam_profile=None):
"""
Add a single node to this cluster
"""
Expand All @@ -917,11 +937,12 @@ def add_node(self, alias=None, no_create=False, image_id=None,
return self.add_nodes(1, aliases=aliases, image_id=image_id,
instance_type=instance_type, zone=zone,
placement_group=placement_group,
spot_bid=spot_bid, no_create=no_create)
spot_bid=spot_bid, no_create=no_create,
iam_profile=iam_profile)

def add_nodes(self, num_nodes, aliases=None, image_id=None,
instance_type=None, zone=None, placement_group=None,
spot_bid=None, no_create=False):
spot_bid=None, no_create=False, iam_profile=None):
"""
Add new nodes to this cluster
Expand All @@ -948,7 +969,8 @@ def add_nodes(self, num_nodes, aliases=None, image_id=None,
resp = self.create_nodes(aliases, image_id=image_id,
instance_type=instance_type, zone=zone,
placement_group=placement_group,
spot_bid=spot_bid)
spot_bid=spot_bid,
iam_profile=iam_profile)
if spot_bid or self.spot_bid:
self.ec2.wait_for_propagation(spot_requests=resp)
else:
Expand Down
12 changes: 12 additions & 0 deletions starcluster/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ def get_volumes(self):
payload = volstxt.split('\n', 2)[2]
return utils.decode_uncompress_load(payload)

def get_iam_profile(self):
if self.instance.instance_profile:
arn = self.instance.instance_profile['arn']
match = re.match(r'arn:aws:iam::\d{12}:instance-profile/(\S+)', arn)
return match.group(1)
else:
return None

def _remove_all_tags(self):
tags = self.tags.keys()[:]
for t in tags:
Expand Down Expand Up @@ -236,6 +244,10 @@ def memory(self):
"free -m | grep -i mem | awk '{print $2}'")[0])
return self._memory

@property
def instance_profile(self):
return self.instance.instance_profile

@property
def ip_address(self):
return self.instance.ip_address
Expand Down
1 change: 1 addition & 0 deletions starcluster/static.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,5 @@ def create_sc_config_dirs():
'force_spot_master': (bool, False, False, None, None),
'disable_cloudinit': (bool, False, False, None, None),
'dns_prefix': (bool, False, False, None, None),
'iam_profile': (str, False, None, None, None),
}

0 comments on commit e70ad7a

Please sign in to comment.