Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/bioblend/cloudman/launch.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac | 
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 | 
| parents | 79f47841a781 | 
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| 4:79f47841a781 | 5:9b1c78e6ba9c | 
|---|---|
| 1 """ | |
| 2 Setup and launch a CloudMan instance. | |
| 3 """ | |
| 4 import datetime | |
| 5 import socket | |
| 6 | |
| 7 import boto | |
| 8 import six | |
| 9 import yaml | |
| 10 from boto.compat import http_client | |
| 11 from boto.ec2.regioninfo import RegionInfo | |
| 12 from boto.exception import EC2ResponseError, S3ResponseError | |
| 13 from boto.s3.connection import OrdinaryCallingFormat, S3Connection, SubdomainCallingFormat | |
| 14 from six.moves.http_client import HTTPConnection | |
| 15 from six.moves.urllib.parse import urlparse | |
| 16 | |
| 17 import bioblend | |
| 18 from bioblend.util import Bunch | |
| 19 | |
| 20 | |
| 21 # Uncomment the following line if no logging from boto is desired | |
| 22 # bioblend.logging.getLogger('boto').setLevel(bioblend.logging.CRITICAL) | |
| 23 # Uncomment the following line if logging at the prompt is desired | |
| 24 # bioblend.set_stream_logger(__name__) | |
def instance_types(cloud_name='generic'):
    """
    Return a list of dictionaries containing details about the available
    instance types for the given `cloud_name`.

    :type cloud_name: str
    :param cloud_name: A name of the cloud for which the list of instance
                       types will be returned. Valid values are: `aws`,
                       `nectar`, `generic`.

    :rtype: list
    :return: A list of dictionaries describing instance types. Each dict will
             contain the following keys: `name`, `model`, and `description`.
    """
    # Static catalog of (model, name, description) triples, keyed by cloud
    # family. Nectar and generic clouds share the same OpenStack-style sizes.
    aws_sizes = (
        ("c3.large", "Compute optimized Large", "2 vCPU/4GB RAM"),
        ("c3.2xlarge", "Compute optimized 2xLarge", "8 vCPU/15GB RAM"),
        ("c3.8xlarge", "Compute optimized 8xLarge", "32 vCPU/60GB RAM"),
    )
    openstack_sizes = (
        ("m1.small", "Small", "1 vCPU / 4GB RAM"),
        ("m1.medium", "Medium", "2 vCPU / 8GB RAM"),
        ("m1.large", "Large", "4 vCPU / 16GB RAM"),
        ("m1.xlarge", "Extra Large", "8 vCPU / 32GB RAM"),
        ("m1.xxlarge", "Extra-extra Large", "16 vCPU / 64GB RAM"),
    )
    normalized = cloud_name.lower()
    if normalized == 'aws':
        sizes = aws_sizes
    elif normalized in ('nectar', 'generic'):
        sizes = openstack_sizes
    else:
        # Unknown cloud names yield an empty list, same as the original
        # if/elif chain falling through.
        sizes = ()
    return [{"model": model, "name": name, "description": description}
            for model, name, description in sizes]
| 67 | |
| 68 | |
| 69 class CloudManLauncher(object): | |
| 70 | |
    def __init__(self, access_key, secret_key, cloud=None):
        """
        Define the environment in which this instance of CloudMan will be launched.

        Besides providing the credentials, optionally provide the ``cloud``
        object. This object must define the properties required to establish a
        `boto <https://github.com/boto/boto/>`_ connection to that cloud. See
        this method's implementation for an example of the required fields.
        Note that as long as the provided object defines the required fields,
        it can really be implemented as anything (e.g., a Bunch, a database
        object, a custom class). If no value for the ``cloud`` argument is
        provided, the default is to use the Amazon cloud.

        :type access_key: str
        :param access_key: Cloud account access key.

        :type secret_key: str
        :param secret_key: Cloud account secret key.

        :type cloud: object with the fields shown in the default Bunch below
        :param cloud: Cloud connection properties; defaults to Amazon EC2/S3.
        """
        self.access_key = access_key
        self.secret_key = secret_key
        if cloud is None:
            # Default to an EC2-compatible object
            self.cloud = Bunch(id='1',  # for compatibility w/ DB representation
                               name="Amazon",
                               cloud_type="ec2",
                               bucket_default="cloudman",
                               region_name="us-east-1",
                               region_endpoint="ec2.amazonaws.com",
                               ec2_port="",
                               ec2_conn_path="/",
                               cidr_range="",
                               is_secure=True,
                               s3_host="s3.amazonaws.com",
                               s3_port="",
                               s3_conn_path='/')
        else:
            self.cloud = cloud
        # Establish the EC2 and VPC connections once so all other methods on
        # this object can reuse them.
        self.ec2_conn = self.connect_ec2(
            self.access_key,
            self.secret_key,
            self.cloud)
        self.vpc_conn = self.connect_vpc(
            self.access_key,
            self.secret_key,
            self.cloud)
        # Define exceptions from http_client that we want to catch and retry
        self.http_exceptions = (http_client.HTTPException, socket.error,
                                socket.gaierror, http_client.BadStatusLine)
