Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/bioblend/cloudman/launch.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:18:57 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d30785e31577 |
---|---|
1 """ | |
2 Setup and launch a CloudMan instance. | |
3 """ | |
4 import datetime | |
5 import socket | |
6 from http.client import HTTPConnection | |
7 from urllib.parse import urlparse | |
8 | |
9 import boto | |
10 import yaml | |
11 from boto.compat import http_client | |
12 from boto.ec2.regioninfo import RegionInfo | |
13 from boto.exception import EC2ResponseError, S3ResponseError | |
14 from boto.s3.connection import OrdinaryCallingFormat, S3Connection, SubdomainCallingFormat | |
15 | |
16 import bioblend | |
17 from bioblend.util import Bunch | |
18 | |
19 | |
20 # Uncomment the following line if no logging from boto is desired | |
21 # bioblend.logging.getLogger('boto').setLevel(bioblend.logging.CRITICAL) | |
22 # Uncomment the following line if logging at the prompt is desired | |
23 # bioblend.set_stream_logger(__name__) | |
def instance_types(cloud_name='generic'):
    """
    Return a list of dictionaries containing details about the available
    instance types for the given `cloud_name`.

    :type cloud_name: str
    :param cloud_name: A name of the cloud for which the list of instance
                       types will be returned. Valid values are: `aws`,
                       `nectar`, `generic`.

    :rtype: list
    :return: A list of dictionaries describing instance types. Each dict will
             contain the following keys: `name`, `model`, and `description`.
    """
    # Static per-cloud catalogs as (model, name, description) triples.
    aws_catalog = (
        ("c3.large", "Compute optimized Large", "2 vCPU/4GB RAM"),
        ("c3.2xlarge", "Compute optimized 2xLarge", "8 vCPU/15GB RAM"),
        ("c3.8xlarge", "Compute optimized 8xLarge", "32 vCPU/60GB RAM"),
    )
    generic_catalog = (
        ("m1.small", "Small", "1 vCPU / 4GB RAM"),
        ("m1.medium", "Medium", "2 vCPU / 8GB RAM"),
        ("m1.large", "Large", "4 vCPU / 16GB RAM"),
        ("m1.xlarge", "Extra Large", "8 vCPU / 32GB RAM"),
        ("m1.xxlarge", "Extra-extra Large", "16 vCPU / 64GB RAM"),
    )
    key = cloud_name.lower()
    if key == 'aws':
        catalog = aws_catalog
    elif key in ('nectar', 'generic'):
        catalog = generic_catalog
    else:
        # Unknown cloud names yield an empty list, as before.
        catalog = ()
    return [{"model": model, "name": name, "description": description}
            for model, name, description in catalog]
