comparison env/lib/python3.7/site-packages/boto/emr/connection.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 # Copyright (c) 2010 Spotify AB
2 # Copyright (c) 2010-2011 Yelp
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining a
5 # copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish, dis-
8 # tribute, sublicense, and/or sell copies of the Software, and to permit
9 # persons to whom the Software is furnished to do so, subject to the fol-
10 # lowing conditions:
11 #
12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 # IN THE SOFTWARE.
22
23 """
24 Represents a connection to the EMR service
25 """
26 import types
27
28 import boto
29 import boto.utils
30 from boto.ec2.regioninfo import RegionInfo
31 from boto.emr.emrobject import AddInstanceGroupsResponse, BootstrapActionList, \
32 Cluster, ClusterSummaryList, HadoopStep, \
33 InstanceGroupList, InstanceList, JobFlow, \
34 JobFlowStepList, \
35 ModifyInstanceGroupsResponse, \
36 RunJobFlowResponse, StepSummaryList
37 from boto.emr.step import JarStep
38 from boto.connection import AWSQueryConnection
39 from boto.exception import EmrResponseError
40 from boto.compat import six
41
42
43 class EmrConnection(AWSQueryConnection):
44
# Connection defaults. Each value is looked up in the 'Boto' section of the
# boto configuration; the last argument is presumably the fallback used when
# the option is absent -- confirm against boto.config.get.
APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31')
# Name and endpoint of the region that __init__ uses when no region is given.
DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1')
DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint',
                                        'elasticmapreduce.us-east-1.amazonaws.com')
# NOTE(review): presumably the exception type the base connection raises for
# error responses -- confirm against AWSQueryConnection.
ResponseError = EmrResponseError



# Constants for AWS Console debugging. Both are templates: run_jobflow fills
# in {region_name} with the connection's region name via str.format().
DebuggingJar = 's3://{region_name}.elasticmapreduce/libs/script-runner/script-runner.jar'
DebuggingArgs = 's3://{region_name}.elasticmapreduce/libs/state-pusher/0.1/fetch'
56
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
             is_secure=True, port=None, proxy=None, proxy_port=None,
             proxy_user=None, proxy_pass=None, debug=0,
             https_connection_factory=None, region=None, path='/',
             security_token=None, validate_certs=True, profile_name=None):
    """
    Create a connection to the EMR service

    :type region: boto.ec2.regioninfo.RegionInfo
    :param region: Region to connect to. When omitted (or falsy), a
        RegionInfo is built from DefaultRegionName and
        DefaultRegionEndpoint.

    All remaining arguments are handed unchanged to the parent
    connection class.
    """
    # Fall back to the class-level default region if none was supplied.
    self.region = region or RegionInfo(self, self.DefaultRegionName,
                                       self.DefaultRegionEndpoint)
    super(EmrConnection, self).__init__(
        aws_access_key_id, aws_secret_access_key,
        is_secure, port, proxy, proxy_port,
        proxy_user, proxy_pass,
        self.region.endpoint, debug,
        https_connection_factory, path,
        security_token,
        validate_certs=validate_certs,
        profile_name=profile_name)
    # EMR hostnames often look like
    #     <region>.<service_name>.amazonaws.com
    # instead of the usual
    #     <service_name>.<region>.amazonaws.com
    # so the region and service names used for SigV4 signing are set
    # explicitly here.
    self.auth_region_name = self.region.name
    self.auth_service_name = 'elasticmapreduce'
83
84 def _required_auth_capability(self):
85 return ['hmac-v4']
86
def describe_cluster(self, cluster_id):
    """
    Fetch the description of a single Elastic MapReduce cluster

    :type cluster_id: str
    :param cluster_id: Id of the cluster to describe
    """
    request = dict(ClusterId=cluster_id)
    return self.get_object('DescribeCluster', request, Cluster)