| 114 | |
| 115 def __repr__(self): | |
| 116 return "Cloud: {0}; acct ID: {1}".format( | |
| 117 self.cloud.name, self.access_key) | |
| 118 | |
    def launch(self, cluster_name, image_id, instance_type, password,
               kernel_id=None, ramdisk_id=None, key_name='cloudman_key_pair',
               security_groups=['CloudMan'], placement='', subnet_id=None,
               ebs_optimized=False, **kwargs):
        # NOTE(review): ``security_groups`` is a mutable default argument; the
        # body only reads it and rebinds the local name (never mutates it in
        # place), so this is safe as written, but a tuple or None-sentinel
        # would be more conventional.
        """
        Check all the prerequisites (key pair and security groups) for
        launching a CloudMan instance, compose the user data based on the
        parameters specified in the arguments and the cloud properties as
        defined in the object's ``cloud`` field.

        For the current list of user data fields that can be provided via
        ``kwargs``, see `<https://galaxyproject.org/cloudman/userdata/>`_

        Return a dict containing the properties and info with which an instance
        was launched, namely: ``sg_names`` containing the names of the security
        groups, ``kp_name`` containing the name of the key pair, ``kp_material``
        containing the private portion of the key pair (*note* that this portion
        of the key is available and can be retrieved *only* at the time the key
        is created, which will happen only if no key with the name provided in
        the ``key_name`` argument exists), ``rs`` containing the
        `boto <https://github.com/boto/boto/>`_ ``ResultSet`` object,
        ``instance_id`` containing the ID of a started instance, and
        ``error`` containing an error message if there was one.
        """
        ret = {'sg_names': [],
               'sg_ids': [],
               'kp_name': '',
               'kp_material': '',
               'rs': None,
               'instance_id': '',
               'error': None}
        # First satisfy the prerequisites
        for sg in security_groups:
            # Get VPC ID in case we're launching into a VPC
            vpc_id = None
            if subnet_id:
                try:
                    sn = self.vpc_conn.get_all_subnets(subnet_id)[0]
                    vpc_id = sn.vpc_id
                except (EC2ResponseError, IndexError) as e:
                    # Best-effort: a failed subnet lookup is logged and the
                    # security group is created without a VPC binding.
                    bioblend.log.exception("Trouble fetching subnet %s: %s" %
                                           (subnet_id, e))
            cmsg = self.create_cm_security_group(sg, vpc_id=vpc_id)
            ret['error'] = cmsg['error']
            if ret['error']:
                return ret
            if cmsg['name']:
                ret['sg_names'].append(cmsg['name'])
                ret['sg_ids'].append(cmsg['sg_id'])
                if subnet_id:
                    # Must setup a network interface if launching into VPC
                    # (run_instances rejects the plain security_groups
                    # argument in that case, so it is cleared here).
                    security_groups = None
                    interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
                        subnet_id=subnet_id, groups=[cmsg['sg_id']],
                        associate_public_ip_address=True)
                    network_interfaces = (boto.ec2.networkinterface.
                                          NetworkInterfaceCollection(interface))
                else:
                    network_interfaces = None
        # NOTE(review): ``network_interfaces`` is only bound inside the
        # ``if cmsg['name']:`` branch above; an empty ``security_groups`` list
        # (or a group created without a name) would leave it unbound and raise
        # UnboundLocalError at run_instances below — TODO confirm intended.
        kp_info = self.create_key_pair(key_name)
        ret['kp_name'] = kp_info['name']
        ret['kp_material'] = kp_info['material']
        ret['error'] = kp_info['error']
        if ret['error']:
            return ret
        # If not provided, try to find a placement
        # TODO: Should placement always be checked? To make sure it's correct
        # for existing clusters.
        if not placement:
            placement = self._find_placement(
                cluster_name).get('placement', None)
        # Compose user data for launching an instance, ensuring we have the
        # required fields
        kwargs['access_key'] = self.access_key
        kwargs['secret_key'] = self.secret_key
        kwargs['cluster_name'] = cluster_name
        kwargs['password'] = password
        kwargs['cloud_name'] = self.cloud.name
        ud = self._compose_user_data(kwargs)
        # Now launch an instance
        try:

            rs = None
            rs = self.ec2_conn.run_instances(image_id=image_id,
                                             instance_type=instance_type,
                                             key_name=key_name,
                                             security_groups=security_groups,
                                             # The following two arguments are
                                             # provided in the network_interface
                                             # instead of arguments:
                                             # security_group_ids=security_group_ids,
                                             # subnet_id=subnet_id,
                                             network_interfaces=network_interfaces,
                                             user_data=ud,
                                             kernel_id=kernel_id,
                                             ramdisk_id=ramdisk_id,
                                             placement=placement,
                                             ebs_optimized=ebs_optimized)
            ret['rs'] = rs
        except EC2ResponseError as e:
            err_msg = "Problem launching an instance: {0} (code {1}; status {2})" \
                      .format(e.message, e.error_code, e.status)
            bioblend.log.exception(err_msg)
            ret['error'] = err_msg
            return ret
        else:
            if rs:
                try:
                    bioblend.log.info(
                        "Launched an instance with ID %s" %
                        rs.instances[0].id)
                    ret['instance_id'] = rs.instances[0].id
                    ret['instance_ip'] = rs.instances[0].ip_address
                except EC2ResponseError as e:
                    err_msg = "Problem with the launched instance object: {0} " \
                              "(code {1}; status {2})" \
                              .format(e.message, e.error_code, e.status)
                    bioblend.log.exception(err_msg)
                    ret['error'] = err_msg
            else:
                ret['error'] = ("No response after launching an instance. Check "
                                "your account permissions and try again.")
        return ret