66 | |
67 | |
68 class CloudManLauncher(object): | |
69 | |
70 def __init__(self, access_key, secret_key, cloud=None): | |
71 """ | |
72 Define the environment in which this instance of CloudMan will be launched. | |
73 | |
74 Besides providing the credentials, optionally provide the ``cloud`` | |
75 object. This object must define the properties required to establish a | |
76 `boto <https://github.com/boto/boto/>`_ connection to that cloud. See | |
77 this method's implementation for an example of the required fields. | |
78 Note that as long the as provided object defines the required fields, | |
79 it can really by implemented as anything (e.g., a Bunch, a database | |
80 object, a custom class). If no value for the ``cloud`` argument is | |
81 provided, the default is to use the Amazon cloud. | |
82 """ | |
83 self.access_key = access_key | |
84 self.secret_key = secret_key | |
85 if cloud is None: | |
86 # Default to an EC2-compatible object | |
87 self.cloud = Bunch(id='1', # for compatibility w/ DB representation | |
88 name="Amazon", | |
89 cloud_type="ec2", | |
90 bucket_default="cloudman", | |
91 region_name="us-east-1", | |
92 region_endpoint="ec2.amazonaws.com", | |
93 ec2_port="", | |
94 ec2_conn_path="/", | |
95 cidr_range="", | |
96 is_secure=True, | |
97 s3_host="s3.amazonaws.com", | |
98 s3_port="", | |
99 s3_conn_path='/') | |
100 else: | |
101 self.cloud = cloud | |
102 self.ec2_conn = self.connect_ec2( | |
103 self.access_key, | |
104 self.secret_key, | |
105 self.cloud) | |
106 self.vpc_conn = self.connect_vpc( | |
107 self.access_key, | |
108 self.secret_key, | |
109 self.cloud) | |
110 # Define exceptions from http_client that we want to catch and retry | |
111 self.http_exceptions = (http_client.HTTPException, socket.error, | |
112 socket.gaierror, http_client.BadStatusLine) | |
113 | |
114 def __repr__(self): | |
115 return "Cloud: {0}; acct ID: {1}".format( | |
116 self.cloud.name, self.access_key) | |
117 | |
    def launch(self, cluster_name, image_id, instance_type, password,
               kernel_id=None, ramdisk_id=None, key_name='cloudman_key_pair',
               security_groups=None, placement='', subnet_id=None,
               ebs_optimized=False, **kwargs):
        """
        Check all the prerequisites (key pair and security groups) for
        launching a CloudMan instance, compose the user data based on the
        parameters specified in the arguments and the cloud properties as
        defined in the object's ``cloud`` field.

        For the current list of user data fields that can be provided via
        ``kwargs``, see `<https://galaxyproject.org/cloudman/userdata/>`_

        Return a dict containing the properties and info with which an instance
        was launched, namely: ``sg_names`` containing the names of the security
        groups, ``kp_name`` containing the name of the key pair, ``kp_material``
        containing the private portion of the key pair (*note* that this portion
        of the key is available and can be retrieved *only* at the time the key
        is created, which will happen only if no key with the name provided in
        the ``key_name`` argument exists), ``rs`` containing the
        `boto <https://github.com/boto/boto/>`_ ``ResultSet`` object,
        ``instance_id`` containing the ID of a started instance, and
        ``error`` containing an error message if there was one.
        """
        if security_groups is None:
            security_groups = ['CloudMan']
        ret = {'sg_names': [],
               'sg_ids': [],
               'kp_name': '',
               'kp_material': '',
               'rs': None,
               'instance_id': '',
               'error': None}
        # First satisfy the prerequisites: make sure each requested security
        # group exists (creating it if needed) and record its name/ID.
        for sg in security_groups:
            # Get VPC ID in case we're launching into a VPC
            vpc_id = None
            if subnet_id:
                try:
                    sn = self.vpc_conn.get_all_subnets(subnet_id)[0]
                    vpc_id = sn.vpc_id
                except (EC2ResponseError, IndexError):
                    # Non-fatal: proceed without a VPC ID; SG creation below
                    # may still succeed in the default (non-VPC) context.
                    bioblend.log.exception("Trouble fetching subnet %s", subnet_id)
            cmsg = self.create_cm_security_group(sg, vpc_id=vpc_id)
            ret['error'] = cmsg['error']
            if ret['error']:
                return ret
            if cmsg['name']:
                ret['sg_names'].append(cmsg['name'])
                ret['sg_ids'].append(cmsg['sg_id'])
        if subnet_id:
            # Must setup a network interface if launching into VPC; in that
            # case the security groups are attached to the interface instead
            # of being passed to run_instances directly.
            security_groups = None
            interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
                subnet_id=subnet_id, groups=[cmsg['sg_id']],
                associate_public_ip_address=True)
            network_interfaces = (boto.ec2.networkinterface.
                                  NetworkInterfaceCollection(interface))
        else:
            network_interfaces = None
        kp_info = self.create_key_pair(key_name)
        ret['kp_name'] = kp_info['name']
        ret['kp_material'] = kp_info['material']
        ret['error'] = kp_info['error']
        if ret['error']:
            return ret
        # If not provided, try to find a placement
        # TODO: Should placement always be checked? To make sure it's correct
        # for existing clusters.
        if not placement:
            placement = self._find_placement(
                cluster_name).get('placement', None)
        # Compose user data for launching an instance, ensuring we have the
        # required fields
        kwargs['access_key'] = self.access_key
        kwargs['secret_key'] = self.secret_key
        kwargs['cluster_name'] = cluster_name
        kwargs['password'] = password
        kwargs['cloud_name'] = self.cloud.name
        ud = self._compose_user_data(kwargs)
        # Now launch an instance
        try:

            rs = None
            rs = self.ec2_conn.run_instances(image_id=image_id,
                                             instance_type=instance_type,
                                             key_name=key_name,
                                             security_groups=security_groups,
                                             # The following two arguments are
                                             # provided in the network_interface
                                             # instead of arguments:
                                             # security_group_ids=security_group_ids,
                                             # subnet_id=subnet_id,
                                             network_interfaces=network_interfaces,
                                             user_data=ud,
                                             kernel_id=kernel_id,
                                             ramdisk_id=ramdisk_id,
                                             placement=placement,
                                             ebs_optimized=ebs_optimized)
            ret['rs'] = rs
        except EC2ResponseError as e:
            err_msg = "Problem launching an instance: {0} (code {1}; status {2})" \
                      .format(str(e), e.error_code, e.status)
            bioblend.log.exception(err_msg)
            ret['error'] = err_msg
            return ret
        else:
            if rs:
                try:
                    bioblend.log.info("Launched an instance with ID %s", rs.instances[0].id)
                    ret['instance_id'] = rs.instances[0].id
                    ret['instance_ip'] = rs.instances[0].ip_address
                except EC2ResponseError as e:
                    err_msg = "Problem with the launched instance object: {0} " \
                              "(code {1}; status {2})" \
                              .format(str(e), e.error_code, e.status)
                    bioblend.log.exception(err_msg)
                    ret['error'] = err_msg
            else:
                ret['error'] = ("No response after launching an instance. Check "
                                "your account permissions and try again.")
        return ret