98
def describe_jobflow(self, jobflow_id):
    """
    Deprecated: prefer list_clusters, describe_cluster, list_steps,
    list_instance_groups and list_bootstrap_actions.

    Look up one Elastic MapReduce job flow by id. Returns the first
    job flow reported by describe_jobflows, or None when nothing
    comes back.

    :type jobflow_id: str
    :param jobflow_id: Id of the job flow to describe
    """
    matches = self.describe_jobflows(jobflow_ids=[jobflow_id])
    if not matches:
        return None
    return matches[0]
113
def describe_jobflows(self, states=None, jobflow_ids=None,
                      created_after=None, created_before=None):
    """
    Deprecated: prefer list_clusters, describe_cluster, list_steps,
    list_instance_groups and list_bootstrap_actions.

    Fetch the Elastic MapReduce job flows of the account, optionally
    narrowed by the filters below

    :type states: list
    :param states: Job flow states (strings) to include

    :type jobflow_ids: list
    :param jobflow_ids: Job flow IDs to include

    :type created_after: datetime
    :param created_after: Bound on job flow creation time

    :type created_before: datetime
    :param created_before: Bound on job flow creation time
    """
    query = {}

    if states:
        self.build_list_params(query, states, 'JobFlowStates.member')
    if jobflow_ids:
        self.build_list_params(query, jobflow_ids, 'JobFlowIds.member')

    # Both time bounds are serialised with the same ISO 8601 format.
    time_bounds = (('CreatedAfter', created_after),
                   ('CreatedBefore', created_before))
    for api_name, bound in time_bounds:
        if bound:
            query[api_name] = bound.strftime(boto.utils.ISO8601)

    return self.get_list('DescribeJobFlows', query, [('member', JobFlow)])
148
def describe_step(self, cluster_id, step_id):
    """
    Fetch the description of one Elastic MapReduce step

    :type cluster_id: str
    :param cluster_id: Id of the cluster the step belongs to
    :type step_id: str
    :param step_id: Id of the step to describe
    """
    request = dict(ClusterId=cluster_id, StepId=step_id)
    return self.get_object('DescribeStep', request, HadoopStep)
164
def list_bootstrap_actions(self, cluster_id, marker=None):
    """
    List the bootstrap actions of an Elastic MapReduce cluster

    :type cluster_id: str
    :param cluster_id: Id of the cluster to inspect
    :type marker: str
    :param marker: Pagination marker
    """
    request = dict(ClusterId=cluster_id)
    if marker:
        request['Marker'] = marker

    return self.get_object('ListBootstrapActions', request,
                           BootstrapActionList)
182
def list_clusters(self, created_after=None, created_before=None,
                  cluster_states=None, marker=None):
    """
    List Elastic MapReduce clusters, optionally filtered

    :type created_after: datetime
    :param created_after: Bound on cluster creation time
    :type created_before: datetime
    :param created_before: Bound on cluster creation time
    :type cluster_states: list
    :param cluster_states: Bound on cluster states
    :type marker: str
    :param marker: Pagination marker
    """
    request = {}

    # Both time bounds are serialised with the same ISO 8601 format.
    time_bounds = (('CreatedAfter', created_after),
                   ('CreatedBefore', created_before))
    for api_name, bound in time_bounds:
        if bound:
            request[api_name] = bound.strftime(boto.utils.ISO8601)

    if marker:
        request['Marker'] = marker
    if cluster_states:
        self.build_list_params(request, cluster_states,
                               'ClusterStates.member')

    return self.get_object('ListClusters', request, ClusterSummaryList)
211
def list_instance_groups(self, cluster_id, marker=None):
    """
    List the EC2 instance groups of a cluster

    :type cluster_id: str
    :param cluster_id: Id of the cluster to inspect
    :type marker: str
    :param marker: Pagination marker
    """
    request = dict(ClusterId=cluster_id)
    if marker:
        request['Marker'] = marker

    return self.get_object('ListInstanceGroups', request, InstanceGroupList)