| 242 | |
| 243 def create_cm_security_group(self, sg_name='CloudMan', vpc_id=None): | |
| 244 """ | |
| 245 Create a security group with all authorizations required to run CloudMan. | |
| 246 | |
| 247 If the group already exists, check its rules and add the missing ones. | |
| 248 | |
| 249 :type sg_name: str | |
| 250 :param sg_name: A name for the security group to be created. | |
| 251 | |
| 252 :type vpc_id: str | |
| 253 :param vpc_id: VPC ID under which to create the security group. | |
| 254 | |
| 255 :rtype: dict | |
| 256 :return: A dictionary containing keys ``name`` (with the value being the | |
| 257 name of the security group that was created), ``error`` | |
| 258 (with the value being the error message if there was an error | |
| 259 or ``None`` if no error was encountered), and ``ports`` | |
| 260 (containing the list of tuples with port ranges that were | |
| 261 opened or attempted to be opened). | |
| 262 | |
| 263 .. versionchanged:: 0.6.1 | |
| 264 The return value changed from a string to a dict | |
| 265 """ | |
| 266 ports = (('20', '21'), # FTP | |
| 267 ('22', '22'), # SSH | |
| 268 ('80', '80'), # Web UI | |
| 269 ('443', '443'), # SSL Web UI | |
| 270 ('8800', '8800'), # NodeJS Proxy for Galaxy IPython IE | |
| 271 ('9600', '9700'), # HTCondor | |
| 272 ('30000', '30100')) # FTP transfer | |
| 273 progress = {'name': None, | |
| 274 'sg_id': None, | |
| 275 'error': None, | |
| 276 'ports': ports} | |
| 277 cmsg = None | |
| 278 filters = None | |
| 279 if vpc_id: | |
| 280 filters = {'vpc-id': vpc_id} | |
| 281 # Check if this security group already exists | |
| 282 try: | |
| 283 sgs = self.ec2_conn.get_all_security_groups(filters=filters) | |
| 284 except EC2ResponseError as e: | |
| 285 err_msg = ("Problem getting security groups. This could indicate a " | |
| 286 "problem with your account credentials or permissions: " | |
| 287 "{0} (code {1}; status {2})" | |
| 288 .format(e.message, e.error_code, e.status)) | |
| 289 bioblend.log.exception(err_msg) | |
| 290 progress['error'] = err_msg | |
| 291 return progress | |
| 292 for sg in sgs: | |
| 293 if sg.name == sg_name: | |
| 294 cmsg = sg | |
| 295 bioblend.log.debug("Security group '%s' already exists; will " | |
| 296 "add authorizations next." % sg_name) | |
| 297 break | |
| 298 # If it does not exist, create security group | |
| 299 if cmsg is None: | |
| 300 bioblend.log.debug("Creating Security Group %s" % sg_name) | |
| 301 try: | |
| 302 cmsg = self.ec2_conn.create_security_group(sg_name, 'A security ' | |
| 303 'group for CloudMan', | |
| 304 vpc_id=vpc_id) | |
| 305 except EC2ResponseError as e: | |
| 306 err_msg = "Problem creating security group '{0}': {1} (code {2}; " \ | |
| 307 "status {3})" \ | |
| 308 .format(sg_name, e.message, e.error_code, e.status) | |
| 309 bioblend.log.exception(err_msg) | |
| 310 progress['error'] = err_msg | |
| 311 if cmsg: | |
| 312 progress['name'] = cmsg.name | |
| 313 progress['sg_id'] = cmsg.id | |
| 314 # Add appropriate authorization rules | |
| 315 # If these rules already exist, nothing will be changed in the SG | |
| 316 for port in ports: | |
| 317 try: | |
| 318 if not self.rule_exists( | |
| 319 cmsg.rules, from_port=port[0], to_port=port[1]): | |
| 320 cmsg.authorize( | |
| 321 ip_protocol='tcp', | |
| 322 from_port=port[0], | |
| 323 to_port=port[1], | |
| 324 cidr_ip='0.0.0.0/0') | |
| 325 else: | |
| 326 bioblend.log.debug( | |
| 327 "Rule (%s:%s) already exists in the SG" % | |
| 328 (port[0], port[1])) | |
| 329 except EC2ResponseError as e: | |
| 330 err_msg = "A problem adding security group authorizations: {0} " \ | |
| 331 "(code {1}; status {2})" \ | |
| 332 .format(e.message, e.error_code, e.status) | |
| 333 bioblend.log.exception(err_msg) | |
| 334 progress['error'] = err_msg | |
| 335 # Add ICMP (i.e., ping) rule required by HTCondor | |
| 336 try: | |
| 337 if not self.rule_exists( | |
| 338 cmsg.rules, from_port='-1', to_port='-1', ip_protocol='icmp'): | |
| 339 cmsg.authorize( | |
| 340 ip_protocol='icmp', | |
| 341 from_port=-1, | |
| 342 to_port=-1, | |
| 343 cidr_ip='0.0.0.0/0') | |
| 344 else: | |
| 345 bioblend.log.debug( | |
| 346 "ICMP rule already exists in {0} SG.".format(sg_name)) | |
| 347 except EC2ResponseError as e: | |
| 348 err_msg = "A problem with security ICMP rule authorization: {0} " \ | |
| 349 "(code {1}; status {2})" \ | |
| 350 .format(e.message, e.error_code, e.status) | |
| 351 bioblend.log.exception(err_msg) | |
| 352 progress['err_msg'] = err_msg | |
| 353 # Add rule that allows communication between instances in the same | |
| 354 # SG | |
| 355 # A flag to indicate if group rule already exists | |
| 356 g_rule_exists = False | |
| 357 for rule in cmsg.rules: | |
| 358 for grant in rule.grants: | |
| 359 if grant.name == cmsg.name: | |
| 360 g_rule_exists = True | |
| 361 bioblend.log.debug( | |
| 362 "Group rule already exists in the SG.") | |
| 363 if g_rule_exists: | |
| 364 break | |
| 365 if not g_rule_exists: | |
| 366 try: | |
| 367 cmsg.authorize( | |
| 368 src_group=cmsg, | |
| 369 ip_protocol='tcp', | |
| 370 from_port=0, | |
| 371 to_port=65535) | |
| 372 except EC2ResponseError as e: | |
| 373 err_msg = "A problem with security group group " \ | |
| 374 "authorization: {0} (code {1}; status {2})" \ | |
| 375 .format(e.message, e.error_code, e.status) | |
| 376 bioblend.log.exception(err_msg) | |
| 377 progress['err_msg'] = err_msg | |
| 378 bioblend.log.info( | |
| 379 "Done configuring '%s' security group" % | |
| 380 cmsg.name) | |
| 381 else: | |
| 382 bioblend.log.warning( | |
| 383 "Did not create security group '{0}'".format(sg_name)) | |
| 384 return progress | |
| 385 | |
| 386 def rule_exists( | |
| 387 self, rules, from_port, to_port, ip_protocol='tcp', cidr_ip='0.0.0.0/0'): | |
| 388 """ | |
| 389 A convenience method to check if an authorization rule in a security group | |
| 390 already exists. | |
| 391 """ | |
| 392 for rule in rules: | |
| 393 if rule.ip_protocol == ip_protocol and rule.from_port == from_port and \ | |
| 394 rule.to_port == to_port and cidr_ip in [ip.cidr_ip for ip in rule.grants]: | |
| 395 return True | |
| 396 return False | |
| 397 | |
    def create_key_pair(self, key_name='cloudman_key_pair'):
        """
        If a key pair with the provided ``key_name`` does not exist, create it.

        :type key_name: str
        :param key_name: A name for the key pair to be created.

        :rtype: dict
        :return: A dictionary containing keys ``name`` (with the value being the
                 name of the key pair that was created), ``error``
                 (with the value being the error message if there was an error
                 or ``None`` if no error was encountered), and ``material``
                 (containing the unencrypted PEM encoded RSA private key if the
                 key was created or ``None`` if the key already existed).

        .. versionchanged:: 0.6.1
            The return value changed from a tuple to a dict
        """
        progress = {'name': None,
                    'material': None,
                    'error': None}
        kp = None
        # Check if a key pair under the given name already exists. If it does not,
        # create it, else return.
        try:
            kps = self.ec2_conn.get_all_key_pairs()
        except EC2ResponseError as e:
            err_msg = "Problem getting key pairs: {0} (code {1}; status {2})" \
                      .format(e.message, e.error_code, e.status)
            bioblend.log.exception(err_msg)
            progress['error'] = err_msg
            return progress
        for akp in kps:
            if akp.name == key_name:
                # Reuse the existing key; its private material is not
                # retrievable after creation, so 'material' stays None.
                bioblend.log.info(
                    "Key pair '%s' already exists; reusing it." %
                    key_name)
                progress['name'] = akp.name
                return progress
        try:
            kp = self.ec2_conn.create_key_pair(key_name)
        except EC2ResponseError as e:
            err_msg = "Problem creating key pair '{0}': {1} (code {2}; status {3})" \
                      .format(key_name, e.message, e.error_code, e.status)
            bioblend.log.exception(err_msg)
            progress['error'] = err_msg
            return progress
        bioblend.log.info("Created key pair '%s'" % kp.name)
        progress['name'] = kp.name
        # The private key material is only available at creation time.
        progress['material'] = kp.material
        return progress
