comparison env/lib/python3.7/site-packages/bioblend/cloudman/launch.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
1 """
2 Setup and launch a CloudMan instance.
3 """
4 import datetime
5 import socket
6
7 import boto
8 import six
9 import yaml
10 from boto.compat import http_client
11 from boto.ec2.regioninfo import RegionInfo
12 from boto.exception import EC2ResponseError, S3ResponseError
13 from boto.s3.connection import OrdinaryCallingFormat, S3Connection, SubdomainCallingFormat
14 from six.moves.http_client import HTTPConnection
15 from six.moves.urllib.parse import urlparse
16
17 import bioblend
18 from bioblend.util import Bunch
19
20
21 # Uncomment the following line if no logging from boto is desired
22 # bioblend.logging.getLogger('boto').setLevel(bioblend.logging.CRITICAL)
23 # Uncomment the following line if logging at the prompt is desired
24 # bioblend.set_stream_logger(__name__)
25 def instance_types(cloud_name='generic'):
26 """
27 Return a list of dictionaries containing details about the available
28 instance types for the given `cloud_name`.
29
30 :type cloud_name: str
31 :param cloud_name: A name of the cloud for which the list of instance
32 types will be returned. Valid values are: `aws`,
33 `nectar`, `generic`.
34
35 :rtype: list
36 :return: A list of dictionaries describing instance types. Each dict will
37 contain the following keys: `name`, `model`, and `description`.
38 """
39 instance_list = []
40 if cloud_name.lower() == 'aws':
41 instance_list.append({"model": "c3.large",
42 "name": "Compute optimized Large",
43 "description": "2 vCPU/4GB RAM"})
44 instance_list.append({"model": "c3.2xlarge",
45 "name": "Compute optimized 2xLarge",
46 "description": "8 vCPU/15GB RAM"})
47 instance_list.append({"model": "c3.8xlarge",
48 "name": "Compute optimized 8xLarge",
49 "description": "32 vCPU/60GB RAM"})
50 elif cloud_name.lower() in ['nectar', 'generic']:
51 instance_list.append({"model": "m1.small",
52 "name": "Small",
53 "description": "1 vCPU / 4GB RAM"})
54 instance_list.append({"model": "m1.medium",
55 "name": "Medium",
56 "description": "2 vCPU / 8GB RAM"})
57 instance_list.append({"model": "m1.large",
58 "name": "Large",
59 "description": "4 vCPU / 16GB RAM"})
60 instance_list.append({"model": "m1.xlarge",
61 "name": "Extra Large",
62 "description": "8 vCPU / 32GB RAM"})
63 instance_list.append({"model": "m1.xxlarge",
64 "name": "Extra-extra Large",
65 "description": "16 vCPU / 64GB RAM"})
66 return instance_list
67
68
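# --- Illustrative usage sketch (editor's addition, not part of this changeset) ---
# A minimal example of calling instance_types(); the keys printed below
# (`model`, `name`, `description`) come from the docstring above.
#
#     from bioblend.cloudman.launch import instance_types
#
#     for itype in instance_types(cloud_name='aws'):
#         print("%(model)s -- %(name)s (%(description)s)" % itype)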
69 class CloudManLauncher(object):
70
71 def __init__(self, access_key, secret_key, cloud=None):
72 """
73 Define the environment in which this instance of CloudMan will be launched.
74
75 Besides providing the credentials, optionally provide the ``cloud``
76 object. This object must define the properties required to establish a
77 `boto <https://github.com/boto/boto/>`_ connection to that cloud. See
78 this method's implementation for an example of the required fields.
79 Note that as long as the provided object defines the required fields,
80 it can really be implemented as anything (e.g., a Bunch, a database
81 object, a custom class). If no value for the ``cloud`` argument is
82 provided, the default is to use the Amazon cloud.
83 """
84 self.access_key = access_key
85 self.secret_key = secret_key
86 if cloud is None:
87 # Default to an EC2-compatible object
88 self.cloud = Bunch(id='1', # for compatibility w/ DB representation
89 name="Amazon",
90 cloud_type="ec2",
91 bucket_default="cloudman",
92 region_name="us-east-1",
93 region_endpoint="ec2.amazonaws.com",
94 ec2_port="",
95 ec2_conn_path="/",
96 cidr_range="",
97 is_secure=True,
98 s3_host="s3.amazonaws.com",
99 s3_port="",
100 s3_conn_path='/')
101 else:
102 self.cloud = cloud
103 self.ec2_conn = self.connect_ec2(
104 self.access_key,
105 self.secret_key,
106 self.cloud)
107 self.vpc_conn = self.connect_vpc(
108 self.access_key,
109 self.secret_key,
110 self.cloud)
111 # Define exceptions from http_client that we want to catch and retry
112 self.http_exceptions = (http_client.HTTPException, socket.error,
113 socket.gaierror, http_client.BadStatusLine)
114
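# --- Illustrative usage sketch (editor's addition, not part of this changeset) ---
# Constructing a launcher against a non-AWS, EC2-compatible cloud. The field
# names mirror the default Bunch defined in __init__ above; the endpoint and
# credential values are placeholders, not values from this changeset.
#
#     from bioblend.util import Bunch
#     from bioblend.cloudman.launch import CloudManLauncher
#
#     my_cloud = Bunch(id='2',
#                      name="MyCloud",
#                      cloud_type="ec2",
#                      bucket_default="cloudman",
#                      region_name="my-region",
#                      region_endpoint="ec2.example.org",
#                      ec2_port="",
#                      ec2_conn_path="/",
#                      cidr_range="",
#                      is_secure=True,
#                      s3_host="s3.example.org",
#                      s3_port="",
#                      s3_conn_path='/')
#     cml = CloudManLauncher('<access_key>', '<secret_key>', cloud=my_cloud)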
115 def __repr__(self):
116 return "Cloud: {0}; acct ID: {1}".format(
117 self.cloud.name, self.access_key)
118
119 def launch(self, cluster_name, image_id, instance_type, password,
120 kernel_id=None, ramdisk_id=None, key_name='cloudman_key_pair',
121 security_groups=['CloudMan'], placement='', subnet_id=None,
122 ebs_optimized=False, **kwargs):
123 """
124 Check all the prerequisites (key pair and security groups) for
125 launching a CloudMan instance, compose the user data based on the
126 parameters specified in the arguments and the cloud properties as
127 defined in the object's ``cloud`` field.
128
129 For the current list of user data fields that can be provided via
130 ``kwargs``, see `<https://galaxyproject.org/cloudman/userdata/>`_
131
132 Return a dict containing the properties and info with which an instance
133 was launched, namely: ``sg_names`` containing the names of the security
134 groups, ``kp_name`` containing the name of the key pair, ``kp_material``
135 containing the private portion of the key pair (*note* that this portion
136 of the key is available and can be retrieved *only* at the time the key
137 is created, which will happen only if no key with the name provided in
138 the ``key_name`` argument exists), ``rs`` containing the
139 `boto <https://github.com/boto/boto/>`_ ``ResultSet`` object,
140 ``instance_id`` containing the ID of a started instance, and
141 ``error`` containing an error message if there was one.
142 """
143 ret = {'sg_names': [],
144 'sg_ids': [],
145 'kp_name': '',
146 'kp_material': '',
147 'rs': None,
148 'instance_id': '',
149 'error': None}
150 # First satisfy the prerequisites
151 for sg in security_groups:
152 # Get VPC ID in case we're launching into a VPC
153 vpc_id = None
154 if subnet_id:
155 try:
156 sn = self.vpc_conn.get_all_subnets(subnet_id)[0]
157 vpc_id = sn.vpc_id
158 except (EC2ResponseError, IndexError) as e:
159 bioblend.log.exception("Trouble fetching subnet %s: %s" %
160 (subnet_id, e))
161 cmsg = self.create_cm_security_group(sg, vpc_id=vpc_id)
162 ret['error'] = cmsg['error']
163 if ret['error']:
164 return ret
165 if cmsg['name']:
166 ret['sg_names'].append(cmsg['name'])
167 ret['sg_ids'].append(cmsg['sg_id'])
168 if subnet_id:
169 # Must setup a network interface if launching into VPC
170 security_groups = None
171 interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
172 subnet_id=subnet_id, groups=[cmsg['sg_id']],
173 associate_public_ip_address=True)
174 network_interfaces = (boto.ec2.networkinterface.
175 NetworkInterfaceCollection(interface))
176 else:
177 network_interfaces = None
178 kp_info = self.create_key_pair(key_name)
179 ret['kp_name'] = kp_info['name']
180 ret['kp_material'] = kp_info['material']
181 ret['error'] = kp_info['error']
182 if ret['error']:
183 return ret
184 # If not provided, try to find a placement
185 # TODO: Should placement always be checked? To make sure it's correct
186 # for existing clusters.
187 if not placement:
188 placement = self._find_placement(
189 cluster_name).get('placement', None)
190 # Compose user data for launching an instance, ensuring we have the
191 # required fields
192 kwargs['access_key'] = self.access_key
193 kwargs['secret_key'] = self.secret_key
194 kwargs['cluster_name'] = cluster_name
195 kwargs['password'] = password
196 kwargs['cloud_name'] = self.cloud.name
197 ud = self._compose_user_data(kwargs)
198 # Now launch an instance
199 try:
200
201 rs = None
202 rs = self.ec2_conn.run_instances(image_id=image_id,
203 instance_type=instance_type,
204 key_name=key_name,
205 security_groups=security_groups,
206 # The following two arguments are
207 # provided via the network_interfaces
208 # argument instead:
209 # security_group_ids=security_group_ids,
210 # subnet_id=subnet_id,
211 network_interfaces=network_interfaces,
212 user_data=ud,
213 kernel_id=kernel_id,
214 ramdisk_id=ramdisk_id,
215 placement=placement,
216 ebs_optimized=ebs_optimized)
217 ret['rs'] = rs
218 except EC2ResponseError as e:
219 err_msg = "Problem launching an instance: {0} (code {1}; status {2})" \
220 .format(e.message, e.error_code, e.status)
221 bioblend.log.exception(err_msg)
222 ret['error'] = err_msg
223 return ret
224 else:
225 if rs:
226 try:
227 bioblend.log.info(
228 "Launched an instance with ID %s" %
229 rs.instances[0].id)
230 ret['instance_id'] = rs.instances[0].id
231 ret['instance_ip'] = rs.instances[0].ip_address
232 except EC2ResponseError as e:
233 err_msg = "Problem with the launched instance object: {0} " \
234 "(code {1}; status {2})" \
235 .format(e.message, e.error_code, e.status)
236 bioblend.log.exception(err_msg)
237 ret['error'] = err_msg
238 else:
239 ret['error'] = ("No response after launching an instance. Check "
240 "your account permissions and try again.")
241 return ret
242
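# --- Illustrative usage sketch (editor's addition, not part of this changeset) ---
# Launching an instance and inspecting the returned dict; the keys used
# (`error`, `kp_material`, `instance_id`) are documented in the launch()
# docstring above. The AMI ID, instance type, and password are placeholders.
#
#     cml = CloudManLauncher('<access_key>', '<secret_key>')
#     result = cml.launch(cluster_name='my-cluster',
#                         image_id='ami-xxxxxxxx',
#                         instance_type='c3.large',
#                         password='<cluster_password>')
#     if result['error']:
#         raise RuntimeError(result['error'])
#     if result['kp_material']:
#         # Only returned when the key pair was created by this call
#         with open('cloudman_key_pair.pem', 'w') as f:
#             f.write(result['kp_material'])
#     print("Started instance %s" % result['instance_id'])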
243 def create_cm_security_group(self, sg_name='CloudMan', vpc_id=None):
244 """
245 Create a security group with all authorizations required to run CloudMan.
246
247 If the group already exists, check its rules and add the missing ones.
248
249 :type sg_name: str
250 :param sg_name: A name for the security group to be created.
251
252 :type vpc_id: str
253 :param vpc_id: VPC ID under which to create the security group.
254
255 :rtype: dict
256 :return: A dictionary containing keys ``name`` (with the value being the
257 name of the security group that was created), ``error``
258 (with the value being the error message if there was an error
259 or ``None`` if no error was encountered), and ``ports``
260 (containing the list of tuples with port ranges that were
261 opened or attempted to be opened).
262
263 .. versionchanged:: 0.6.1
264 The return value changed from a string to a dict
265 """
266 ports = (('20', '21'), # FTP
267 ('22', '22'), # SSH
268 ('80', '80'), # Web UI
269 ('443', '443'), # SSL Web UI
270 ('8800', '8800'), # NodeJS Proxy for Galaxy IPython IE
271 ('9600', '9700'), # HTCondor
272 ('30000', '30100')) # FTP transfer
273 progress = {'name': None,
274 'sg_id': None,
275 'error': None,
276 'ports': ports}
277 cmsg = None
278 filters = None
279 if vpc_id:
280 filters = {'vpc-id': vpc_id}
281 # Check if this security group already exists
282 try:
283 sgs = self.ec2_conn.get_all_security_groups(filters=filters)
284 except EC2ResponseError as e:
285 err_msg = ("Problem getting security groups. This could indicate a "
286 "problem with your account credentials or permissions: "
287 "{0} (code {1}; status {2})"
288 .format(e.message, e.error_code, e.status))
289 bioblend.log.exception(err_msg)
290 progress['error'] = err_msg
291 return progress
292 for sg in sgs:
293 if sg.name == sg_name:
294 cmsg = sg
295 bioblend.log.debug("Security group '%s' already exists; will "
296 "add authorizations next." % sg_name)
297 break
298 # If it does not exist, create security group
299 if cmsg is None:
300 bioblend.log.debug("Creating Security Group %s" % sg_name)
301 try:
302 cmsg = self.ec2_conn.create_security_group(sg_name, 'A security '
303 'group for CloudMan',
304 vpc_id=vpc_id)
305 except EC2ResponseError as e:
306 err_msg = "Problem creating security group '{0}': {1} (code {2}; " \
307 "status {3})" \
308 .format(sg_name, e.message, e.error_code, e.status)
309 bioblend.log.exception(err_msg)
310 progress['error'] = err_msg
311 if cmsg:
312 progress['name'] = cmsg.name
313 progress['sg_id'] = cmsg.id
314 # Add appropriate authorization rules
315 # If these rules already exist, nothing will be changed in the SG
316 for port in ports:
317 try:
318 if not self.rule_exists(
319 cmsg.rules, from_port=port[0], to_port=port[1]):
320 cmsg.authorize(
321 ip_protocol='tcp',
322 from_port=port[0],
323 to_port=port[1],
324 cidr_ip='0.0.0.0/0')
325 else:
326 bioblend.log.debug(
327 "Rule (%s:%s) already exists in the SG" %
328 (port[0], port[1]))
329 except EC2ResponseError as e:
330 err_msg = "A problem adding security group authorizations: {0} " \
331 "(code {1}; status {2})" \
332 .format(e.message, e.error_code, e.status)
333 bioblend.log.exception(err_msg)
334 progress['error'] = err_msg
335 # Add ICMP (i.e., ping) rule required by HTCondor
336 try:
337 if not self.rule_exists(
338 cmsg.rules, from_port='-1', to_port='-1', ip_protocol='icmp'):
339 cmsg.authorize(
340 ip_protocol='icmp',
341 from_port=-1,
342 to_port=-1,
343 cidr_ip='0.0.0.0/0')
344 else:
345 bioblend.log.debug(
346 "ICMP rule already exists in {0} SG.".format(sg_name))
347 except EC2ResponseError as e:
348 err_msg = "A problem with security ICMP rule authorization: {0} " \
349 "(code {1}; status {2})" \
350 .format(e.message, e.error_code, e.status)
351 bioblend.log.exception(err_msg)
352 progress['error'] = err_msg
353 # Add rule that allows communication between instances in the same
354 # SG
355 # A flag to indicate if group rule already exists
356 g_rule_exists = False
357 for rule in cmsg.rules:
358 for grant in rule.grants:
359 if grant.name == cmsg.name:
360 g_rule_exists = True
361 bioblend.log.debug(
362 "Group rule already exists in the SG.")
363 if g_rule_exists:
364 break
365 if not g_rule_exists:
366 try:
367 cmsg.authorize(
368 src_group=cmsg,
369 ip_protocol='tcp',
370 from_port=0,
371 to_port=65535)
372 except EC2ResponseError as e:
373 err_msg = "A problem with security group group " \
374 "authorization: {0} (code {1}; status {2})" \
375 .format(e.message, e.error_code, e.status)
376 bioblend.log.exception(err_msg)
377 progress['error'] = err_msg
378 bioblend.log.info(
379 "Done configuring '%s' security group" %
380 cmsg.name)
381 else:
382 bioblend.log.warning(
383 "Did not create security group '{0}'".format(sg_name))
384 return progress
385
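# --- Illustrative usage sketch (editor's addition, not part of this changeset) ---
# Ensuring the CloudMan security group exists; the returned keys (`name`,
# `sg_id`, `error`, `ports`) match the dict assembled above. The VPC ID is a
# placeholder and `cml` is a CloudManLauncher instance (see the earlier sketch).
#
#     sg = cml.create_cm_security_group('CloudMan', vpc_id='vpc-xxxxxxxx')
#     if sg['error']:
#         raise RuntimeError(sg['error'])
#     print("Using security group %s (%s)" % (sg['name'], sg['sg_id']))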
386 def rule_exists(
387 self, rules, from_port, to_port, ip_protocol='tcp', cidr_ip='0.0.0.0/0'):
388 """
389 A convenience method to check if an authorization rule in a security group
390 already exists.
391 """
392 for rule in rules:
393 if rule.ip_protocol == ip_protocol and rule.from_port == from_port and \
394 rule.to_port == to_port and cidr_ip in [ip.cidr_ip for ip in rule.grants]:
395 return True
396 return False
397
398 def create_key_pair(self, key_name='cloudman_key_pair'):
399 """
400 If a key pair with the provided ``key_name`` does not exist, create it.
401
402 :type key_name: str
403 :param key_name: A name for the key pair to be created.
404
405 :rtype: dict
406 :return: A dictionary containing keys ``name`` (with the value being the
407 name of the key pair that was created), ``error``
408 (with the value being the error message if there was an error
409 or ``None`` if no error was encountered), and ``material``
410 (containing the unencrypted PEM encoded RSA private key if the
411 key was created or ``None`` if the key already existed).
412
413 .. versionchanged:: 0.6.1
414 The return value changed from a tuple to a dict
415 """
416 progress = {'name': None,
417 'material': None,
418 'error': None}
419 kp = None
420 # Check if a key pair under the given name already exists. If it does not,
421 # create it, else return.
422 try:
423 kps = self.ec2_conn.get_all_key_pairs()
424 except EC2ResponseError as e:
425 err_msg = "Problem getting key pairs: {0} (code {1}; status {2})" \
426 .format(e.message, e.error_code, e.status)
427 bioblend.log.exception(err_msg)
428 progress['error'] = err_msg
429 return progress
430 for akp in kps:
431 if akp.name == key_name:
432 bioblend.log.info(
433 "Key pair '%s' already exists; reusing it." %
434 key_name)
435 progress['name'] = akp.name
436 return progress
437 try:
438 kp = self.ec2_conn.create_key_pair(key_name)
439 except EC2ResponseError as e:
440 err_msg = "Problem creating key pair '{0}': {1} (code {2}; status {3})" \
441 .format(key_name, e.message, e.error_code, e.status)
442 bioblend.log.exception(err_msg)
443 progress['error'] = err_msg
444 return progress
445 bioblend.log.info("Created key pair '%s'" % kp.name)
446 progress['name'] = kp.name
447 progress['material'] = kp.material
448 return progress
449
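# --- Illustrative usage sketch (editor's addition, not part of this changeset) ---
# Creating (or reusing) a key pair and saving the private key material when it
# is returned; per the docstring above, `material` is only set if the key pair
# was created by this call. Writing it to a .pem file is illustrative, and
# `cml` is a CloudManLauncher instance (see the earlier sketch).
#
#     kp = cml.create_key_pair('cloudman_key_pair')
#     if kp['error']:
#         raise RuntimeError(kp['error'])
#     if kp['material']:
#         with open('%s.pem' % kp['name'], 'w') as f:
#             f.write(kp['material'])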
450 def assign_floating_ip(self, ec2_conn, instance):
451 try:
452 bioblend.log.debug("Allocating a new floating IP address.")
453 address = ec2_conn.allocate_address()
454 except EC2ResponseError as e:
455 bioblend.log.exception("Exception allocating a new floating IP "
456 "address: %s" % e)
return  # No address was allocated, so there is nothing to associate
457 bioblend.log.info("Associating floating IP %s to instance %s" %
458 (address.public_ip, instance.id))
459 ec2_conn.associate_address(instance_id=instance.id,
460 public_ip=address.public_ip)
461
462 def get_status(self, instance_id):
463 """
464 Check on the status of an instance. ``instance_id`` needs to be a
465 ``boto``-library compatible instance ID (e.g., ``i-8fehrdss``). If
466 ``instance_id`` is not provided, the ID obtained when launching
467 *the most recent* instance is used. Note that this assumes the instance
468 being checked on was launched using this class. Also note that the same
469 class may be used to launch multiple instances but only the most recent
470 ``instance_id`` is kept, while any others will need to be explicitly specified.
471
472 This method also allows the required ``ec2_conn`` connection object to be
473 provided at invocation time. If the object is not provided, credentials
474 defined for the class are used (ability to specify a custom ``ec2_conn``
475 helps in case of stateless method invocations).
476
477 Return a ``state`` dict containing the following keys: ``instance_state``,
478 ``public_ip``, ``placement``, and ``error``, which capture CloudMan's
479 current state. For ``instance_state``, expected values are: ``pending``,
480 ``booting``, ``running``, or ``error`` and represent the state of the
481 underlying instance. Other keys will return an empty value until the
482 ``instance_state`` enters ``running`` state.
483 """
484 ec2_conn = self.ec2_conn
485 rs = None
486 state = {'instance_state': "",
487 'public_ip': "",
488 'placement': "",
489 'error': ""}
490
491 # Make sure we have an instance ID
492 if instance_id is None:
493 err = "Missing instance ID, cannot check the state."
494 bioblend.log.error(err)
495 state['error'] = err
496 return state
497 try:
498 rs = ec2_conn.get_all_instances([instance_id])
499 if rs is not None:
500 inst_state = rs[0].instances[0].update()
501 public_ip = rs[0].instances[0].ip_address
502 state['public_ip'] = public_ip
503 if inst_state == 'running':
504 # if there's a private ip, but no public ip
505 # attempt auto allocation of floating IP
506 if rs[0].instances[0].private_ip_address and not public_ip:
507 self.assign_floating_ip(ec2_conn, rs[0].instances[0])
508 cm_url = "http://{dns}/cloud".format(dns=public_ip)
509 # Wait until the CloudMan URL is accessible to return the
510 # data
511 if self._checkURL(cm_url) is True:
512 state['instance_state'] = inst_state
513 state['placement'] = rs[0].instances[0].placement
514 else:
515 state['instance_state'] = 'booting'
516 else:
517 state['instance_state'] = inst_state
518 except Exception as e:
519 err = "Problem updating instance '%s' state: %s" % (instance_id, e)
520 bioblend.log.error(err)
521 state['error'] = err
522 return state
523
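# --- Illustrative usage sketch (editor's addition, not part of this changeset) ---
# Polling get_status() until CloudMan reports a running instance; the states
# checked (`pending`, `booting`, `running`, `error`) come from the docstring
# above. `instance_id` is the ID returned by launch(), `cml` is a
# CloudManLauncher instance, and the polling interval is an arbitrary choice.
#
#     import time
#
#     state = cml.get_status(instance_id)
#     while state['instance_state'] not in ('running', 'error'):
#         time.sleep(30)
#         state = cml.get_status(instance_id)
#     print("CloudMan available at http://%s/cloud" % state['public_ip'])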
524 def get_clusters_pd(self, include_placement=True):
525 """
526 Return *persistent data* of all existing clusters for this account.
527
528 :type include_placement: bool
529 :param include_placement: Whether or not to include region placement for
530 the clusters. Setting this option will lead
531 to a longer function runtime.
532
533 :rtype: dict
534 :return: A dictionary containing keys ``clusters`` and ``error``. The
535 value of ``clusters`` will be a list of dictionaries, each with the keys
536 ``cluster_name``, ``persistent_data``, ``bucket_name`` and optionally
537 ``placement``, or an empty list if no clusters were found or an
538 error was encountered. The ``persistent_data`` value is itself
539 a dictionary containing the given cluster's persistent data.
540 The value for the ``error`` key will contain a string with the
541 error message.
542
543 .. versionadded:: 0.3
544 .. versionchanged:: 0.7.0
545 The return value changed from a list to a dictionary.
546 """
547 clusters = []
548 response = {'clusters': clusters, 'error': None}
549 s3_conn = self.connect_s3(self.access_key, self.secret_key, self.cloud)
550 try:
551 buckets = s3_conn.get_all_buckets()
552 except S3ResponseError as e:
553 response['error'] = "S3ResponseError getting buckets: %s" % e
554 except self.http_exceptions as ex:
555 response['error'] = "Exception getting buckets: %s" % ex
556 if response['error']:
557 bioblend.log.exception(response['error'])
558 return response
559 for bucket in [b for b in buckets if b.name.startswith('cm-')]:
560 try:
561 # TODO: first lookup if persistent_data.yaml key exists
562 pd = bucket.get_key('persistent_data.yaml')
563 except S3ResponseError:
564 # This can fail for a number of reasons for non-us and/or
565 # CNAME'd buckets but it is not a terminal error
566 bioblend.log.warning("Problem fetching persistent_data.yaml "
567 "from bucket %s" % bucket)
568 continue
569 if pd:
570 # We are dealing with a CloudMan bucket
571 pd_contents = pd.get_contents_as_string()
572 pd = yaml.load(pd_contents)
573 if 'cluster_name' in pd:
574 cluster_name = pd['cluster_name']
575 else:
576 for key in bucket.list():
577 if key.name.endswith('.clusterName'):
578 cluster_name = key.name.split('.clusterName')[0]
579 cluster = {'cluster_name': cluster_name,
580 'persistent_data': pd,
581 'bucket_name': bucket.name}
582 # Look for cluster's placement too
583 if include_placement:
584 placement = self._find_placement(cluster_name, cluster)
585 cluster['placement'] = placement
586 clusters.append(cluster)
587 response['clusters'] = clusters
588 return response
589
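# --- Illustrative usage sketch (editor's addition, not part of this changeset) ---
# Listing existing clusters for the account; the `clusters` and `error` keys
# and the per-cluster fields are documented in the docstring above. `cml` is a
# CloudManLauncher instance (see the earlier sketch).
#
#     existing = cml.get_clusters_pd(include_placement=False)
#     if existing['error']:
#         raise RuntimeError(existing['error'])
#     for c in existing['clusters']:
#         print("%s (bucket %s)" % (c['cluster_name'], c['bucket_name']))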
590 def get_cluster_pd(self, cluster_name):
591 """
592 Return *persistent data* (as a dict) associated with a cluster with the
593 given ``cluster_name``. If a cluster with the given name is not found,
594 return an empty dict.
595
596 .. versionadded:: 0.3
597 """
598 cluster = {}
599 clusters = self.get_clusters_pd().get('clusters', [])
600 for c in clusters:
601 if c['cluster_name'] == cluster_name:
602 cluster = c
603 break
604 return cluster
605
606 def connect_ec2(self, a_key, s_key, cloud=None):
607 """
608 Create and return an EC2-compatible connection object for the given cloud.
609
610 See ``_get_cloud_info`` method for more details on the requirements for
611 the ``cloud`` parameter. If no value is provided, the class field is used.
612 """
613 if cloud is None:
614 cloud = self.cloud
615 ci = self._get_cloud_info(cloud)
616 r = RegionInfo(name=ci['region_name'], endpoint=ci['region_endpoint'])
617 ec2_conn = boto.connect_ec2(aws_access_key_id=a_key,
618 aws_secret_access_key=s_key,
619 is_secure=ci['is_secure'],
620 region=r,
621 port=ci['ec2_port'],
622 path=ci['ec2_conn_path'],
623 validate_certs=False)
624 return ec2_conn
625
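# --- Illustrative usage sketch (editor's addition, not part of this changeset) ---
# Reusing the launcher's connection helper to talk to the cloud directly with
# plain boto calls; get_all_instances() is a standard boto EC2 call that
# returns reservations. `cml` is a CloudManLauncher instance.
#
#     ec2_conn = cml.connect_ec2(cml.access_key, cml.secret_key)
#     for reservation in ec2_conn.get_all_instances():
#         for inst in reservation.instances:
#             print(inst.id, inst.state)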
626 def connect_s3(self, a_key, s_key, cloud=None):
627 """
628 Create and return an S3-compatible connection object for the given cloud.
629
630 See ``_get_cloud_info`` method for more details on the requirements for
631 the ``cloud`` parameter. If no value is provided, the class field is used.
632 """
633 if cloud is None:
634 cloud = self.cloud
635 ci = self._get_cloud_info(cloud)
636 if ci['cloud_type'] == 'amazon':
637 calling_format = SubdomainCallingFormat()
638 else:
639 calling_format = OrdinaryCallingFormat()
640 s3_conn = S3Connection(
641 aws_access_key_id=a_key, aws_secret_access_key=s_key,
642 is_secure=ci['is_secure'], port=ci['s3_port'], host=ci['s3_host'],
643 path=ci['s3_conn_path'], calling_format=calling_format)
644 return s3_conn
645
646 def connect_vpc(self, a_key, s_key, cloud=None):
647 """
648 Establish a connection to the VPC service.
649
650 TODO: Make this work with non-default clouds as well.
651 """
652 if cloud is None:
653 cloud = self.cloud
654 ci = self._get_cloud_info(cloud)
655 r = RegionInfo(name=ci['region_name'], endpoint=ci['region_endpoint'])
656 vpc_conn = boto.connect_vpc(
657 aws_access_key_id=a_key,
658 aws_secret_access_key=s_key,
659 is_secure=ci['is_secure'],
660 region=r,
661 port=ci['ec2_port'],
662 path=ci['ec2_conn_path'],
663 validate_certs=False)
664 return vpc_conn
665
666 def _compose_user_data(self, user_provided_data):
667 """
668 A convenience method used to compose and properly format the user data
669 required when requesting an instance.
670
671 ``user_provided_data`` is the data provided by a user that is required to
672 identify a cluster, along with any other user requirements.
673 """
674 form_data = {}
675 # Do not include the following fields in the user data but do include
676 # any 'advanced startup fields' that might be added in the future
677 excluded_fields = ['sg_name', 'image_id', 'instance_id', 'kp_name',
678 'cloud', 'cloud_type', 'public_dns', 'cidr_range',
679 'kp_material', 'placement', 'flavor_id']
680 for key, value in six.iteritems(user_provided_data):
681 if key not in excluded_fields:
682 form_data[key] = value
683 # If the following user data keys are empty, do not include them in the
684 # request user data
685 udkeys = [
686 'post_start_script_url',
687 'worker_post_start_script_url',
688 'bucket_default',
689 'share_string']
690 for udkey in udkeys:
691 if udkey in form_data and form_data[udkey] == '':
692 del form_data[udkey]
693 # If bucket_default was not provided, add a default value to the user data
694 # (missing value does not play nicely with CloudMan's ec2autorun.py)
695 if not form_data.get(
696 'bucket_default', None) and self.cloud.bucket_default:
697 form_data['bucket_default'] = self.cloud.bucket_default
698 # Reuse the ``password`` for the ``freenxpass`` user data option
699 if 'freenxpass' not in form_data and 'password' in form_data:
700 form_data['freenxpass'] = form_data['password']
701 # Convert form_data into the YAML format
702 ud = yaml.dump(form_data, default_flow_style=False, allow_unicode=False)
703 # Also include connection info about the selected cloud
704 ci = self._get_cloud_info(self.cloud, as_str=True)
705 return ud + "\n" + ci
706
707 def _get_cloud_info(self, cloud, as_str=False):
708 """
709 Get connection information about a given cloud
710 """
711 ci = {}
712 ci['cloud_type'] = cloud.cloud_type
713 ci['region_name'] = cloud.region_name
714 ci['region_endpoint'] = cloud.region_endpoint
715 ci['is_secure'] = cloud.is_secure
716 ci['ec2_port'] = cloud.ec2_port if cloud.ec2_port != '' else None
717 ci['ec2_conn_path'] = cloud.ec2_conn_path
718 # Include cidr_range only if not empty
719 if cloud.cidr_range != '':
720 ci['cidr_range'] = cloud.cidr_range
721 ci['s3_host'] = cloud.s3_host
722 ci['s3_port'] = cloud.s3_port if cloud.s3_port != '' else None
723 ci['s3_conn_path'] = cloud.s3_conn_path
724 if as_str:
725 ci = yaml.dump(ci, default_flow_style=False, allow_unicode=False)
726 return ci
727
728 def _get_volume_placement(self, vol_id):
729 """
730 Returns the placement of a volume (or None, if it cannot be determined)
731 """
732 try:
733 vol = self.ec2_conn.get_all_volumes(volume_ids=[vol_id])
734 except EC2ResponseError as ec2e:
735 bioblend.log.error("EC2ResponseError querying for volume {0}: {1}"
736 .format(vol_id, ec2e))
737 vol = None
738 if vol:
739 return vol[0].zone
740 else:
741 bioblend.log.error(
742 "Requested placement of a volume '%s' that does not exist." %
743 vol_id)
744 return None
745
746 def _find_placement(self, cluster_name, cluster=None):
747 """
748 Find a placement zone for a cluster with the name ``cluster_name``.
749
750 By default, this method will search for and fetch the given cluster's
751 *persistent data*; alternatively, *persistent data* can be provided via
752 the ``cluster`` parameter. This dict needs to have a ``persistent_data``
753 key with the contents of the cluster's *persistent data*.
754 If the cluster or the volume associated with the cluster cannot be found,
755 cluster placement is set to ``None``.
756
757 :rtype: dict
758 :return: A dictionary with ``placement`` and ``error`` keywords.
759
760 .. versionchanged:: 0.7.0
761 The return value changed from a list to a dictionary.
762 """
763 placement = None
764 response = {'placement': placement, 'error': None}
765 cluster = cluster or self.get_cluster_pd(cluster_name)
766 if cluster and 'persistent_data' in cluster:
767 pd = cluster['persistent_data']
768 try:
769 if 'placement' in pd:
770 response['placement'] = pd['placement']
771 elif 'data_filesystems' in pd:
772 # We have v1 format persistent data so get the volume first and
773 # then the placement zone
774 vol_id = pd['data_filesystems']['galaxyData'][0]['vol_id']
775 response['placement'] = self._get_volume_placement(vol_id)
776 elif 'filesystems' in pd:
777 # V2 format.
778 for fs in [fs for fs in pd['filesystems'] if fs.get(
779 'kind', None) == 'volume' and 'ids' in fs]:
780 # All volumes must be in the same zone
781 vol_id = fs['ids'][0]
782 response['placement'] = self._get_volume_placement(
783 vol_id)
784 # No need to continue to iterate through
785 # filesystems, if we found one with a volume.
786 break
787 except Exception as exc:
788 response['error'] = ("Exception while finding placement for "
789 "cluster '{0}'. This can indicate malformed "
790 "instance data. Or that this method is "
791 "broken: {1}".format(cluster_name, exc))
792 bioblend.log.error(response['error'])
793 response['placement'] = None
794 else:
795 bioblend.log.debug("Insufficient info about cluster {0} to get placement."
796 .format(cluster_name))
797 return response
798
799 def find_placements(
800 self, ec2_conn, instance_type, cloud_type, cluster_name=None):
801 """
802 Find a list of placement zones that support the specified instance type.
803
804 If ``cluster_name`` is given and a cluster with the given name exists,
805 return a list with only one entry: the zone where the given cluster lives.
806
807 Searching for available zones for a given instance type is done by
808 checking the spot prices in the potential availability zones for
809 support before deciding on a region:
810 http://blog.piefox.com/2011/07/ec2-availability-zones-and-instance.html
811
812 Note that, currently, instance-type based zone selection applies only to
813 AWS. For other clouds, all the available zones are returned (unless a
814 cluster is being recreated, in which case the cluster's placement zone is
815 returned as stored in its persistent data).
816
817 :rtype: dict
818 :return: A dictionary with ``zones`` and ``error`` keywords.
819
820 .. versionchanged:: 0.3
821 Changed method name from ``_find_placements`` to ``find_placements``.
822 Also added ``cluster_name`` parameter.
823
824 .. versionchanged:: 0.7.0
825 The return value changed from a list to a dictionary.
826 """
827 # First look for a specific zone a given cluster is bound to
828 zones = []
829 response = {'zones': zones, 'error': None}
830 if cluster_name:
831 placement = self._find_placement(cluster_name)
832 if placement.get('error'):
833 response['error'] = placement['error']
834 return response
835 response['zones'] = placement.get('placement', [])
836 # If placement is not found, look for a list of available zones
837 if not response['zones']:
838 in_the_past = datetime.datetime.now() - datetime.timedelta(hours=1)
839 back_compatible_zone = "us-east-1e"
840 for zone in [
841 z for z in ec2_conn.get_all_zones() if z.state == 'available']:
842 # Non EC2 clouds may not support get_spot_price_history
843 if instance_type is None or cloud_type != 'ec2':
844 zones.append(zone.name)
845 elif ec2_conn.get_spot_price_history(instance_type=instance_type,
846 end_time=in_the_past.isoformat(),
847 availability_zone=zone.name):
848 zones.append(zone.name)
849 # Higher-lettered zones seem to have more availability currently
850 zones.sort(reverse=True)
851 if back_compatible_zone in zones:
852 zones = [back_compatible_zone] + \
853 [z for z in zones if z != back_compatible_zone]
854 if len(zones) == 0:
855 response['error'] = ("Did not find an availability zone for {0}"
856 .format(instance_type))
857 bioblend.log.error(response['error'])
858 zones.append(back_compatible_zone)
859 return response
860
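# --- Illustrative usage sketch (editor's addition, not part of this changeset) ---
# Picking an availability zone for a given instance type; the `zones` and
# `error` keys follow the docstring above. The instance type and cluster name
# are placeholders, and `cml` is a CloudManLauncher instance.
#
#     placements = cml.find_placements(cml.ec2_conn, 'c3.large', 'ec2',
#                                      cluster_name='my-cluster')
#     if placements['error']:
#         bioblend.log.warning(placements['error'])
#     print("Candidate zones: %s" % ', '.join(placements['zones']))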
861 def _checkURL(self, url):
862 """
863 Check if the ``url`` is *alive* (i.e., the remote server returns code 200 (OK)
864 or 401 (Unauthorized)).
865 """
866 try:
867 p = urlparse(url)
868 h = HTTPConnection(p[1])
869 h.putrequest('HEAD', p[2])
870 h.endheaders()
871 r = h.getresponse()
872 # CloudMan UI is pwd protected so include 401
873 if r.status in (200, 401):
874 return True
875 except Exception:
876 # No response or no good response
877 pass
878 return False