229
def list_instances(self, cluster_id, instance_group_id=None,
                   instance_group_types=None, marker=None):
    """
    List the EC2 instances of a cluster

    :type cluster_id: str
    :param cluster_id: Id of the cluster to inspect
    :type instance_group_id: str
    :param instance_group_id: Restrict to this EC2 instance group id
    :type instance_group_types: list
    :param instance_group_types: Filter by EC2 instance group type
    :type marker: str
    :param marker: Pagination marker
    """
    request = dict(ClusterId=cluster_id)

    optional_scalars = (('InstanceGroupId', instance_group_id),
                        ('Marker', marker))
    for api_name, value in optional_scalars:
        if value:
            request[api_name] = value

    if instance_group_types:
        self.build_list_params(request, instance_group_types,
                               'InstanceGroupTypes.member')

    return self.get_object('ListInstances', request, InstanceList)
258
def list_steps(self, cluster_id, step_states=None, marker=None):
    """
    List the steps of a cluster

    :type cluster_id: str
    :param cluster_id: Id of the cluster to inspect
    :type step_states: list
    :param step_states: Filter by step states
    :type marker: str
    :param marker: Pagination marker
    """
    request = dict(ClusterId=cluster_id)

    if marker:
        request['Marker'] = marker
    if step_states:
        self.build_list_params(request, step_states, 'StepStates.member')

    return self.get_object('ListSteps', request, StepSummaryList)
281
def add_tags(self, resource_id, tags):
    """
    Attach new metadata tags to the given resource id.

    :type resource_id: str
    :param resource_id: The cluster id

    :type tags: dict
    :param tags: Mapping of tag name to tag value. To create a tag
                 that has only a name, use the empty string
                 (e.g. '') or None as its value.
    """
    assert isinstance(resource_id, six.string_types)
    request = dict(ResourceId=resource_id)
    tag_params = self._build_tag_list(tags)
    request.update(tag_params)
    return self.get_status('AddTags', request, verb='POST')
301
def remove_tags(self, resource_id, tags):
    """
    Delete metadata tags from the given resource id.

    :type resource_id: str
    :param resource_id: The cluster id

    :type tags: list
    :param tags: Names of the tags to remove.
    """
    request = dict(ResourceId=resource_id)
    key_params = self._build_string_list('TagKeys', tags)
    request.update(key_params)
    return self.get_status('RemoveTags', request, verb='POST')
317
def terminate_jobflow(self, jobflow_id):
    """
    Terminate a single Elastic MapReduce job flow

    Delegates to terminate_jobflows; the status it returns is not
    passed back to the caller.

    :type jobflow_id: str
    :param jobflow_id: A jobflow id
    """
    single_id_list = [jobflow_id]
    self.terminate_jobflows(single_id_list)
326
def terminate_jobflows(self, jobflow_ids):
    """
    Terminate one or more Elastic MapReduce job flows

    :type jobflow_ids: list
    :param jobflow_ids: Job flow IDs to terminate
    """
    request = {}
    self.build_list_params(request, jobflow_ids, 'JobFlowIds.member')
    return self.get_status('TerminateJobFlows', request, verb='POST')
337
def add_jobflow_steps(self, jobflow_id, steps):
    """
    Append steps to an existing jobflow

    :type jobflow_id: str
    :param jobflow_id: The job flow id
    :type steps: list(boto.emr.Step)
    :param steps: Steps to add to the job; a single step that is not
        wrapped in a list is accepted as well
    """
    if not isinstance(steps, list):
        steps = [steps]

    request = {'JobFlowId': jobflow_id}

    # Flatten every step into its request parameters, then number them.
    per_step_params = [self._build_step_args(one_step) for one_step in steps]
    request.update(self._build_step_list(per_step_params))

    return self.get_object('AddJobFlowSteps', request, JobFlowStepList,
                           verb='POST')