| 449 | |
| 450 def assign_floating_ip(self, ec2_conn, instance): | |
| 451 try: | |
| 452 bioblend.log.debug("Allocating a new floating IP address.") | |
| 453 address = ec2_conn.allocate_address() | |
| 454 except EC2ResponseError as e: | |
| 455 bioblend.log.exception("Exception allocating a new floating IP " | |
| 456 "address: %s" % e) | |
| 457 bioblend.log.info("Associating floating IP %s to instance %s" % | |
| 458 (address.public_ip, instance.id)) | |
| 459 ec2_conn.associate_address(instance_id=instance.id, | |
| 460 public_ip=address.public_ip) | |
| 461 | |
    def get_status(self, instance_id):
        """
        Check on the status of an instance. ``instance_id`` needs to be a
        ``boto``-library compatible instance ID (e.g., ``i-8fehrdss``). If
        ``instance_id`` is not provided (i.e., is ``None``), an error is
        returned. Note that this method uses the ``ec2_conn`` connection
        established when this object was created.

        Return a ``state`` dict containing the following keys: ``instance_state``,
        ``public_ip``, ``placement``, and ``error``, which capture CloudMan's
        current state. For ``instance_state``, expected values are: ``pending``,
        ``booting``, ``running``, or ``error`` and represent the state of the
        underlying instance. Other keys will return an empty value until the
        ``instance_state`` enters ``running`` state.
        """
        ec2_conn = self.ec2_conn
        rs = None
        state = {'instance_state': "",
                 'public_ip': "",
                 'placement': "",
                 'error': ""}

        # Make sure we have an instance ID
        if instance_id is None:
            err = "Missing instance ID, cannot check the state."
            bioblend.log.error(err)
            state['error'] = err
            return state
        try:
            rs = ec2_conn.get_all_instances([instance_id])
            if rs is not None:
                # update() refreshes and returns the instance's current state
                inst_state = rs[0].instances[0].update()
                public_ip = rs[0].instances[0].ip_address
                state['public_ip'] = public_ip
                if inst_state == 'running':
                    # if there's a private ip, but no public ip
                    # attempt auto allocation of floating IP
                    if rs[0].instances[0].private_ip_address and not public_ip:
                        self.assign_floating_ip(ec2_conn, rs[0].instances[0])
                    cm_url = "http://{dns}/cloud".format(dns=public_ip)
                    # Wait until the CloudMan URL is accessible to return the
                    # data
                    if self._checkURL(cm_url) is True:
                        state['instance_state'] = inst_state
                        state['placement'] = rs[0].instances[0].placement
                    else:
                        # Instance is up but CloudMan is not yet responding
                        state['instance_state'] = 'booting'
                else:
                    state['instance_state'] = inst_state
        except Exception as e:
            # Broad catch: any lookup/update failure is reported in the
            # returned dict rather than raised to the caller.
            err = "Problem updating instance '%s' state: %s" % (instance_id, e)
            bioblend.log.error(err)
            state['error'] = err
        return state