240 | |
241 def create_cm_security_group(self, sg_name='CloudMan', vpc_id=None): | |
242 """ | |
243 Create a security group with all authorizations required to run CloudMan. | |
244 | |
245 If the group already exists, check its rules and add the missing ones. | |
246 | |
247 :type sg_name: str | |
248 :param sg_name: A name for the security group to be created. | |
249 | |
250 :type vpc_id: str | |
251 :param vpc_id: VPC ID under which to create the security group. | |
252 | |
253 :rtype: dict | |
254 :return: A dictionary containing keys ``name`` (with the value being the | |
255 name of the security group that was created), ``error`` | |
256 (with the value being the error message if there was an error | |
257 or ``None`` if no error was encountered), and ``ports`` | |
258 (containing the list of tuples with port ranges that were | |
259 opened or attempted to be opened). | |
260 | |
261 .. versionchanged:: 0.6.1 | |
262 The return value changed from a string to a dict | |
263 """ | |
264 ports = (('20', '21'), # FTP | |
265 ('22', '22'), # SSH | |
266 ('80', '80'), # Web UI | |
267 ('443', '443'), # SSL Web UI | |
268 ('8800', '8800'), # NodeJS Proxy for Galaxy IPython IE | |
269 ('9600', '9700'), # HTCondor | |
270 ('30000', '30100')) # FTP transfer | |
271 progress = {'name': None, | |
272 'sg_id': None, | |
273 'error': None, | |
274 'ports': ports} | |
275 cmsg = None | |
276 filters = None | |
277 if vpc_id: | |
278 filters = {'vpc-id': vpc_id} | |
279 # Check if this security group already exists | |
280 try: | |
281 sgs = self.ec2_conn.get_all_security_groups(filters=filters) | |
282 except EC2ResponseError as e: | |
283 err_msg = ("Problem getting security groups. This could indicate a " | |
284 "problem with your account credentials or permissions: " | |
285 "{0} (code {1}; status {2})" | |
286 .format(str(e), e.error_code, e.status)) | |
287 bioblend.log.exception(err_msg) | |
288 progress['error'] = err_msg | |
289 return progress | |
290 for sg in sgs: | |
291 if sg.name == sg_name: | |
292 cmsg = sg | |
293 bioblend.log.debug("Security group '%s' already exists; will add authorizations next.", sg_name) | |
294 break | |
295 # If it does not exist, create security group | |
296 if cmsg is None: | |
297 bioblend.log.debug("Creating Security Group %s", sg_name) | |
298 try: | |
299 cmsg = self.ec2_conn.create_security_group(sg_name, 'A security ' | |
300 'group for CloudMan', | |
301 vpc_id=vpc_id) | |
302 except EC2ResponseError as e: | |
303 err_msg = "Problem creating security group '{0}': {1} (code {2}; " \ | |
304 "status {3})" \ | |
305 .format(sg_name, str(e), e.error_code, e.status) | |
306 bioblend.log.exception(err_msg) | |
307 progress['error'] = err_msg | |
308 if cmsg: | |
309 progress['name'] = cmsg.name | |
310 progress['sg_id'] = cmsg.id | |
311 # Add appropriate authorization rules | |
312 # If these rules already exist, nothing will be changed in the SG | |
313 for port in ports: | |
314 try: | |
315 if not self.rule_exists( | |
316 cmsg.rules, from_port=port[0], to_port=port[1]): | |
317 cmsg.authorize( | |
318 ip_protocol='tcp', | |
319 from_port=port[0], | |
320 to_port=port[1], | |
321 cidr_ip='0.0.0.0/0') | |
322 else: | |
323 bioblend.log.debug("Rule (%s:%s) already exists in the SG", port[0], port[1]) | |
324 except EC2ResponseError as e: | |
325 err_msg = "A problem adding security group authorizations: {0} " \ | |
326 "(code {1}; status {2})" \ | |
327 .format(str(e), e.error_code, e.status) | |
328 bioblend.log.exception(err_msg) | |
329 progress['error'] = err_msg | |
330 # Add ICMP (i.e., ping) rule required by HTCondor | |
331 try: | |
332 if not self.rule_exists( | |
333 cmsg.rules, from_port='-1', to_port='-1', ip_protocol='icmp'): | |
334 cmsg.authorize( | |
335 ip_protocol='icmp', | |
336 from_port=-1, | |
337 to_port=-1, | |
338 cidr_ip='0.0.0.0/0') | |
339 else: | |
340 bioblend.log.debug( | |
341 "ICMP rule already exists in {0} SG.".format(sg_name)) | |
342 except EC2ResponseError as e: | |
343 err_msg = "A problem with security ICMP rule authorization: {0} " \ | |
344 "(code {1}; status {2})" \ | |
345 .format(str(e), e.error_code, e.status) | |
346 bioblend.log.exception(err_msg) | |
347 progress['err_msg'] = err_msg | |
348 # Add rule that allows communication between instances in the same | |
349 # SG | |
350 # A flag to indicate if group rule already exists | |
351 g_rule_exists = False | |
352 for rule in cmsg.rules: | |
353 for grant in rule.grants: | |
354 if grant.name == cmsg.name: | |
355 g_rule_exists = True | |
356 bioblend.log.debug( | |
357 "Group rule already exists in the SG.") | |
358 if g_rule_exists: | |
359 break | |
360 if not g_rule_exists: | |
361 try: | |
362 cmsg.authorize( | |
363 src_group=cmsg, | |
364 ip_protocol='tcp', | |
365 from_port=0, | |
366 to_port=65535) | |
367 except EC2ResponseError as e: | |
368 err_msg = "A problem with security group group " \ | |
369 "authorization: {0} (code {1}; status {2})" \ | |
370 .format(str(e), e.error_code, e.status) | |
371 bioblend.log.exception(err_msg) | |
372 progress['err_msg'] = err_msg | |
373 bioblend.log.info("Done configuring '%s' security group", cmsg.name) | |
374 else: | |
375 bioblend.log.warning( | |
376 "Did not create security group '{0}'".format(sg_name)) | |
377 return progress | |
378 | |
379 def rule_exists( | |
380 self, rules, from_port, to_port, ip_protocol='tcp', cidr_ip='0.0.0.0/0'): | |
381 """ | |
382 A convenience method to check if an authorization rule in a security group | |
383 already exists. | |
384 """ | |
385 for rule in rules: | |
386 if rule.ip_protocol == ip_protocol and rule.from_port == from_port and \ | |
387 rule.to_port == to_port and cidr_ip in [ip.cidr_ip for ip in rule.grants]: | |
388 return True | |
389 return False | |
390 | |
391 def create_key_pair(self, key_name='cloudman_key_pair'): | |
392 """ | |
393 If a key pair with the provided ``key_name`` does not exist, create it. | |
394 | |
395 :type sg_name: str | |
396 :param sg_name: A name for the key pair to be created. | |
397 | |
398 :rtype: dict | |
399 :return: A dictionary containing keys ``name`` (with the value being the | |
400 name of the key pair that was created), ``error`` | |
401 (with the value being the error message if there was an error | |
402 or ``None`` if no error was encountered), and ``material`` | |
403 (containing the unencrypted PEM encoded RSA private key if the | |
404 key was created or ``None`` if the key already eixsted). | |
405 | |
406 .. versionchanged:: 0.6.1 | |
407 The return value changed from a tuple to a dict | |
408 """ | |
409 progress = {'name': None, | |
410 'material': None, | |
411 'error': None} | |
412 kp = None | |
413 # Check if a key pair under the given name already exists. If it does not, | |
414 # create it, else return. | |
415 try: | |
416 kps = self.ec2_conn.get_all_key_pairs() | |
417 except EC2ResponseError as e: | |
418 err_msg = "Problem getting key pairs: {0} (code {1}; status {2})" \ | |
419 .format(str(e), e.error_code, e.status) | |
420 bioblend.log.exception(err_msg) | |
421 progress['error'] = err_msg | |
422 return progress | |
423 for akp in kps: | |
424 if akp.name == key_name: | |
425 bioblend.log.info("Key pair '%s' already exists; reusing it.", key_name) | |
426 progress['name'] = akp.name | |
427 return progress | |
428 try: | |
429 kp = self.ec2_conn.create_key_pair(key_name) | |
430 except EC2ResponseError as e: | |
431 err_msg = "Problem creating key pair '{0}': {1} (code {2}; status {3})" \ | |
432 .format(key_name, str(e), e.error_code, e.status) | |
433 bioblend.log.exception(err_msg) | |
434 progress['error'] = err_msg | |
435 return progress | |
436 bioblend.log.info("Created key pair '%s'", kp.name) | |
437 progress['name'] = kp.name | |
438 progress['material'] = kp.material | |
439 return progress | |
440 | |
441 def assign_floating_ip(self, ec2_conn, instance): | |
442 try: | |
443 bioblend.log.debug("Allocating a new floating IP address.") | |
444 address = ec2_conn.allocate_address() | |
445 except EC2ResponseError: | |
446 bioblend.log.exception("Exception allocating a new floating IP address") | |
447 bioblend.log.info("Associating floating IP %s to instance %s", address.public_ip, instance.id) | |
448 ec2_conn.associate_address(instance_id=instance.id, | |
449 public_ip=address.public_ip) | |
450 | |
    def get_status(self, instance_id):
        """
        Check on the status of an instance. ``instance_id`` needs to be a
        ``boto``-library compatible instance ID (e.g., ``i-8fehrdss``). If
        ``instance_id`` is ``None``, an error state is returned immediately.
        Note that this method uses the connection established for this class
        (``self.ec2_conn``), so the instance being checked on must be
        reachable with those credentials.

        Return a ``state`` dict containing the following keys: ``instance_state``,
        ``public_ip``, ``placement``, and ``error``, which capture CloudMan's
        current state. For ``instance_state``, expected values are: ``pending``,
        ``booting``, ``running``, or ``error`` and represent the state of the
        underlying instance. Other keys will return an empty value until the
        ``instance_state`` enters ``running`` state.
        """
        ec2_conn = self.ec2_conn
        rs = None
        state = {'instance_state': "",
                 'public_ip': "",
                 'placement': "",
                 'error': ""}

        # Make sure we have an instance ID
        if instance_id is None:
            err = "Missing instance ID, cannot check the state."
            bioblend.log.error(err)
            state['error'] = err
            return state
        try:
            rs = ec2_conn.get_all_instances([instance_id])
            if rs is not None:
                # update() refreshes and returns the instance's current state.
                inst_state = rs[0].instances[0].update()
                public_ip = rs[0].instances[0].ip_address
                state['public_ip'] = public_ip
                if inst_state == 'running':
                    # if there's a private ip, but no public ip
                    # attempt auto allocation of floating IP
                    if rs[0].instances[0].private_ip_address and not public_ip:
                        self.assign_floating_ip(ec2_conn, rs[0].instances[0])
                    cm_url = "http://{dns}/cloud".format(dns=public_ip)
                    # Wait until the CloudMan URL is accessible to return the
                    # data; until then report the synthetic 'booting' state.
                    if self._checkURL(cm_url) is True:
                        state['instance_state'] = inst_state
                        state['placement'] = rs[0].instances[0].placement
                    else:
                        state['instance_state'] = 'booting'
                else:
                    state['instance_state'] = inst_state
        except Exception as e:
            # NOTE(review): deliberately broad catch — any failure while
            # polling (network, boto, parsing) is reported via the returned
            # 'error' key rather than raised to the caller.
            err = "Problem updating instance '%s' state: %s" % (instance_id, e)
            bioblend.log.error(err)
            state['error'] = err
        return state