358
def add_instance_groups(self, jobflow_id, instance_groups):
    """
    Add instance groups to a running cluster.

    :type jobflow_id: str
    :param jobflow_id: Id of the jobflow that receives the new
                       instance groups

    :type instance_groups: list(boto.emr.InstanceGroup)
    :param instance_groups: Instance groups to add to the job; a single
                            group not wrapped in a list is accepted too
    """
    if not isinstance(instance_groups, list):
        instance_groups = [instance_groups]

    request = {'JobFlowId': jobflow_id}
    group_params = self._build_instance_group_list_args(instance_groups)
    request.update(group_params)

    return self.get_object('AddInstanceGroups', request,
                           AddInstanceGroupsResponse, verb='POST')
378
def modify_instance_groups(self, instance_group_ids, new_sizes):
    """
    Change the number of nodes and configuration settings of
    instance groups.

    Ids and sizes are paired up positionally; pairing stops at the
    shorter of the two lists.

    :type instance_group_ids: list(str)
    :param instance_group_ids: IDs of the instance groups to modify

    :type new_sizes: list(int)
    :param new_sizes: New size for each instance group, in the same order
    """
    if not isinstance(instance_group_ids, list):
        instance_group_ids = [instance_group_ids]
    if not isinstance(new_sizes, list):
        new_sizes = [new_sizes]

    request = {}
    pairs = zip(instance_group_ids, new_sizes)
    for position, (group_id, size) in enumerate(pairs, start=1):
        member = 'InstanceGroups.member.%d' % position
        # could be wrong - the example amazon gives uses
        # InstanceRequestCount, while the api documentation
        # says InstanceCount
        request[member + '.InstanceGroupId'] = group_id
        request[member + '.InstanceCount'] = size

    return self.get_object('ModifyInstanceGroups', request,
                           ModifyInstanceGroupsResponse, verb='POST')