| 523 | |
| 524 def get_clusters_pd(self, include_placement=True): | |
| 525 """ | |
| 526 Return *persistent data* of all existing clusters for this account. | |
| 527 | |
| 528 :type include_placement: bool | |
| 529 :param include_placement: Whether or not to include region placement for | |
| 530 the clusters. Setting this option will lead | |
| 531 to a longer function runtime. | |
| 532 | |
| 533 :rtype: dict | |
| 534 :return: A dictionary containing keys ``clusters`` and ``error``. The | |
| 535 value of ``clusters`` will be a dictionary with the following keys | |
| 536 ``cluster_name``, ``persistent_data``, ``bucket_name`` and optionally | |
| 537 ``placement`` or an empty list if no clusters were found or an | |
| 538 error was encountered. ``persistent_data`` key value is yet | |
| 539 another dictionary containing given cluster's persistent data. | |
| 540 The value for the ``error`` key will contain a string with the | |
| 541 error message. | |
| 542 | |
| 543 .. versionadded:: 0.3 | |
| 544 .. versionchanged:: 0.7.0 | |
| 545 The return value changed from a list to a dictionary. | |
| 546 """ | |
| 547 clusters = [] | |
| 548 response = {'clusters': clusters, 'error': None} | |
| 549 s3_conn = self.connect_s3(self.access_key, self.secret_key, self.cloud) | |
| 550 try: | |
| 551 buckets = s3_conn.get_all_buckets() | |
| 552 except S3ResponseError as e: | |
| 553 response['error'] = "S3ResponseError getting buckets: %s" % e | |
| 554 except self.http_exceptions as ex: | |
| 555 response['error'] = "Exception getting buckets: %s" % ex | |
| 556 if response['error']: | |
| 557 bioblend.log.exception(response['error']) | |
| 558 return response | |
| 559 for bucket in [b for b in buckets if b.name.startswith('cm-')]: | |
| 560 try: | |
| 561 # TODO: first lookup if persistent_data.yaml key exists | |
| 562 pd = bucket.get_key('persistent_data.yaml') | |
| 563 except S3ResponseError: | |
| 564 # This can fail for a number of reasons for non-us and/or | |
| 565 # CNAME'd buckets but it is not a terminal error | |
| 566 bioblend.log.warning("Problem fetching persistent_data.yaml " | |
| 567 "from bucket %s" % bucket) | |
| 568 continue | |
| 569 if pd: | |
| 570 # We are dealing with a CloudMan bucket | |
| 571 pd_contents = pd.get_contents_as_string() | |
| 572 pd = yaml.load(pd_contents) | |
| 573 if 'cluster_name' in pd: | |
| 574 cluster_name = pd['cluster_name'] | |
| 575 else: | |
| 576 for key in bucket.list(): | |
| 577 if key.name.endswith('.clusterName'): | |
| 578 cluster_name = key.name.split('.clusterName')[0] | |
| 579 cluster = {'cluster_name': cluster_name, | |
| 580 'persistent_data': pd, | |
| 581 'bucket_name': bucket.name} | |
| 582 # Look for cluster's placement too | |
| 583 if include_placement: | |
| 584 placement = self._find_placement(cluster_name, cluster) | |
| 585 cluster['placement'] = placement | |
| 586 clusters.append(cluster) | |
| 587 response['clusters'] = clusters | |
| 588 return response | |
| 589 | |
| 590 def get_cluster_pd(self, cluster_name): | |
| 591 """ | |
| 592 Return *persistent data* (as a dict) associated with a cluster with the | |
| 593 given ``cluster_name``. If a cluster with the given name is not found, | |
| 594 return an empty dict. | |
| 595 | |
| 596 .. versionadded:: 0.3 | |
| 597 """ | |
| 598 cluster = {} | |
| 599 clusters = self.get_clusters_pd().get('clusters', []) | |
| 600 for c in clusters: | |
| 601 if c['cluster_name'] == cluster_name: | |
| 602 cluster = c | |
| 603 break | |
| 604 return cluster | |
| 605 | |
    def connect_ec2(self, a_key, s_key, cloud=None):
        """
        Create and return an EC2-compatible connection object for the given cloud.

        See ``_get_cloud_info`` method for more details on the requirements for
        the ``cloud`` parameter. If no value is provided, the class field is used.

        :type a_key: str
        :param a_key: Access key for the cloud account.

        :type s_key: str
        :param s_key: Secret key for the cloud account.
        """
        if cloud is None:
            cloud = self.cloud
        ci = self._get_cloud_info(cloud)
        # Build an explicit region so non-AWS, EC2-compatible endpoints work
        r = RegionInfo(name=ci['region_name'], endpoint=ci['region_endpoint'])
        ec2_conn = boto.connect_ec2(aws_access_key_id=a_key,
                                    aws_secret_access_key=s_key,
                                    is_secure=ci['is_secure'],
                                    region=r,
                                    port=ci['ec2_port'],
                                    path=ci['ec2_conn_path'],
                                    # Avoid certificate validation failures on
                                    # private/EC2-compatible endpoints
                                    validate_certs=False)
        return ec2_conn