512 | |
513 def get_clusters_pd(self, include_placement=True): | |
514 """ | |
515 Return *persistent data* of all existing clusters for this account. | |
516 | |
517 :type include_placement: bool | |
518 :param include_placement: Whether or not to include region placement for | |
519 the clusters. Setting this option will lead | |
520 to a longer function runtime. | |
521 | |
522 :rtype: dict | |
523 :return: A dictionary containing keys ``clusters`` and ``error``. The | |
524 value of ``clusters`` will be a dictionary with the following keys | |
525 ``cluster_name``, ``persistent_data``, ``bucket_name`` and optionally | |
526 ``placement`` or an empty list if no clusters were found or an | |
527 error was encountered. ``persistent_data`` key value is yet | |
528 another dictionary containing given cluster's persistent data. | |
529 The value for the ``error`` key will contain a string with the | |
530 error message. | |
531 | |
532 .. versionadded:: 0.3 | |
533 .. versionchanged:: 0.7.0 | |
534 The return value changed from a list to a dictionary. | |
535 """ | |
536 clusters = [] | |
537 response = {'clusters': clusters, 'error': None} | |
538 s3_conn = self.connect_s3(self.access_key, self.secret_key, self.cloud) | |
539 try: | |
540 buckets = s3_conn.get_all_buckets() | |
541 except S3ResponseError as e: | |
542 response['error'] = "S3ResponseError getting buckets: %s" % e | |
543 except self.http_exceptions as ex: | |
544 response['error'] = "Exception getting buckets: %s" % ex | |
545 if response['error']: | |
546 bioblend.log.exception(response['error']) | |
547 return response | |
548 for bucket in [b for b in buckets if b.name.startswith('cm-')]: | |
549 try: | |
550 # TODO: first lookup if persistent_data.yaml key exists | |
551 pd = bucket.get_key('persistent_data.yaml') | |
552 except S3ResponseError: | |
553 # This can fail for a number of reasons for non-us and/or | |
554 # CNAME'd buckets but it is not a terminal error | |
555 bioblend.log.warning("Problem fetching persistent_data.yaml from bucket %s", bucket) | |
556 continue | |
557 if pd: | |
558 # We are dealing with a CloudMan bucket | |
559 pd_contents = pd.get_contents_as_string() | |
560 pd = yaml.load(pd_contents) | |
561 if 'cluster_name' in pd: | |
562 cluster_name = pd['cluster_name'] | |
563 else: | |
564 for key in bucket.list(): | |
565 if key.name.endswith('.clusterName'): | |
566 cluster_name = key.name.split('.clusterName')[0] | |
567 cluster = {'cluster_name': cluster_name, | |
568 'persistent_data': pd, | |
569 'bucket_name': bucket.name} | |
570 # Look for cluster's placement too | |
571 if include_placement: | |
572 placement = self._find_placement(cluster_name, cluster) | |
573 cluster['placement'] = placement | |
574 clusters.append(cluster) | |
575 response['clusters'] = clusters | |
576 return response | |
577 | |
578 def get_cluster_pd(self, cluster_name): | |
579 """ | |
580 Return *persistent data* (as a dict) associated with a cluster with the | |
581 given ``cluster_name``. If a cluster with the given name is not found, | |
582 return an empty dict. | |
583 | |
584 .. versionadded:: 0.3 | |
585 """ | |
586 cluster = {} | |
587 clusters = self.get_clusters_pd().get('clusters', []) | |
588 for c in clusters: | |
589 if c['cluster_name'] == cluster_name: | |
590 cluster = c | |
591 break | |
592 return cluster | |
593 | |
594 def connect_ec2(self, a_key, s_key, cloud=None): | |
595 """ | |
596 Create and return an EC2-compatible connection object for the given cloud. | |
597 | |
598 See ``_get_cloud_info`` method for more details on the requirements for | |
599 the ``cloud`` parameter. If no value is provided, the class field is used. | |
600 """ | |
601 if cloud is None: | |
602 cloud = self.cloud | |
603 ci = self._get_cloud_info(cloud) | |
604 r = RegionInfo(name=ci['region_name'], endpoint=ci['region_endpoint']) | |
605 ec2_conn = boto.connect_ec2(aws_access_key_id=a_key, | |
606 aws_secret_access_key=s_key, | |
607 is_secure=ci['is_secure'], | |
608 region=r, | |
609 port=ci['ec2_port'], | |
610 path=ci['ec2_conn_path'], | |
611 validate_certs=False) | |
612 return ec2_conn | |
613 | |
614 def connect_s3(self, a_key, s_key, cloud=None): | |
615 """ | |
616 Create and return an S3-compatible connection object for the given cloud. | |
617 | |
618 See ``_get_cloud_info`` method for more details on the requirements for | |
619 the ``cloud`` parameter. If no value is provided, the class field is used. | |
620 """ | |
621 if cloud is None: | |
622 cloud = self.cloud | |
623 ci = self._get_cloud_info(cloud) | |
624 if ci['cloud_type'] == 'amazon': | |
625 calling_format = SubdomainCallingFormat() | |
626 else: | |
627 calling_format = OrdinaryCallingFormat() | |
628 s3_conn = S3Connection( | |
629 aws_access_key_id=a_key, aws_secret_access_key=s_key, | |
630 is_secure=ci['is_secure'], port=ci['s3_port'], host=ci['s3_host'], | |
631 path=ci['s3_conn_path'], calling_format=calling_format) | |
632 return s3_conn | |
633 | |
634 def connect_vpc(self, a_key, s_key, cloud=None): | |
635 """ | |
636 Establish a connection to the VPC service. | |
637 | |
638 TODO: Make this work with non-default clouds as well. | |
639 """ | |
640 if cloud is None: | |
641 cloud = self.cloud | |
642 ci = self._get_cloud_info(cloud) | |
643 r = RegionInfo(name=ci['region_name'], endpoint=ci['region_endpoint']) | |
644 vpc_conn = boto.connect_vpc( | |
645 aws_access_key_id=a_key, | |
646 aws_secret_access_key=s_key, | |
647 is_secure=ci['is_secure'], | |
648 region=r, | |
649 port=ci['ec2_port'], | |
650 path=ci['ec2_conn_path'], | |
651 validate_certs=False) | |
652 return vpc_conn | |
653 | |
654 def _compose_user_data(self, user_provided_data): | |
655 """ | |
656 A convenience method used to compose and properly format the user data | |
657 required when requesting an instance. | |
658 | |
659 ``user_provided_data`` is the data provided by a user required to identify | |
660 a cluster and user other user requirements. | |
661 """ | |
662 form_data = {} | |
663 # Do not include the following fields in the user data but do include | |
664 # any 'advanced startup fields' that might be added in the future | |
665 excluded_fields = ['sg_name', 'image_id', 'instance_id', 'kp_name', | |
666 'cloud', 'cloud_type', 'public_dns', 'cidr_range', | |
667 'kp_material', 'placement', 'flavor_id'] | |
668 for key, value in user_provided_data.items(): | |
669 if key not in excluded_fields: | |
670 form_data[key] = value | |
671 # If the following user data keys are empty, do not include them in the | |
672 # request user data | |
673 udkeys = [ | |
674 'post_start_script_url', | |
675 'worker_post_start_script_url', | |
676 'bucket_default', | |
677 'share_string'] | |
678 for udkey in udkeys: | |
679 if udkey in form_data and form_data[udkey] == '': | |
680 del form_data[udkey] | |
681 # If bucket_default was not provided, add a default value to the user data | |
682 # (missing value does not play nicely with CloudMan's ec2autorun.py) | |
683 if not form_data.get( | |
684 'bucket_default', None) and self.cloud.bucket_default: | |
685 form_data['bucket_default'] = self.cloud.bucket_default | |
686 # Reuse the ``password`` for the ``freenxpass`` user data option | |
687 if 'freenxpass' not in form_data and 'password' in form_data: | |
688 form_data['freenxpass'] = form_data['password'] | |
689 # Convert form_data into the YAML format | |
690 ud = yaml.dump(form_data, default_flow_style=False, allow_unicode=False) | |
691 # Also include connection info about the selected cloud | |
692 ci = self._get_cloud_info(self.cloud, as_str=True) | |
693 return ud + "\n" + ci | |
694 | |
695 def _get_cloud_info(self, cloud, as_str=False): | |
696 """ | |
697 Get connection information about a given cloud | |
698 """ | |
699 ci = {} | |
700 ci['cloud_type'] = cloud.cloud_type | |
701 ci['region_name'] = cloud.region_name | |
702 ci['region_endpoint'] = cloud.region_endpoint | |
703 ci['is_secure'] = cloud.is_secure | |
704 ci['ec2_port'] = cloud.ec2_port if cloud.ec2_port != '' else None | |
705 ci['ec2_conn_path'] = cloud.ec2_conn_path | |
706 # Include cidr_range only if not empty | |
707 if cloud.cidr_range != '': | |
708 ci['cidr_range'] = cloud.cidr_range | |
709 ci['s3_host'] = cloud.s3_host | |
710 ci['s3_port'] = cloud.s3_port if cloud.s3_port != '' else None | |
711 ci['s3_conn_path'] = cloud.s3_conn_path | |
712 if as_str: | |
713 ci = yaml.dump(ci, default_flow_style=False, allow_unicode=False) | |
714 return ci | |
715 | |
716 def _get_volume_placement(self, vol_id): | |
717 """ | |
718 Returns the placement of a volume (or None, if it cannot be determined) | |
719 """ | |
720 try: | |
721 vol = self.ec2_conn.get_all_volumes(volume_ids=[vol_id]) | |
722 except EC2ResponseError as ec2e: | |
723 bioblend.log.error("EC2ResponseError querying for volume {0}: {1}" | |
724 .format(vol_id, ec2e)) | |
725 vol = None | |
726 if vol: | |
727 return vol[0].zone | |
728 else: | |
729 bioblend.log.error("Requested placement of a volume '%s' that does not exist.", vol_id) | |
730 return None | |
731 | |
732 def _find_placement(self, cluster_name, cluster=None): | |
733 """ | |
734 Find a placement zone for a cluster with the name ``cluster_name``. | |
735 | |
736 By default, this method will search for and fetch given cluster's | |
737 *persistent data*; alternatively, *persistent data* can be provided via | |
738 the ``cluster`` parameter. This dict needs to have ``persistent_data`` | |
739 key with the contents of cluster's *persistent data*. | |
740 If the cluster or the volume associated with the cluster cannot be found, | |
741 cluster placement is set to ``None``. | |
742 | |
743 :rtype: dict | |
744 :return: A dictionary with ``placement`` and ``error`` keywords. | |
745 | |
746 .. versionchanged:: 0.7.0 | |
747 The return value changed from a list to a dictionary. | |
748 """ | |
749 placement = None | |
750 response = {'placement': placement, 'error': None} | |
751 cluster = cluster or self.get_cluster_pd(cluster_name) | |
752 if cluster and 'persistent_data' in cluster: | |
753 pd = cluster['persistent_data'] | |
754 try: | |
755 if 'placement' in pd: | |
756 response['placement'] = pd['placement'] | |
757 elif 'data_filesystems' in pd: | |
758 # We have v1 format persistent data so get the volume first and | |
759 # then the placement zone | |
760 vol_id = pd['data_filesystems']['galaxyData'][0]['vol_id'] | |
761 response['placement'] = self._get_volume_placement(vol_id) | |
762 elif 'filesystems' in pd: | |
763 # V2 format. | |
764 for fs in [fs for fs in pd['filesystems'] if fs.get( | |
765 'kind', None) == 'volume' and 'ids' in fs]: | |
766 # All volumes must be in the same zone | |
767 vol_id = fs['ids'][0] | |
768 response['placement'] = self._get_volume_placement( | |
769 vol_id) | |
770 # No need to continue to iterate through | |
771 # filesystems, if we found one with a volume. | |
772 break | |
773 except Exception as exc: | |
774 response['error'] = ("Exception while finding placement for " | |
775 "cluster '{0}'. This can indicate malformed " | |
776 "instance data. Or that this method is " | |
777 "broken: {1}".format(cluster_name, exc)) | |
778 bioblend.log.error(response['error']) | |
779 response['placement'] = None | |
780 else: | |
781 bioblend.log.debug("Insufficient info about cluster {0} to get placement." | |
782 .format(cluster_name)) | |
783 return response | |
784 | |
def find_placements(
        self, ec2_conn, instance_type, cloud_type, cluster_name=None):
    """
    Find a list of placement zones that support the specified instance type.

    If ``cluster_name`` is given and a cluster with the given name exists,
    return a list with only one entry: the zone where the given cluster
    lives.

    Searching for available zones for a given instance type is done by
    checking the spot prices in the potential availability zones for
    support before deciding on a region:
    http://blog.piefox.com/2011/07/ec2-availability-zones-and-instance.html

    Note that, currently, instance-type based zone selection applies only to
    AWS. For other clouds, all the available zones are returned (unless a
    cluster is being recreated, in which case the cluster's placement zone is
    returned as stored in its persistent data).

    :rtype: dict
    :return: A dictionary with ``zones`` and ``error`` keywords.

    .. versionchanged:: 0.3
        Changed method name from ``_find_placements`` to ``find_placements``.
        Also added ``cluster_name`` parameter.

    .. versionchanged:: 0.7.0
        The return value changed from a list to a dictionary.
    """
    # First look for a specific zone a given cluster is bound to
    zones = []
    response = {'zones': zones, 'error': None}
    if cluster_name:
        placement = self._find_placement(cluster_name)
        if placement.get('error'):
            response['error'] = placement['error']
            return response
        # FIX: the original assigned the raw placement value (a zone-name
        # string, or None) straight to response['zones'], which violated
        # the documented "list of zones" contract and broke the aliasing
        # with the local ``zones`` list, so zones found by the fallback
        # search below were silently dropped. Append to the shared list
        # instead so the dict and the local always agree.
        if placement.get('placement'):
            zones.append(placement['placement'])
    # If placement is not found, look for a list of available zones
    if not zones:
        in_the_past = datetime.datetime.now() - datetime.timedelta(hours=1)
        back_compatible_zone = "us-east-1e"
        for zone in [
                z for z in ec2_conn.get_all_zones() if z.state == 'available']:
            # Non EC2 clouds may not support get_spot_price_history
            if instance_type is None or cloud_type != 'ec2':
                zones.append(zone.name)
            elif ec2_conn.get_spot_price_history(instance_type=instance_type,
                                                 end_time=in_the_past.isoformat(),
                                                 availability_zone=zone.name):
                zones.append(zone.name)
        # Higher-lettered zones seem to have more availability currently
        zones.sort(reverse=True)
        if back_compatible_zone in zones:
            zones = [back_compatible_zone] + \
                [z for z in zones if z != back_compatible_zone]
        if len(zones) == 0:
            # FIX: "availabilty" -> "availability" in the error message.
            response['error'] = ("Did not find availability zone for {0}"
                                 .format(instance_type))
            bioblend.log.error(response['error'])
            zones.append(back_compatible_zone)
        # FIX: ``zones`` may have been rebound above (back-compatible
        # reordering), orphaning the list stored in the response dict;
        # refresh the reference so the caller sees the final ordering and
        # the fallback zone.
        response['zones'] = zones
    return response
846 | |
847 def _checkURL(self, url): | |
848 """ | |
849 Check if the ``url`` is *alive* (i.e., remote server returns code 200(OK) | |
850 or 401 (unauthorized)). | |
851 """ | |
852 try: | |
853 p = urlparse(url) | |
854 h = HTTPConnection(p[1]) | |
855 h.putrequest('HEAD', p[2]) | |
856 h.endheaders() | |
857 r = h.getresponse() | |
858 # CloudMan UI is pwd protected so include 401 | |
859 if r.status in (200, 401): | |
860 return True | |
861 except Exception: | |
862 # No response or no good response | |
863 pass | |
864 return False |