408
def run_jobflow(self, name, log_uri=None, ec2_keyname=None,
                availability_zone=None,
                master_instance_type='m1.small',
                slave_instance_type='m1.small', num_instances=1,
                action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
                enable_debugging=False,
                hadoop_version=None,
                steps=None,
                bootstrap_actions=None,
                instance_groups=None,
                additional_info=None,
                ami_version=None,
                api_params=None,
                visible_to_all_users=None,
                job_flow_role=None,
                service_role=None):
    """
    Runs a job flow

    :type name: str
    :param name: Name of the job flow

    :type log_uri: str
    :param log_uri: URI of the S3 bucket to place logs

    :type ec2_keyname: str
    :param ec2_keyname: EC2 key used for the instances

    :type availability_zone: str
    :param availability_zone: EC2 availability zone of the cluster

    :type master_instance_type: str
    :param master_instance_type: EC2 instance type of the master

    :type slave_instance_type: str
    :param slave_instance_type: EC2 instance type of the slave nodes

    :type num_instances: int
    :param num_instances: Number of instances in the Hadoop cluster

    :type action_on_failure: str
    :param action_on_failure: Action to take if a step terminates

    :type keep_alive: bool
    :param keep_alive: Denotes whether the cluster should stay
        alive upon completion

    :type enable_debugging: bool
    :param enable_debugging: Denotes whether AWS console debugging
        should be enabled. When set, a 'Setup Hadoop Debugging' step
        is placed in front of the supplied steps.

    :type hadoop_version: str
    :param hadoop_version: Version of Hadoop to use. This no longer
        defaults to '0.20' and now uses the AMI default.

    :type steps: list(boto.emr.Step)
    :param steps: List of steps to add with the job. The list passed
        in is never modified.

    :type bootstrap_actions: list(boto.emr.BootstrapAction)
    :param bootstrap_actions: List of bootstrap actions that run
        before Hadoop starts.

    :type instance_groups: list(boto.emr.InstanceGroup)
    :param instance_groups: Optional list of instance groups to
        use when creating this job.
        NB: When provided, this argument supersedes num_instances
        and master/slave_instance_type.

    :type ami_version: str
    :param ami_version: Amazon Machine Image (AMI) version to use
        for instances. Values accepted by EMR are '1.0', '2.0', and
        'latest'; EMR currently defaults to '1.0' if you don't set
        'ami_version'.

    :type additional_info: JSON str
    :param additional_info: A JSON string for selecting additional features

    :type api_params: dict
    :param api_params: a dictionary of additional parameters to pass
        directly to the EMR API (so you don't have to upgrade boto to
        use new EMR features). You can also delete an API parameter
        by setting it to None.

    :type visible_to_all_users: bool
    :param visible_to_all_users: Whether the job flow is visible to all IAM
        users of the AWS account associated with the job flow. If this
        value is set to ``True``, all IAM users of that AWS
        account can view and (if they have the proper policy permissions
        set) manage the job flow. If it is set to ``False``, only
        the IAM user that created the job flow can view and manage
        it.

    :type job_flow_role: str
    :param job_flow_role: An IAM role for the job flow. The EC2
        instances of the job flow assume this role. The default role is
        ``EMRJobflowDefault``. In order to use the default role,
        you must have already created it using the CLI.

    :type service_role: str
    :param service_role: The IAM role that will be assumed by the Amazon
        EMR service to access AWS resources on your behalf.

    :rtype: str
    :return: The jobflow id
    """
    # Work on a private copy: the debugging step inserted below must not
    # leak into the list object owned by the caller.
    steps = list(steps) if steps else []

    params = {}
    if action_on_failure:
        params['ActionOnFailure'] = action_on_failure
    if log_uri:
        params['LogUri'] = log_uri
    params['Name'] = name

    # Common instance args
    common_params = self._build_instance_common_args(ec2_keyname,
                                                     availability_zone,
                                                     keep_alive,
                                                     hadoop_version)
    params.update(common_params)

    # NB: according to the AWS API's error message, we must
    # "configure instances either using instance count, master and
    # slave instance type or instance groups but not both."
    #
    # Thus we switch here on the truthiness of instance_groups.
    if not instance_groups:
        # Instance args (the common case)
        instance_params = self._build_instance_count_and_type_args(
            master_instance_type,
            slave_instance_type,
            num_instances)
        params.update(instance_params)
    else:
        # Instance group args (for spot instances or a heterogenous cluster)
        list_args = self._build_instance_group_list_args(instance_groups)
        instance_params = dict(
            ('Instances.%s' % k, v) for k, v in six.iteritems(list_args)
        )
        params.update(instance_params)

    # Debugging step from EMR API docs
    if enable_debugging:
        region_name = self.region.name
        debugging_step = JarStep(
            name='Setup Hadoop Debugging',
            action_on_failure='TERMINATE_JOB_FLOW',
            main_class=None,
            jar=self.DebuggingJar.format(region_name=region_name),
            step_args=self.DebuggingArgs.format(region_name=region_name))
        steps.insert(0, debugging_step)

    # Step args
    if steps:
        step_args = [self._build_step_args(step) for step in steps]
        params.update(self._build_step_list(step_args))

    if bootstrap_actions:
        bootstrap_action_args = [
            self._build_bootstrap_action_args(bootstrap_action)
            for bootstrap_action in bootstrap_actions]
        params.update(
            self._build_bootstrap_action_list(bootstrap_action_args))

    if ami_version:
        params['AmiVersion'] = ami_version

    if additional_info is not None:
        params['AdditionalInfo'] = additional_info

    # Raw overrides: a value of None removes a parameter built above.
    if api_params:
        for key, value in six.iteritems(api_params):
            if value is None:
                params.pop(key, None)
            else:
                params[key] = value

    # The explicit keyword arguments below are applied after api_params,
    # so they take precedence over the same keys supplied there.
    if visible_to_all_users is not None:
        if visible_to_all_users:
            params['VisibleToAllUsers'] = 'true'
        else:
            params['VisibleToAllUsers'] = 'false'

    if job_flow_role is not None:
        params['JobFlowRole'] = job_flow_role

    if service_role is not None:
        params['ServiceRole'] = service_role

    response = self.get_object(
        'RunJobFlow', params, RunJobFlowResponse, verb='POST')
    return response.jobflowid