| 625 | |
    def connect_s3(self, a_key, s_key, cloud=None):
        """
        Create and return an S3-compatible connection object for the given cloud.

        See ``_get_cloud_info`` method for more details on the requirements for
        the ``cloud`` parameter. If no value is provided, the class field is used.

        :type a_key: str
        :param a_key: Access key for the cloud account.

        :type s_key: str
        :param s_key: Secret key for the cloud account.
        """
        if cloud is None:
            cloud = self.cloud
        ci = self._get_cloud_info(cloud)
        # NOTE(review): the default cloud Bunch in __init__ sets
        # cloud_type="ec2", which never matches 'amazon' here, so the default
        # Amazon cloud uses OrdinaryCallingFormat — confirm this is intended.
        if ci['cloud_type'] == 'amazon':
            calling_format = SubdomainCallingFormat()
        else:
            calling_format = OrdinaryCallingFormat()
        s3_conn = S3Connection(
            aws_access_key_id=a_key, aws_secret_access_key=s_key,
            is_secure=ci['is_secure'], port=ci['s3_port'], host=ci['s3_host'],
            path=ci['s3_conn_path'], calling_format=calling_format)
        return s3_conn
| 645 | |
    def connect_vpc(self, a_key, s_key, cloud=None):
        """
        Establish a connection to the VPC service.

        TODO: Make this work with non-default clouds as well.

        :type a_key: str
        :param a_key: Access key for the cloud account.

        :type s_key: str
        :param s_key: Secret key for the cloud account.
        """
        if cloud is None:
            cloud = self.cloud
        ci = self._get_cloud_info(cloud)
        # The VPC service shares the EC2 region/endpoint configuration
        r = RegionInfo(name=ci['region_name'], endpoint=ci['region_endpoint'])
        vpc_conn = boto.connect_vpc(
            aws_access_key_id=a_key,
            aws_secret_access_key=s_key,
            is_secure=ci['is_secure'],
            region=r,
            port=ci['ec2_port'],
            path=ci['ec2_conn_path'],
            validate_certs=False)
        return vpc_conn
