Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/boto/emr/connection.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:18:57 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/boto/emr/connection.py Fri Jul 31 00:18:57 2020 -0400 @@ -0,0 +1,765 @@ +# Copyright (c) 2010 Spotify AB +# Copyright (c) 2010-2011 Yelp +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, dis- +# tribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the fol- +# lowing conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. 
"""
Represents a connection to the EMR service
"""
import types

import boto
import boto.utils
from boto.ec2.regioninfo import RegionInfo
from boto.emr.emrobject import AddInstanceGroupsResponse, BootstrapActionList, \
    Cluster, ClusterSummaryList, HadoopStep, \
    InstanceGroupList, InstanceList, JobFlow, \
    JobFlowStepList, \
    ModifyInstanceGroupsResponse, \
    RunJobFlowResponse, StepSummaryList
from boto.emr.step import JarStep
from boto.connection import AWSQueryConnection
from boto.exception import EmrResponseError
from boto.compat import six


class EmrConnection(AWSQueryConnection):
    """
    Connection to the Amazon Elastic MapReduce (EMR) query API.

    Query parameters for each operation are built as flat dicts of
    dotted-member keys (e.g. ``Steps.member.1.Name``) by the private
    ``_build_*`` helpers and dispatched through
    :class:`boto.connection.AWSQueryConnection`.
    """

    APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31')
    DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1')
    DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint',
                                            'elasticmapreduce.us-east-1.amazonaws.com')
    ResponseError = EmrResponseError

    # Constants for AWS Console debugging
    DebuggingJar = 's3://{region_name}.elasticmapreduce/libs/script-runner/script-runner.jar'
    DebuggingArgs = 's3://{region_name}.elasticmapreduce/libs/state-pusher/0.1/fetch'

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=True, port=None, proxy=None, proxy_port=None,
                 proxy_user=None, proxy_pass=None, debug=0,
                 https_connection_factory=None, region=None, path='/',
                 security_token=None, validate_certs=True, profile_name=None):
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        self.region = region
        super(EmrConnection, self).__init__(aws_access_key_id,
                                            aws_secret_access_key,
                                            is_secure, port, proxy, proxy_port,
                                            proxy_user, proxy_pass,
                                            self.region.endpoint, debug,
                                            https_connection_factory, path,
                                            security_token,
                                            validate_certs=validate_certs,
                                            profile_name=profile_name)
        # Many of the EMR hostnames are of the form:
        #     <region>.<service_name>.amazonaws.com
        # rather than the more common:
        #     <service_name>.<region>.amazonaws.com
        # so we need to explicitly set the region_name and service_name
        # for the SigV4 signing.
        self.auth_region_name = self.region.name
        self.auth_service_name = 'elasticmapreduce'

    def _required_auth_capability(self):
        # EMR requires SigV4 request signing.
        return ['hmac-v4']

    def describe_cluster(self, cluster_id):
        """
        Describes an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        """
        params = {
            'ClusterId': cluster_id
        }
        return self.get_object('DescribeCluster', params, Cluster)

    def describe_jobflow(self, jobflow_id):
        """
        This method is deprecated. We recommend you use list_clusters,
        describe_cluster, list_steps, list_instance_groups and
        list_bootstrap_actions instead.

        Describes a single Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: The job flow id of interest

        Returns ``None`` when no matching job flow is found.
        """
        jobflows = self.describe_jobflows(jobflow_ids=[jobflow_id])
        if jobflows:
            return jobflows[0]

    def describe_jobflows(self, states=None, jobflow_ids=None,
                          created_after=None, created_before=None):
        """
        This method is deprecated. We recommend you use list_clusters,
        describe_cluster, list_steps, list_instance_groups and
        list_bootstrap_actions instead.

        Retrieve all the Elastic MapReduce job flows on your account

        :type states: list
        :param states: A list of strings with job flow states wanted

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs
        :type created_after: datetime
        :param created_after: Bound on job flow creation time

        :type created_before: datetime
        :param created_before: Bound on job flow creation time
        """
        params = {}

        if states:
            self.build_list_params(params, states, 'JobFlowStates.member')
        if jobflow_ids:
            self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        if created_after:
            params['CreatedAfter'] = created_after.strftime(
                boto.utils.ISO8601)
        if created_before:
            params['CreatedBefore'] = created_before.strftime(
                boto.utils.ISO8601)

        return self.get_list('DescribeJobFlows', params, [('member', JobFlow)])

    def describe_step(self, cluster_id, step_id):
        """
        Describe an Elastic MapReduce step

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        :type step_id: str
        :param step_id: The step id of interest
        """
        params = {
            'ClusterId': cluster_id,
            'StepId': step_id
        }

        return self.get_object('DescribeStep', params, HadoopStep)

    def list_bootstrap_actions(self, cluster_id, marker=None):
        """
        Get a list of bootstrap actions for an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        return self.get_object('ListBootstrapActions', params, BootstrapActionList)

    def list_clusters(self, created_after=None, created_before=None,
                      cluster_states=None, marker=None):
        """
        List Elastic MapReduce clusters with optional filtering

        :type created_after: datetime
        :param created_after: Bound on cluster creation time
        :type created_before: datetime
        :param created_before: Bound on cluster creation time
        :type cluster_states: list
        :param cluster_states: Bound on cluster states
        :type marker: str
        :param marker: Pagination marker
        """
        params = {}
        if created_after:
            params['CreatedAfter'] = created_after.strftime(
                boto.utils.ISO8601)
        if created_before:
            params['CreatedBefore'] = created_before.strftime(
                boto.utils.ISO8601)
        if marker:
            params['Marker'] = marker

        if cluster_states:
            self.build_list_params(params, cluster_states, 'ClusterStates.member')

        return self.get_object('ListClusters', params, ClusterSummaryList)

    def list_instance_groups(self, cluster_id, marker=None):
        """
        List EC2 instance groups in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        return self.get_object('ListInstanceGroups', params, InstanceGroupList)

    def list_instances(self, cluster_id, instance_group_id=None,
                       instance_group_types=None, marker=None):
        """
        List EC2 instances in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        :type instance_group_id: str
        :param instance_group_id: The EC2 instance group id of interest
        :type instance_group_types: list
        :param instance_group_types: Filter by EC2 instance group type
        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if instance_group_id:
            params['InstanceGroupId'] = instance_group_id
        if marker:
            params['Marker'] = marker

        if instance_group_types:
            self.build_list_params(params, instance_group_types,
                                   'InstanceGroupTypes.member')

        return self.get_object('ListInstances', params, InstanceList)

    def list_steps(self, cluster_id, step_states=None, marker=None):
        """
        List cluster steps

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        :type step_states: list
        :param step_states: Filter by step states
        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        if step_states:
            self.build_list_params(params, step_states, 'StepStates.member')

        return self.get_object('ListSteps', params, StepSummaryList)

    def add_tags(self, resource_id, tags):
        """
        Create new metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: dict
        :param tags: A dictionary containing the name/value pairs.
                     If you want to create only a tag name, the
                     value for that tag should be the empty string
                     (e.g. '') or None.
        """
        assert isinstance(resource_id, six.string_types)
        params = {
            'ResourceId': resource_id,
        }
        params.update(self._build_tag_list(tags))
        return self.get_status('AddTags', params, verb='POST')

    def remove_tags(self, resource_id, tags):
        """
        Remove metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: list
        :param tags: A list of tag names to remove.
        """
        params = {
            'ResourceId': resource_id,
        }
        params.update(self._build_string_list('TagKeys', tags))
        return self.get_status('RemoveTags', params, verb='POST')

    def terminate_jobflow(self, jobflow_id):
        """
        Terminate an Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: A jobflow id
        """
        self.terminate_jobflows([jobflow_id])

    def terminate_jobflows(self, jobflow_ids):
        """
        Terminate an Elastic MapReduce job flow

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs
        """
        params = {}
        self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        return self.get_status('TerminateJobFlows', params, verb='POST')

    def add_jobflow_steps(self, jobflow_id, steps):
        """
        Adds steps to a jobflow

        :type jobflow_id: str
        :param jobflow_id: The job flow id
        :type steps: list(boto.emr.Step)
        :param steps: A list of steps to add to the job
        """
        if not isinstance(steps, list):
            steps = [steps]
        params = {}
        params['JobFlowId'] = jobflow_id

        # Step args
        step_args = [self._build_step_args(step) for step in steps]
        params.update(self._build_step_list(step_args))

        return self.get_object(
            'AddJobFlowSteps', params, JobFlowStepList, verb='POST')

    def add_instance_groups(self, jobflow_id, instance_groups):
        """
        Adds instance groups to a running cluster.

        :type jobflow_id: str
        :param jobflow_id: The id of the jobflow which will take the
            new instance groups

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: A list of instance groups to add to the job
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]
        params = {}
        params['JobFlowId'] = jobflow_id
        params.update(self._build_instance_group_list_args(instance_groups))

        return self.get_object('AddInstanceGroups', params,
                               AddInstanceGroupsResponse, verb='POST')

    def modify_instance_groups(self, instance_group_ids, new_sizes):
        """
        Modify the number of nodes and configuration settings in an
        instance group.

        :type instance_group_ids: list(str)
        :param instance_group_ids: A list of the ID's of the instance
            groups to be modified

        :type new_sizes: list(int)
        :param new_sizes: A list of the new sizes for each instance group
        """
        if not isinstance(instance_group_ids, list):
            instance_group_ids = [instance_group_ids]
        if not isinstance(new_sizes, list):
            new_sizes = [new_sizes]

        instance_groups = zip(instance_group_ids, new_sizes)

        params = {}
        for k, ig in enumerate(instance_groups):
            # could be wrong - the example amazon gives uses
            # InstanceRequestCount, while the api documentation
            # says InstanceCount
            params['InstanceGroups.member.%d.InstanceGroupId' % (k+1)] = ig[0]
            params['InstanceGroups.member.%d.InstanceCount' % (k+1)] = ig[1]

        return self.get_object('ModifyInstanceGroups', params,
                               ModifyInstanceGroupsResponse, verb='POST')

    def run_jobflow(self, name, log_uri=None, ec2_keyname=None,
                    availability_zone=None,
                    master_instance_type='m1.small',
                    slave_instance_type='m1.small', num_instances=1,
                    action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
                    enable_debugging=False,
                    hadoop_version=None,
                    steps=None,
                    bootstrap_actions=None,
                    instance_groups=None,
                    additional_info=None,
                    ami_version=None,
                    api_params=None,
                    visible_to_all_users=None,
                    job_flow_role=None,
                    service_role=None):
        """
        Runs a job flow
        :type name: str
        :param name: Name of the job flow

        :type log_uri: str
        :param log_uri: URI of the S3 bucket to place logs

        :type ec2_keyname: str
        :param ec2_keyname: EC2 key used for the instances

        :type availability_zone: str
        :param availability_zone: EC2 availability zone of the cluster

        :type master_instance_type: str
        :param master_instance_type: EC2 instance type of the master

        :type slave_instance_type: str
        :param slave_instance_type: EC2 instance type of the slave nodes

        :type num_instances: int
        :param num_instances: Number of instances in the Hadoop cluster

        :type action_on_failure: str
        :param action_on_failure: Action to take if a step terminates

        :type keep_alive: bool
        :param keep_alive: Denotes whether the cluster should stay
            alive upon completion

        :type enable_debugging: bool
        :param enable_debugging: Denotes whether AWS console debugging
            should be enabled.

        :type hadoop_version: str
        :param hadoop_version: Version of Hadoop to use. This no longer
            defaults to '0.20' and now uses the AMI default.

        :type steps: list(boto.emr.Step)
        :param steps: List of steps to add with the job

        :type bootstrap_actions: list(boto.emr.BootstrapAction)
        :param bootstrap_actions: List of bootstrap actions that run
            before Hadoop starts.

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: Optional list of instance groups to
            use when creating this job.
            NB: When provided, this argument supersedes num_instances
            and master/slave_instance_type.

        :type ami_version: str
        :param ami_version: Amazon Machine Image (AMI) version to use
            for instances. Values accepted by EMR are '1.0', '2.0', and
            'latest'; EMR currently defaults to '1.0' if you don't set
            'ami_version'.

        :type additional_info: JSON str
        :param additional_info: A JSON string for selecting additional features

        :type api_params: dict
        :param api_params: a dictionary of additional parameters to pass
            directly to the EMR API (so you don't have to upgrade boto to
            use new EMR features). You can also delete an API parameter
            by setting it to None.

        :type visible_to_all_users: bool
        :param visible_to_all_users: Whether the job flow is visible to all IAM
            users of the AWS account associated with the job flow. If this
            value is set to ``True``, all IAM users of that AWS
            account can view and (if they have the proper policy permissions
            set) manage the job flow. If it is set to ``False``, only
            the IAM user that created the job flow can view and manage
            it.

        :type job_flow_role: str
        :param job_flow_role: An IAM role for the job flow. The EC2
            instances of the job flow assume this role. The default role is
            ``EMRJobflowDefault``. In order to use the default role,
            you must have already created it using the CLI.

        :type service_role: str
        :param service_role: The IAM role that will be assumed by the Amazon
            EMR service to access AWS resources on your behalf.

        :rtype: str
        :return: The jobflow id
        """
        # Copy so the caller's list is never mutated (the debugging step
        # below is inserted at position 0). Also avoids the classic
        # mutable-default-argument pitfall for both list parameters.
        steps = list(steps) if steps else []
        bootstrap_actions = bootstrap_actions or []
        params = {}
        if action_on_failure:
            params['ActionOnFailure'] = action_on_failure
        if log_uri:
            params['LogUri'] = log_uri
        params['Name'] = name

        # Common instance args
        common_params = self._build_instance_common_args(ec2_keyname,
                                                         availability_zone,
                                                         keep_alive,
                                                         hadoop_version)
        params.update(common_params)

        # NB: according to the AWS API's error message, we must
        # "configure instances either using instance count, master and
        # slave instance type or instance groups but not both."
        #
        # Thus we switch here on the truthiness of instance_groups.
        if not instance_groups:
            # Instance args (the common case)
            instance_params = self._build_instance_count_and_type_args(
                master_instance_type,
                slave_instance_type,
                num_instances)
            params.update(instance_params)
        else:
            # Instance group args (for spot instances or a heterogenous cluster)
            list_args = self._build_instance_group_list_args(instance_groups)
            instance_params = dict(
                ('Instances.%s' % k, v) for k, v in six.iteritems(list_args)
            )
            params.update(instance_params)

        # Debugging step from EMR API docs
        if enable_debugging:
            debugging_step = JarStep(name='Setup Hadoop Debugging',
                                     action_on_failure='TERMINATE_JOB_FLOW',
                                     main_class=None,
                                     jar=self.DebuggingJar.format(region_name=self.region.name),
                                     step_args=self.DebuggingArgs.format(region_name=self.region.name))
            steps.insert(0, debugging_step)

        # Step args
        if steps:
            step_args = [self._build_step_args(step) for step in steps]
            params.update(self._build_step_list(step_args))

        if bootstrap_actions:
            bootstrap_action_args = [self._build_bootstrap_action_args(bootstrap_action) for bootstrap_action in bootstrap_actions]
            params.update(self._build_bootstrap_action_list(bootstrap_action_args))

        if ami_version:
            params['AmiVersion'] = ami_version

        if additional_info is not None:
            params['AdditionalInfo'] = additional_info

        if api_params:
            for key, value in six.iteritems(api_params):
                if value is None:
                    params.pop(key, None)
                else:
                    params[key] = value

        if visible_to_all_users is not None:
            if visible_to_all_users:
                params['VisibleToAllUsers'] = 'true'
            else:
                params['VisibleToAllUsers'] = 'false'

        if job_flow_role is not None:
            params['JobFlowRole'] = job_flow_role

        if service_role is not None:
            params['ServiceRole'] = service_role

        response = self.get_object(
            'RunJobFlow', params, RunJobFlowResponse, verb='POST')
        return response.jobflowid

    def set_termination_protection(self, jobflow_id,
                                   termination_protection_status):
        """
        Set termination protection on the specified Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: The job flow ID

        :type termination_protection_status: bool
        :param termination_protection_status: Termination protection status
        """
        assert termination_protection_status in (True, False)

        params = {}
        params['TerminationProtected'] = (termination_protection_status and "true") or "false"
        self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')

        return self.get_status('SetTerminationProtection', params, verb='POST')

    def set_visible_to_all_users(self, jobflow_id, visibility):
        """
        Set whether the specified Elastic Map Reduce job flow is visible to all IAM users

        :type jobflow_id: str
        :param jobflow_id: The job flow ID

        :type visibility: bool
        :param visibility: Visibility
        """
        assert visibility in (True, False)

        params = {}
        params['VisibleToAllUsers'] = (visibility and "true") or "false"
        self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')

        return self.get_status('SetVisibleToAllUsers', params, verb='POST')

    def _build_bootstrap_action_args(self, bootstrap_action):
        """
        Flatten a single BootstrapAction into its request-parameter dict
        (un-prefixed; _build_bootstrap_action_list adds the member prefix).
        """
        bootstrap_action_params = {}
        bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action.path

        # Name is optional on bootstrap actions.
        try:
            bootstrap_action_params['Name'] = bootstrap_action.name
        except AttributeError:
            pass

        args = bootstrap_action.args()
        if args:
            self.build_list_params(bootstrap_action_params, args, 'ScriptBootstrapAction.Args.member')

        return bootstrap_action_params

    def _build_step_args(self, step):
        """
        Flatten a single Step into its request-parameter dict
        (un-prefixed; _build_step_list adds the member prefix).
        """
        step_params = {}
        step_params['ActionOnFailure'] = step.action_on_failure
        step_params['HadoopJarStep.Jar'] = step.jar()

        main_class = step.main_class()
        if main_class:
            step_params['HadoopJarStep.MainClass'] = main_class

        args = step.args()
        if args:
            self.build_list_params(step_params, args, 'HadoopJarStep.Args.member')

        step_params['Name'] = step.name
        return step_params

    def _build_bootstrap_action_list(self, bootstrap_actions):
        """
        Prefix each bootstrap-action param dict with its 1-based
        BootstrapActions.member.<i> position.
        """
        if not isinstance(bootstrap_actions, list):
            bootstrap_actions = [bootstrap_actions]

        params = {}
        for i, bootstrap_action in enumerate(bootstrap_actions):
            for key, value in six.iteritems(bootstrap_action):
                params['BootstrapActions.member.%s.%s' % (i + 1, key)] = value
        return params

    def _build_step_list(self, steps):
        """
        Prefix each step param dict with its 1-based Steps.member.<i> position.
        """
        if not isinstance(steps, list):
            steps = [steps]

        params = {}
        for i, step in enumerate(steps):
            for key, value in six.iteritems(step):
                params['Steps.member.%s.%s' % (i+1, key)] = value
        return params

    def _build_string_list(self, field, items):
        """
        Build <field>.member.<i> params for a list (or single value) of strings.
        """
        if not isinstance(items, list):
            items = [items]

        params = {}
        for i, item in enumerate(items):
            params['%s.member.%s' % (field, i + 1)] = item
        return params

    def _build_tag_list(self, tags):
        """
        Build Tags.member.<i>.Key/.Value params from a dict of tags; keys are
        sorted so the resulting parameter order is deterministic. Falsy tag
        values (e.g. '' or None) emit a Key with no Value.
        """
        assert isinstance(tags, dict)

        params = {}
        for i, key_value in enumerate(sorted(six.iteritems(tags)), start=1):
            key, value = key_value
            current_prefix = 'Tags.member.%s' % i
            params['%s.Key' % current_prefix] = key
            if value:
                params['%s.Value' % current_prefix] = value
        return params

    def _build_instance_common_args(self, ec2_keyname, availability_zone,
                                    keep_alive, hadoop_version):
        """
        Takes a number of parameters used when starting a jobflow (as
        specified in run_jobflow() above). Returns a comparable dict for
        use in making a RunJobFlow request.
        """
        params = {
            'Instances.KeepJobFlowAliveWhenNoSteps': str(keep_alive).lower(),
        }

        if hadoop_version:
            params['Instances.HadoopVersion'] = hadoop_version
        if ec2_keyname:
            params['Instances.Ec2KeyName'] = ec2_keyname
        if availability_zone:
            params['Instances.Placement.AvailabilityZone'] = availability_zone

        return params

    def _build_instance_count_and_type_args(self, master_instance_type,
                                            slave_instance_type, num_instances):
        """
        Takes a master instance type (string), a slave instance type
        (string), and a number of instances. Returns a comparable dict
        for use in making a RunJobFlow request.
        """
        params = {'Instances.MasterInstanceType': master_instance_type,
                  'Instances.SlaveInstanceType': slave_instance_type,
                  'Instances.InstanceCount': num_instances}
        return params

    def _build_instance_group_args(self, instance_group):
        """
        Takes an InstanceGroup; returns a dict that, when its keys are
        properly prefixed, can be used for describing InstanceGroups in
        RunJobFlow or AddInstanceGroups requests.
        """
        params = {'InstanceCount': instance_group.num_instances,
                  'InstanceRole': instance_group.role,
                  'InstanceType': instance_group.type,
                  'Name': instance_group.name,
                  'Market': instance_group.market}
        if instance_group.market == 'SPOT':
            params['BidPrice'] = instance_group.bidprice
        return params

    def _build_instance_group_list_args(self, instance_groups):
        """
        Takes a list of InstanceGroups, or a single InstanceGroup. Returns
        a comparable dict for use in making a RunJobFlow or AddInstanceGroups
        request.
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]

        params = {}
        for i, instance_group in enumerate(instance_groups):
            ig_dict = self._build_instance_group_args(instance_group)
            for key, value in six.iteritems(ig_dict):
                params['InstanceGroups.member.%d.%s' % (i+1, key)] = value
        return params