Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/boto/emr/connection.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 # Copyright (c) 2010 Spotify AB | |
2 # Copyright (c) 2010-2011 Yelp | |
3 # | |
4 # Permission is hereby granted, free of charge, to any person obtaining a | |
5 # copy of this software and associated documentation files (the | |
6 # "Software"), to deal in the Software without restriction, including | |
7 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
8 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
9 # persons to whom the Software is furnished to do so, subject to the fol- | |
10 # lowing conditions: | |
11 # | |
12 # The above copyright notice and this permission notice shall be included | |
13 # in all copies or substantial portions of the Software. | |
14 # | |
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
21 # IN THE SOFTWARE. | |
22 | |
23 """ | |
24 Represents a connection to the EMR service | |
25 """ | |
26 import types | |
27 | |
28 import boto | |
29 import boto.utils | |
30 from boto.ec2.regioninfo import RegionInfo | |
31 from boto.emr.emrobject import AddInstanceGroupsResponse, BootstrapActionList, \ | |
32 Cluster, ClusterSummaryList, HadoopStep, \ | |
33 InstanceGroupList, InstanceList, JobFlow, \ | |
34 JobFlowStepList, \ | |
35 ModifyInstanceGroupsResponse, \ | |
36 RunJobFlowResponse, StepSummaryList | |
37 from boto.emr.step import JarStep | |
38 from boto.connection import AWSQueryConnection | |
39 from boto.exception import EmrResponseError | |
40 from boto.compat import six | |
41 | |
42 | |
class EmrConnection(AWSQueryConnection):
    """
    Connection to the Elastic MapReduce (EMR) query API.

    The API version, default region name and default region endpoint can be
    overridden through the ``[Boto]`` section of the boto config
    (``emr_version``, ``emr_region_name``, ``emr_region_endpoint``).
    """

    APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31')
    DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1')
    DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint',
                                            'elasticmapreduce.us-east-1.amazonaws.com')
    ResponseError = EmrResponseError

    # Constants for AWS Console debugging. Both are str.format templates
    # that run_jobflow fills in with this connection's region name.
    DebuggingJar = 's3://{region_name}.elasticmapreduce/libs/script-runner/script-runner.jar'
    DebuggingArgs = 's3://{region_name}.elasticmapreduce/libs/state-pusher/0.1/fetch'

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=True, port=None, proxy=None, proxy_port=None,
                 proxy_user=None, proxy_pass=None, debug=0,
                 https_connection_factory=None, region=None, path='/',
                 security_token=None, validate_certs=True, profile_name=None):
        """
        Open a connection to EMR.

        When ``region`` is not supplied, a RegionInfo is built from
        ``DefaultRegionName`` and ``DefaultRegionEndpoint``. The region's
        endpoint is used as the host; all remaining arguments are forwarded
        unchanged to :class:`AWSQueryConnection`.
        """
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        self.region = region
        super(EmrConnection, self).__init__(aws_access_key_id,
                                            aws_secret_access_key,
                                            is_secure, port, proxy, proxy_port,
                                            proxy_user, proxy_pass,
                                            self.region.endpoint, debug,
                                            https_connection_factory, path,
                                            security_token,
                                            validate_certs=validate_certs,
                                            profile_name=profile_name)
        # Many of the EMR hostnames are of the form:
        #     <region>.<service_name>.amazonaws.com
        # rather than the more common:
        #     <service_name>.<region>.amazonaws.com
        # so we need to explicitly set the region_name and service_name
        # for the SigV4 signing.
        self.auth_region_name = self.region.name
        self.auth_service_name = 'elasticmapreduce'

    def _required_auth_capability(self):
        """Request SigV4 ('hmac-v4') request signing."""
        return ['hmac-v4']

    def describe_cluster(self, cluster_id):
        """
        Describes an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :return: The parsed ``DescribeCluster`` response as a ``Cluster``.
        """
        params = {'ClusterId': cluster_id}
        return self.get_object('DescribeCluster', params, Cluster)

    def describe_jobflow(self, jobflow_id):
        """
        This method is deprecated. We recommend you use list_clusters,
        describe_cluster, list_steps, list_instance_groups and
        list_bootstrap_actions instead.

        Describes a single Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: The job flow id of interest

        :return: The first ``JobFlow`` returned by ``describe_jobflows``
            for this id, or ``None`` when nothing was returned.
        """
        jobflows = self.describe_jobflows(jobflow_ids=[jobflow_id])
        if jobflows:
            return jobflows[0]
        return None

    def describe_jobflows(self, states=None, jobflow_ids=None,
                          created_after=None, created_before=None):
        """
        This method is deprecated. We recommend you use list_clusters,
        describe_cluster, list_steps, list_instance_groups and
        list_bootstrap_actions instead.

        Retrieve all the Elastic MapReduce job flows on your account

        :type states: list
        :param states: A list of strings with job flow states wanted

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs

        :type created_after: datetime
        :param created_after: Bound on job flow creation time

        :type created_before: datetime
        :param created_before: Bound on job flow creation time

        :return: A list of ``JobFlow`` objects.
        """
        params = {}

        if states:
            self.build_list_params(params, states, 'JobFlowStates.member')
        if jobflow_ids:
            self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        # Datetimes are serialised with boto's ISO 8601 format string.
        if created_after:
            params['CreatedAfter'] = created_after.strftime(
                boto.utils.ISO8601)
        if created_before:
            params['CreatedBefore'] = created_before.strftime(
                boto.utils.ISO8601)

        return self.get_list('DescribeJobFlows', params, [('member', JobFlow)])

    def describe_step(self, cluster_id, step_id):
        """
        Describe an Elastic MapReduce step

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type step_id: str
        :param step_id: The step id of interest

        :return: The parsed ``DescribeStep`` response as a ``HadoopStep``.
        """
        params = {
            'ClusterId': cluster_id,
            'StepId': step_id,
        }

        return self.get_object('DescribeStep', params, HadoopStep)

    def list_bootstrap_actions(self, cluster_id, marker=None):
        """
        Get a list of bootstrap actions for an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type marker: str
        :param marker: Pagination marker
        """
        params = {'ClusterId': cluster_id}

        if marker:
            params['Marker'] = marker

        return self.get_object('ListBootstrapActions', params,
                               BootstrapActionList)

    def list_clusters(self, created_after=None, created_before=None,
                      cluster_states=None, marker=None):
        """
        List Elastic MapReduce clusters with optional filtering

        :type created_after: datetime
        :param created_after: Bound on cluster creation time

        :type created_before: datetime
        :param created_before: Bound on cluster creation time

        :type cluster_states: list
        :param cluster_states: Bound on cluster states

        :type marker: str
        :param marker: Pagination marker
        """
        params = {}
        if created_after:
            params['CreatedAfter'] = created_after.strftime(
                boto.utils.ISO8601)
        if created_before:
            params['CreatedBefore'] = created_before.strftime(
                boto.utils.ISO8601)
        if marker:
            params['Marker'] = marker

        if cluster_states:
            self.build_list_params(params, cluster_states,
                                   'ClusterStates.member')

        return self.get_object('ListClusters', params, ClusterSummaryList)

    def list_instance_groups(self, cluster_id, marker=None):
        """
        List EC2 instance groups in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type marker: str
        :param marker: Pagination marker
        """
        params = {'ClusterId': cluster_id}

        if marker:
            params['Marker'] = marker

        return self.get_object('ListInstanceGroups', params, InstanceGroupList)

    def list_instances(self, cluster_id, instance_group_id=None,
                       instance_group_types=None, marker=None):
        """
        List EC2 instances in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type instance_group_id: str
        :param instance_group_id: The EC2 instance group id of interest

        :type instance_group_types: list
        :param instance_group_types: Filter by EC2 instance group type

        :type marker: str
        :param marker: Pagination marker
        """
        params = {'ClusterId': cluster_id}

        if instance_group_id:
            params['InstanceGroupId'] = instance_group_id
        if marker:
            params['Marker'] = marker

        if instance_group_types:
            self.build_list_params(params, instance_group_types,
                                   'InstanceGroupTypes.member')

        return self.get_object('ListInstances', params, InstanceList)

    def list_steps(self, cluster_id, step_states=None, marker=None):
        """
        List cluster steps

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type step_states: list
        :param step_states: Filter by step states

        :type marker: str
        :param marker: Pagination marker
        """
        params = {'ClusterId': cluster_id}

        if marker:
            params['Marker'] = marker

        if step_states:
            self.build_list_params(params, step_states, 'StepStates.member')

        return self.get_object('ListSteps', params, StepSummaryList)

    def add_tags(self, resource_id, tags):
        """
        Create new metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: dict
        :param tags: A dictionary containing the name/value pairs.
                     If you want to create only a tag name, the
                     value for that tag should be the empty string
                     (e.g. '') or None.
        """
        assert isinstance(resource_id, six.string_types)
        params = {'ResourceId': resource_id}
        params.update(self._build_tag_list(tags))
        return self.get_status('AddTags', params, verb='POST')

    def remove_tags(self, resource_id, tags):
        """
        Remove metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: list
        :param tags: A list of tag names to remove (a single name is
            also accepted).
        """
        params = {'ResourceId': resource_id}
        params.update(self._build_string_list('TagKeys', tags))
        return self.get_status('RemoveTags', params, verb='POST')

    def terminate_jobflow(self, jobflow_id):
        """
        Terminate a single Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: A jobflow id

        :return: The status returned by :meth:`terminate_jobflows`.
        """
        return self.terminate_jobflows([jobflow_id])

    def terminate_jobflows(self, jobflow_ids):
        """
        Terminate one or more Elastic MapReduce job flows

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs
        """
        params = {}
        self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        return self.get_status('TerminateJobFlows', params, verb='POST')

    def add_jobflow_steps(self, jobflow_id, steps):
        """
        Adds steps to a jobflow

        :type jobflow_id: str
        :param jobflow_id: The job flow id

        :type steps: list(boto.emr.Step)
        :param steps: A list of steps to add to the job (a single step is
            also accepted)
        """
        if not isinstance(steps, list):
            steps = [steps]
        params = {'JobFlowId': jobflow_id}

        # Step args
        step_args = [self._build_step_args(step) for step in steps]
        params.update(self._build_step_list(step_args))

        return self.get_object(
            'AddJobFlowSteps', params, JobFlowStepList, verb='POST')

    def add_instance_groups(self, jobflow_id, instance_groups):
        """
        Adds instance groups to a running cluster.

        :type jobflow_id: str
        :param jobflow_id: The id of the jobflow which will take the
            new instance groups

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: A list of instance groups to add to the job
            (a single instance group is also accepted)
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]
        params = {'JobFlowId': jobflow_id}
        params.update(self._build_instance_group_list_args(instance_groups))

        return self.get_object('AddInstanceGroups', params,
                               AddInstanceGroupsResponse, verb='POST')

    def modify_instance_groups(self, instance_group_ids, new_sizes):
        """
        Modify the number of nodes and configuration settings in an
        instance group.

        :type instance_group_ids: list(str)
        :param instance_group_ids: A list of the ID's of the instance
            groups to be modified

        :type new_sizes: list(int)
        :param new_sizes: A list of the new sizes for each instance group

        The two lists are paired positionally with ``zip``, so if they
        differ in length the surplus entries of the longer one are ignored.
        """
        if not isinstance(instance_group_ids, list):
            instance_group_ids = [instance_group_ids]
        if not isinstance(new_sizes, list):
            new_sizes = [new_sizes]

        params = {}
        pairs = zip(instance_group_ids, new_sizes)
        for k, (group_id, size) in enumerate(pairs, start=1):
            # could be wrong - the example amazon gives uses
            # InstanceRequestCount, while the api documentation
            # says InstanceCount
            params['InstanceGroups.member.%d.InstanceGroupId' % k] = group_id
            params['InstanceGroups.member.%d.InstanceCount' % k] = size

        return self.get_object('ModifyInstanceGroups', params,
                               ModifyInstanceGroupsResponse, verb='POST')

    def run_jobflow(self, name, log_uri=None, ec2_keyname=None,
                    availability_zone=None,
                    master_instance_type='m1.small',
                    slave_instance_type='m1.small', num_instances=1,
                    action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
                    enable_debugging=False,
                    hadoop_version=None,
                    steps=None,
                    bootstrap_actions=None,
                    instance_groups=None,
                    additional_info=None,
                    ami_version=None,
                    api_params=None,
                    visible_to_all_users=None,
                    job_flow_role=None,
                    service_role=None):
        """
        Runs a job flow

        :type name: str
        :param name: Name of the job flow

        :type log_uri: str
        :param log_uri: URI of the S3 bucket to place logs

        :type ec2_keyname: str
        :param ec2_keyname: EC2 key used for the instances

        :type availability_zone: str
        :param availability_zone: EC2 availability zone of the cluster

        :type master_instance_type: str
        :param master_instance_type: EC2 instance type of the master

        :type slave_instance_type: str
        :param slave_instance_type: EC2 instance type of the slave nodes

        :type num_instances: int
        :param num_instances: Number of instances in the Hadoop cluster

        :type action_on_failure: str
        :param action_on_failure: Action to take if a step terminates

        :type keep_alive: bool
        :param keep_alive: Denotes whether the cluster should stay
            alive upon completion

        :type enable_debugging: bool
        :param enable_debugging: Denotes whether AWS console debugging
            should be enabled. When True, a 'Setup Hadoop Debugging' step
            is prepended to the submitted steps; the caller's ``steps``
            list is not modified.

        :type hadoop_version: str
        :param hadoop_version: Version of Hadoop to use. This no longer
            defaults to '0.20' and now uses the AMI default.

        :type steps: list(boto.emr.Step)
        :param steps: List of steps to add with the job

        :type bootstrap_actions: list(boto.emr.BootstrapAction)
        :param bootstrap_actions: List of bootstrap actions that run
            before Hadoop starts.

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: Optional list of instance groups to
            use when creating this job.
            NB: When provided, this argument supersedes num_instances
            and master/slave_instance_type.

        :type ami_version: str
        :param ami_version: Amazon Machine Image (AMI) version to use
            for instances. Values accepted by EMR are '1.0', '2.0', and
            'latest'; EMR currently defaults to '1.0' if you don't set
            'ami_version'.

        :type additional_info: JSON str
        :param additional_info: A JSON string for selecting additional features

        :type api_params: dict
        :param api_params: a dictionary of additional parameters to pass
            directly to the EMR API (so you don't have to upgrade boto to
            use new EMR features). You can also delete an API parameter
            by setting it to None. Note that ``visible_to_all_users``,
            ``job_flow_role`` and ``service_role`` are applied after
            ``api_params`` and therefore take precedence over it.

        :type visible_to_all_users: bool
        :param visible_to_all_users: Whether the job flow is visible to all IAM
            users of the AWS account associated with the job flow. If this
            value is set to ``True``, all IAM users of that AWS
            account can view and (if they have the proper policy permissions
            set) manage the job flow. If it is set to ``False``, only
            the IAM user that created the job flow can view and manage
            it.

        :type job_flow_role: str
        :param job_flow_role: An IAM role for the job flow. The EC2
            instances of the job flow assume this role. The default role is
            ``EMRJobflowDefault``. In order to use the default role,
            you must have already created it using the CLI.

        :type service_role: str
        :param service_role: The IAM role that will be assumed by the Amazon
            EMR service to access AWS resources on your behalf.

        :rtype: str
        :return: The jobflow id
        """
        # Work on a private copy: the debugging step below is inserted at
        # the front, and that must never leak into the caller's list.
        steps = list(steps) if steps else []
        params = {}
        if action_on_failure:
            params['ActionOnFailure'] = action_on_failure
        if log_uri:
            params['LogUri'] = log_uri
        params['Name'] = name

        # Common instance args
        common_params = self._build_instance_common_args(ec2_keyname,
                                                         availability_zone,
                                                         keep_alive,
                                                         hadoop_version)
        params.update(common_params)

        # NB: according to the AWS API's error message, we must
        # "configure instances either using instance count, master and
        # slave instance type or instance groups but not both."
        #
        # Thus we switch here on the truthiness of instance_groups.
        if not instance_groups:
            # Instance args (the common case)
            instance_params = self._build_instance_count_and_type_args(
                master_instance_type,
                slave_instance_type,
                num_instances)
            params.update(instance_params)
        else:
            # Instance group args (for spot instances or a heterogeneous
            # cluster); RunJobFlow expects them under the "Instances." prefix.
            list_args = self._build_instance_group_list_args(instance_groups)
            instance_params = dict(
                ('Instances.%s' % k, v) for k, v in six.iteritems(list_args)
            )
            params.update(instance_params)

        # Debugging step from EMR API docs
        if enable_debugging:
            region_name = self.region.name
            debugging_step = JarStep(
                name='Setup Hadoop Debugging',
                action_on_failure='TERMINATE_JOB_FLOW',
                main_class=None,
                jar=self.DebuggingJar.format(region_name=region_name),
                step_args=self.DebuggingArgs.format(region_name=region_name))
            steps.insert(0, debugging_step)

        # Step args
        if steps:
            step_args = [self._build_step_args(step) for step in steps]
            params.update(self._build_step_list(step_args))

        if bootstrap_actions:
            bootstrap_action_args = [
                self._build_bootstrap_action_args(bootstrap_action)
                for bootstrap_action in bootstrap_actions
            ]
            params.update(
                self._build_bootstrap_action_list(bootstrap_action_args))

        if ami_version:
            params['AmiVersion'] = ami_version

        if additional_info is not None:
            params['AdditionalInfo'] = additional_info

        # Raw overrides: None deletes a key, anything else sets it.
        if api_params:
            for key, value in six.iteritems(api_params):
                if value is None:
                    params.pop(key, None)
                else:
                    params[key] = value

        if visible_to_all_users is not None:
            params['VisibleToAllUsers'] = (
                'true' if visible_to_all_users else 'false')

        if job_flow_role is not None:
            params['JobFlowRole'] = job_flow_role

        if service_role is not None:
            params['ServiceRole'] = service_role

        response = self.get_object(
            'RunJobFlow', params, RunJobFlowResponse, verb='POST')
        return response.jobflowid

    def set_termination_protection(self, jobflow_id,
                                   termination_protection_status):
        """
        Set termination protection on the specified Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: The job flow ID

        :type termination_protection_status: bool
        :param termination_protection_status: Termination protection status
        """
        assert termination_protection_status in (True, False)

        params = {
            'TerminationProtected':
                'true' if termination_protection_status else 'false',
        }
        self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')

        return self.get_status('SetTerminationProtection', params, verb='POST')

    def set_visible_to_all_users(self, jobflow_id, visibility):
        """
        Set whether the specified Elastic Map Reduce job flow is visible to
        all IAM users

        :type jobflow_id: str
        :param jobflow_id: The job flow ID

        :type visibility: bool
        :param visibility: Visibility
        """
        assert visibility in (True, False)

        params = {'VisibleToAllUsers': 'true' if visibility else 'false'}
        self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')

        return self.get_status('SetVisibleToAllUsers', params, verb='POST')

    def _build_bootstrap_action_args(self, bootstrap_action):
        """
        Convert one bootstrap action into an un-prefixed parameter dict.

        The ``Name`` key is only emitted when the action has a ``name``
        attribute.
        """
        bootstrap_action_params = {
            'ScriptBootstrapAction.Path': bootstrap_action.path,
        }

        try:
            bootstrap_action_params['Name'] = bootstrap_action.name
        except AttributeError:
            pass

        args = bootstrap_action.args()
        if args:
            self.build_list_params(bootstrap_action_params, args,
                                   'ScriptBootstrapAction.Args.member')

        return bootstrap_action_params

    def _build_step_args(self, step):
        """
        Convert one step into an un-prefixed parameter dict describing its
        Hadoop jar invocation.
        """
        step_params = {
            'ActionOnFailure': step.action_on_failure,
            'HadoopJarStep.Jar': step.jar(),
        }

        main_class = step.main_class()
        if main_class:
            step_params['HadoopJarStep.MainClass'] = main_class

        args = step.args()
        if args:
            self.build_list_params(step_params, args,
                                   'HadoopJarStep.Args.member')

        step_params['Name'] = step.name
        return step_params

    def _build_bootstrap_action_list(self, bootstrap_actions):
        """
        Prefix each bootstrap-action dict with
        ``BootstrapActions.member.<n>.`` (1-based).
        """
        if not isinstance(bootstrap_actions, list):
            bootstrap_actions = [bootstrap_actions]

        params = {}
        for i, bootstrap_action in enumerate(bootstrap_actions, start=1):
            for key, value in six.iteritems(bootstrap_action):
                params['BootstrapActions.member.%s.%s' % (i, key)] = value
        return params

    def _build_step_list(self, steps):
        """Prefix each step dict with ``Steps.member.<n>.`` (1-based)."""
        if not isinstance(steps, list):
            steps = [steps]

        params = {}
        for i, step in enumerate(steps, start=1):
            for key, value in six.iteritems(step):
                params['Steps.member.%s.%s' % (i, key)] = value
        return params

    def _build_string_list(self, field, items):
        """
        Build ``<field>.member.<n>`` entries (1-based) for a list of strings;
        a single non-list item is treated as a one-element list.
        """
        if not isinstance(items, list):
            items = [items]

        params = {}
        for i, item in enumerate(items, start=1):
            params['%s.member.%s' % (field, i)] = item
        return params

    def _build_tag_list(self, tags):
        """
        Build ``Tags.member.<n>.Key`` / ``.Value`` entries from a dict.

        Tags are numbered in sorted order for deterministic output; a
        ``.Value`` entry is only emitted when the value is truthy, so
        ``None`` or ``''`` produce a key-only tag.
        """
        assert isinstance(tags, dict)

        params = {}
        for i, (key, value) in enumerate(sorted(six.iteritems(tags)), start=1):
            current_prefix = 'Tags.member.%s' % i
            params['%s.Key' % current_prefix] = key
            if value:
                params['%s.Value' % current_prefix] = value
        return params

    def _build_instance_common_args(self, ec2_keyname, availability_zone,
                                    keep_alive, hadoop_version):
        """
        Takes a number of parameters used when starting a jobflow (as
        specified in run_jobflow() above). Returns a comparable dict for
        use in making a RunJobFlow request.
        """
        params = {
            # The API expects a lowercase 'true'/'false' string.
            'Instances.KeepJobFlowAliveWhenNoSteps': str(keep_alive).lower(),
        }

        if hadoop_version:
            params['Instances.HadoopVersion'] = hadoop_version
        if ec2_keyname:
            params['Instances.Ec2KeyName'] = ec2_keyname
        if availability_zone:
            params['Instances.Placement.AvailabilityZone'] = availability_zone

        return params

    def _build_instance_count_and_type_args(self, master_instance_type,
                                            slave_instance_type, num_instances):
        """
        Takes a master instance type (string), a slave instance type
        (string), and a number of instances. Returns a comparable dict
        for use in making a RunJobFlow request.
        """
        return {
            'Instances.MasterInstanceType': master_instance_type,
            'Instances.SlaveInstanceType': slave_instance_type,
            'Instances.InstanceCount': num_instances,
        }

    def _build_instance_group_args(self, instance_group):
        """
        Takes an InstanceGroup; returns a dict that, when its keys are
        properly prefixed, can be used for describing InstanceGroups in
        RunJobFlow or AddInstanceGroups requests.
        """
        params = {
            'InstanceCount': instance_group.num_instances,
            'InstanceRole': instance_group.role,
            'InstanceType': instance_group.type,
            'Name': instance_group.name,
            'Market': instance_group.market,
        }
        # A bid price is only meaningful for spot-market groups.
        if instance_group.market == 'SPOT':
            params['BidPrice'] = instance_group.bidprice
        return params

    def _build_instance_group_list_args(self, instance_groups):
        """
        Takes a list of InstanceGroups, or a single InstanceGroup. Returns
        a comparable dict for use in making a RunJobFlow or AddInstanceGroups
        request.
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]

        params = {}
        for i, instance_group in enumerate(instance_groups, start=1):
            ig_dict = self._build_instance_group_args(instance_group)
            for key, value in six.iteritems(ig_dict):
                params['InstanceGroups.member.%d.%s' % (i, key)] = value
        return params