| 665 | |
| 666 def _compose_user_data(self, user_provided_data): | |
| 667 """ | |
| 668 A convenience method used to compose and properly format the user data | |
| 669 required when requesting an instance. | |
| 670 | |
| 671 ``user_provided_data`` is the data provided by a user required to identify | |
| 672 a cluster and user other user requirements. | |
| 673 """ | |
| 674 form_data = {} | |
| 675 # Do not include the following fields in the user data but do include | |
| 676 # any 'advanced startup fields' that might be added in the future | |
| 677 excluded_fields = ['sg_name', 'image_id', 'instance_id', 'kp_name', | |
| 678 'cloud', 'cloud_type', 'public_dns', 'cidr_range', | |
| 679 'kp_material', 'placement', 'flavor_id'] | |
| 680 for key, value in six.iteritems(user_provided_data): | |
| 681 if key not in excluded_fields: | |
| 682 form_data[key] = value | |
| 683 # If the following user data keys are empty, do not include them in the | |
| 684 # request user data | |
| 685 udkeys = [ | |
| 686 'post_start_script_url', | |
| 687 'worker_post_start_script_url', | |
| 688 'bucket_default', | |
| 689 'share_string'] | |
| 690 for udkey in udkeys: | |
| 691 if udkey in form_data and form_data[udkey] == '': | |
| 692 del form_data[udkey] | |
| 693 # If bucket_default was not provided, add a default value to the user data | |
| 694 # (missing value does not play nicely with CloudMan's ec2autorun.py) | |
| 695 if not form_data.get( | |
| 696 'bucket_default', None) and self.cloud.bucket_default: | |
| 697 form_data['bucket_default'] = self.cloud.bucket_default | |
| 698 # Reuse the ``password`` for the ``freenxpass`` user data option | |
| 699 if 'freenxpass' not in form_data and 'password' in form_data: | |
| 700 form_data['freenxpass'] = form_data['password'] | |
| 701 # Convert form_data into the YAML format | |
| 702 ud = yaml.dump(form_data, default_flow_style=False, allow_unicode=False) | |
| 703 # Also include connection info about the selected cloud | |
| 704 ci = self._get_cloud_info(self.cloud, as_str=True) | |
| 705 return ud + "\n" + ci | |
| 706 | |
| 707 def _get_cloud_info(self, cloud, as_str=False): | |
| 708 """ | |
| 709 Get connection information about a given cloud | |
| 710 """ | |
| 711 ci = {} | |
| 712 ci['cloud_type'] = cloud.cloud_type | |
| 713 ci['region_name'] = cloud.region_name | |
| 714 ci['region_endpoint'] = cloud.region_endpoint | |
| 715 ci['is_secure'] = cloud.is_secure | |
| 716 ci['ec2_port'] = cloud.ec2_port if cloud.ec2_port != '' else None | |
| 717 ci['ec2_conn_path'] = cloud.ec2_conn_path | |
| 718 # Include cidr_range only if not empty | |
| 719 if cloud.cidr_range != '': | |
| 720 ci['cidr_range'] = cloud.cidr_range | |
| 721 ci['s3_host'] = cloud.s3_host | |
| 722 ci['s3_port'] = cloud.s3_port if cloud.s3_port != '' else None | |
| 723 ci['s3_conn_path'] = cloud.s3_conn_path | |
| 724 if as_str: | |
| 725 ci = yaml.dump(ci, default_flow_style=False, allow_unicode=False) | |
| 726 return ci | |
| 727 | |
    def _get_volume_placement(self, vol_id):
        """
        Returns the placement of a volume (or None, if it cannot be determined).

        :type vol_id: str
        :param vol_id: ID of the volume whose availability zone is looked up.
        """
        try:
            vol = self.ec2_conn.get_all_volumes(volume_ids=[vol_id])
        except EC2ResponseError as ec2e:
            bioblend.log.error("EC2ResponseError querying for volume {0}: {1}"
                               .format(vol_id, ec2e))
            vol = None
        if vol:
            # get_all_volumes returns a list; the zone of the first (only)
            # match is the volume's placement
            return vol[0].zone
        else:
            bioblend.log.error(
                "Requested placement of a volume '%s' that does not exist." %
                vol_id)
            return None