594
def set_termination_protection(self, jobflow_id,
                               termination_protection_status):
    """
    Set termination protection on specified Elastic MapReduce job flows

    :type jobflow_id: list or str
    :param jobflow_id: A single job flow ID, or a list of job flow IDs

    :type termination_protection_status: bool
    :param termination_protection_status: Termination protection status
    """
    assert termination_protection_status in (True, False)

    # A bare ID is wrapped; a list/tuple of IDs is used as given, so each
    # ID becomes its own JobFlowIds.member.N entry instead of the whole
    # sequence being sent as one member.
    if isinstance(jobflow_id, (list, tuple)):
        jobflow_ids = list(jobflow_id)
    else:
        jobflow_ids = [jobflow_id]

    params = {}
    params['TerminationProtected'] = (
        'true' if termination_protection_status else 'false')
    self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')

    return self.get_status('SetTerminationProtection', params, verb='POST')
613
def set_visible_to_all_users(self, jobflow_id, visibility):
    """
    Set whether specified Elastic Map Reduce job flows are visible to all IAM users

    :type jobflow_id: list or str
    :param jobflow_id: A single job flow ID, or a list of job flow IDs

    :type visibility: bool
    :param visibility: Visibility
    """
    assert visibility in (True, False)

    # A bare ID is wrapped; a list/tuple of IDs is used as given, so each
    # ID becomes its own JobFlowIds.member.N entry instead of the whole
    # sequence being sent as one member.
    if isinstance(jobflow_id, (list, tuple)):
        jobflow_ids = list(jobflow_id)
    else:
        jobflow_ids = [jobflow_id]

    params = {}
    params['VisibleToAllUsers'] = 'true' if visibility else 'false'
    self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')

    return self.get_status('SetVisibleToAllUsers', params, verb='POST')
631
632 def _build_bootstrap_action_args(self, bootstrap_action):
633 bootstrap_action_params = {}
634 bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action.path
635
636 try:
637 bootstrap_action_params['Name'] = bootstrap_action.name
638 except AttributeError:
639 pass
640
641 args = bootstrap_action.args()
642 if args:
643 self.build_list_params(bootstrap_action_params, args, 'ScriptBootstrapAction.Args.member')
644
645 return bootstrap_action_params
646
647 def _build_step_args(self, step):
648 step_params = {}
649 step_params['ActionOnFailure'] = step.action_on_failure
650 step_params['HadoopJarStep.Jar'] = step.jar()
651
652 main_class = step.main_class()
653 if main_class:
654 step_params['HadoopJarStep.MainClass'] = main_class
655
656 args = step.args()
657 if args:
658 self.build_list_params(step_params, args, 'HadoopJarStep.Args.member')
659
660 step_params['Name'] = step.name
661 return step_params
662
def _build_bootstrap_action_list(self, bootstrap_actions):
    """
    Number a list of bootstrap-action parameter dicts (or a single
    dict) into one flat dict keyed 'BootstrapActions.member.<n>.<key>',
    with <n> starting at 1.
    """
    if not isinstance(bootstrap_actions, list):
        bootstrap_actions = [bootstrap_actions]

    flattened = {}
    for position, action_params in enumerate(bootstrap_actions, start=1):
        for field, field_value in six.iteritems(action_params):
            full_key = 'BootstrapActions.member.%s.%s' % (position, field)
            flattened[full_key] = field_value
    return flattened