| 745 | |
| 746 def _find_placement(self, cluster_name, cluster=None): | |
| 747 """ | |
| 748 Find a placement zone for a cluster with the name ``cluster_name``. | |
| 749 | |
| 750 By default, this method will search for and fetch given cluster's | |
| 751 *persistent data*; alternatively, *persistent data* can be provided via | |
| 752 the ``cluster`` parameter. This dict needs to have ``persistent_data`` | |
| 753 key with the contents of cluster's *persistent data*. | |
| 754 If the cluster or the volume associated with the cluster cannot be found, | |
| 755 cluster placement is set to ``None``. | |
| 756 | |
| 757 :rtype: dict | |
| 758 :return: A dictionary with ``placement`` and ``error`` keywords. | |
| 759 | |
| 760 .. versionchanged:: 0.7.0 | |
| 761 The return value changed from a list to a dictionary. | |
| 762 """ | |
| 763 placement = None | |
| 764 response = {'placement': placement, 'error': None} | |
| 765 cluster = cluster or self.get_cluster_pd(cluster_name) | |
| 766 if cluster and 'persistent_data' in cluster: | |
| 767 pd = cluster['persistent_data'] | |
| 768 try: | |
| 769 if 'placement' in pd: | |
| 770 response['placement'] = pd['placement'] | |
| 771 elif 'data_filesystems' in pd: | |
| 772 # We have v1 format persistent data so get the volume first and | |
| 773 # then the placement zone | |
| 774 vol_id = pd['data_filesystems']['galaxyData'][0]['vol_id'] | |
| 775 response['placement'] = self._get_volume_placement(vol_id) | |
| 776 elif 'filesystems' in pd: | |
| 777 # V2 format. | |
| 778 for fs in [fs for fs in pd['filesystems'] if fs.get( | |
| 779 'kind', None) == 'volume' and 'ids' in fs]: | |
| 780 # All volumes must be in the same zone | |
| 781 vol_id = fs['ids'][0] | |
| 782 response['placement'] = self._get_volume_placement( | |
| 783 vol_id) | |
| 784 # No need to continue to iterate through | |
| 785 # filesystems, if we found one with a volume. | |
| 786 break | |
| 787 except Exception as exc: | |
| 788 response['error'] = ("Exception while finding placement for " | |
| 789 "cluster '{0}'. This can indicate malformed " | |
| 790 "instance data. Or that this method is " | |
| 791 "broken: {1}".format(cluster_name, exc)) | |
| 792 bioblend.log.error(response['error']) | |
| 793 response['placement'] = None | |
| 794 else: | |
| 795 bioblend.log.debug("Insufficient info about cluster {0} to get placement." | |
| 796 .format(cluster_name)) | |
| 797 return response | |
| 798 | |
def find_placements(
        self, ec2_conn, instance_type, cloud_type, cluster_name=None):
    """
    Find a list of placement zones that support the specified instance type.

    If ``cluster_name`` is given and a cluster with the given name exists,
    return a list with only one entry, where the given cluster lives.

    Searching for available zones for a given instance type is done by
    checking the spot prices in the potential availability zones for
    support before deciding on a region:
    http://blog.piefox.com/2011/07/ec2-availability-zones-and-instance.html

    Note that, currently, instance-type based zone selection applies only to
    AWS. For other clouds, all the available zones are returned (unless a
    cluster is being recreated, in which case the cluster's placement zone is
    returned as stored in its persistent data).

    :rtype: dict
    :return: A dictionary with ``zones`` and ``error`` keywords.

    .. versionchanged:: 0.3
        Changed method name from ``_find_placements`` to ``find_placements``.
        Also added ``cluster_name`` parameter.

    .. versionchanged:: 0.7.0
        The return value changed from a list to a dictionary.
    """
    # First look for a specific zone a given cluster is bound to
    zones = []
    response = {'zones': zones, 'error': None}
    if cluster_name:
        placement = self._find_placement(cluster_name)
        if placement.get('error'):
            response['error'] = placement['error']
            return response
        # ``placement`` holds a single zone name (or None); the documented
        # contract for ``zones`` is a list, so wrap it rather than storing
        # the bare string (a string here would make zones[0] a character).
        zone = placement.get('placement')
        if zone:
            response['zones'] = [zone]
    # If placement is not found, look for a list of available zones
    if not response['zones']:
        # Query spot prices over the last hour as an availability probe
        in_the_past = datetime.datetime.now() - datetime.timedelta(hours=1)
        back_compatible_zone = "us-east-1e"
        for zone in [
                z for z in ec2_conn.get_all_zones() if z.state == 'available']:
            # Non EC2 clouds may not support get_spot_price_history
            if instance_type is None or cloud_type != 'ec2':
                zones.append(zone.name)
            elif ec2_conn.get_spot_price_history(instance_type=instance_type,
                                                 end_time=in_the_past.isoformat(),
                                                 availability_zone=zone.name):
                zones.append(zone.name)
        # Higher-lettered zones seem to have more availability currently
        zones.sort(reverse=True)
        if back_compatible_zone in zones:
            zones = [back_compatible_zone] + \
                [z for z in zones if z != back_compatible_zone]
        if len(zones) == 0:
            response['error'] = ("Did not find availability zone for {0}"
                                 .format(instance_type))
            bioblend.log.error(response['error'])
            zones.append(back_compatible_zone)
        # Rebinding ``zones`` above broke the alias with response['zones'];
        # write the final list back explicitly so the reorder is not lost.
        response['zones'] = zones
    return response
| 860 | |
def _checkURL(self, url):
    """
    Check if the ``url`` is *alive*, i.e., an HTTP HEAD request to it gets
    a 200 (OK) or 401 (unauthorized) response.
    """
    alive = False
    try:
        parsed = urlparse(url)
        conn = HTTPConnection(parsed[1])
        conn.putrequest('HEAD', parsed[2])
        conn.endheaders()
        status = conn.getresponse().status
        # CloudMan UI is pwd protected so include 401
        alive = status in (200, 401)
    except Exception:
        # No response or no good response
        pass
    return alive