672
def _build_step_list(self, steps):
    """
    Number a list of step parameter dicts (or a single dict) into one
    flat dict keyed 'Steps.member.<n>.<key>', with <n> starting at 1.
    """
    if not isinstance(steps, list):
        steps = [steps]

    flattened = {}
    for position, step_params in enumerate(steps, start=1):
        for field, field_value in six.iteritems(step_params):
            full_key = 'Steps.member.%s.%s' % (position, field)
            flattened[full_key] = field_value
    return flattened
682
683 def _build_string_list(self, field, items):
684 if not isinstance(items, list):
685 items = [items]
686
687 params = {}
688 for i, item in enumerate(items):
689 params['%s.member.%s' % (field, i + 1)] = item
690 return params
691
def _build_tag_list(self, tags):
    """
    Flatten a tag dict into 'Tags.member.<n>.Key' / '.Value' request
    parameters. Tags are numbered from 1 in sorted order; a falsy
    value (e.g. '' or None) produces a Key entry with no Value entry.
    """
    assert isinstance(tags, dict)

    flattened = {}
    ordered_tags = sorted(six.iteritems(tags))
    for position, (tag_name, tag_value) in enumerate(ordered_tags, start=1):
        member = 'Tags.member.%s' % position
        flattened[member + '.Key'] = tag_name
        if tag_value:
            flattened[member + '.Value'] = tag_value
    return flattened
703
704 def _build_instance_common_args(self, ec2_keyname, availability_zone,
705 keep_alive, hadoop_version):
706 """
707 Takes a number of parameters used when starting a jobflow (as
708 specified in run_jobflow() above). Returns a comparable dict for
709 use in making a RunJobFlow request.
710 """
711 params = {
712 'Instances.KeepJobFlowAliveWhenNoSteps': str(keep_alive).lower(),
713 }
714
715 if hadoop_version:
716 params['Instances.HadoopVersion'] = hadoop_version
717 if ec2_keyname:
718 params['Instances.Ec2KeyName'] = ec2_keyname
719 if availability_zone:
720 params['Instances.Placement.AvailabilityZone'] = availability_zone
721
722 return params
723
724 def _build_instance_count_and_type_args(self, master_instance_type,
725 slave_instance_type, num_instances):
726 """
727 Takes a master instance type (string), a slave instance type
728 (string), and a number of instances. Returns a comparable dict
729 for use in making a RunJobFlow request.
730 """
731 params = {'Instances.MasterInstanceType': master_instance_type,
732 'Instances.SlaveInstanceType': slave_instance_type,
733 'Instances.InstanceCount': num_instances}
734 return params
735
736 def _build_instance_group_args(self, instance_group):
737 """
738 Takes an InstanceGroup; returns a dict that, when its keys are
739 properly prefixed, can be used for describing InstanceGroups in
740 RunJobFlow or AddInstanceGroups requests.
741 """
742 params = {'InstanceCount': instance_group.num_instances,
743 'InstanceRole': instance_group.role,
744 'InstanceType': instance_group.type,
745 'Name': instance_group.name,
746 'Market': instance_group.market}
747 if instance_group.market == 'SPOT':
748 params['BidPrice'] = instance_group.bidprice
749 return params
750
def _build_instance_group_list_args(self, instance_groups):
    """
    Flatten a list of InstanceGroups (or a single InstanceGroup) into
    one dict keyed 'InstanceGroups.member.<n>.<key>', with <n> starting
    at 1, for use in a RunJobFlow or AddInstanceGroups request.
    """
    if not isinstance(instance_groups, list):
        instance_groups = [instance_groups]

    flattened = {}
    for position, group in enumerate(instance_groups, start=1):
        group_params = self._build_instance_group_args(group)
        for field, field_value in six.iteritems(group_params):
            full_key = 'InstanceGroups.member.%d.%s' % (position, field)
            flattened[full_key] = field_value
    return flattened