Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/boto/emr/step.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
comparison
equal
deleted
inserted
replaced
4:79f47841a781 | 5:9b1c78e6ba9c |
---|---|
1 # Copyright (c) 2010 Spotify AB | |
2 # Copyright (c) 2010-2011 Yelp | |
3 # | |
4 # Permission is hereby granted, free of charge, to any person obtaining a | |
5 # copy of this software and associated documentation files (the | |
6 # "Software"), to deal in the Software without restriction, including | |
7 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
8 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
9 # persons to whom the Software is furnished to do so, subject to the fol- | |
10 # lowing conditions: | |
11 # | |
12 # The above copyright notice and this permission notice shall be included | |
13 # in all copies or substantial portions of the Software. | |
14 # | |
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
21 # IN THE SOFTWARE. | |
22 | |
23 from boto.compat import six | |
24 | |
25 | |
class Step(object):
    """
    Jobflow Step base class.

    Subclasses (e.g. JarStep, StreamingStep) must implement :meth:`jar`,
    :meth:`args` and :meth:`main_class` to describe an Elastic MapReduce
    job flow step.
    """
    def jar(self):
        """
        :rtype: str
        :return: URI to the jar
        """
        # Bug fix: the original used ``raise NotImplemented()``.
        # NotImplemented is a non-callable singleton, so calling it raised
        # TypeError instead of signaling an abstract method.
        raise NotImplementedError()

    def args(self):
        """
        :rtype: list(str)
        :return: List of arguments for the step
        """
        raise NotImplementedError()

    def main_class(self):
        """
        :rtype: str
        :return: The main class name
        """
        raise NotImplementedError()
50 | |
51 | |
class JarStep(Step):
    """
    Custom jar step.
    """
    def __init__(self, name, jar, main_class=None,
                 action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
        """
        An elastic mapreduce step that executes a jar.

        :type name: str
        :param name: The name of the step
        :type jar: str
        :param jar: S3 URI to the Jar file
        :type main_class: str
        :param main_class: The class to execute in the jar
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs to
            take on failure.
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        """
        self.name = name
        self._jar = jar
        self._main_class = main_class
        self.action_on_failure = action_on_failure
        # Allow a bare string to be passed; treat it as a one-element list.
        if isinstance(step_args, six.string_types):
            step_args = [step_args]
        self.step_args = step_args

    def jar(self):
        return self._jar

    def args(self):
        # Return a fresh list so callers can't mutate our stored args.
        return list(self.step_args) if self.step_args else []

    def main_class(self):
        return self._main_class
96 | |
97 | |
class StreamingStep(Step):
    """
    Hadoop streaming step.
    """
    def __init__(self, name, mapper, reducer=None, combiner=None,
                 action_on_failure='TERMINATE_JOB_FLOW',
                 cache_files=None, cache_archives=None,
                 step_args=None, input=None, output=None,
                 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
        """
        A hadoop streaming elastic mapreduce step.

        :type name: str
        :param name: The name of the step
        :type mapper: str
        :param mapper: The mapper URI
        :type reducer: str
        :param reducer: The reducer URI
        :type combiner: str
        :param combiner: The combiner URI. Only works for Hadoop 0.20
            and later!
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs to
            take on failure.
        :type cache_files: list(str)
        :param cache_files: A list of cache files to be bundled with the job
        :type cache_archives: list(str)
        :param cache_archives: A list of jar archives to be bundled with
            the job
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        :type input: str or a list of str
        :param input: The input uri
        :type output: str
        :param output: The output uri
        :type jar: str
        :param jar: The hadoop streaming jar. This can be either a local
            path on the master node, or an s3:// URI.
        """
        self.name = name
        self.mapper = mapper
        self.reducer = reducer
        self.combiner = combiner
        self.action_on_failure = action_on_failure
        self.cache_files = cache_files
        self.cache_archives = cache_archives
        self.input = input
        self.output = output
        self._jar = jar
        # A bare string becomes a single-element argument list.
        if isinstance(step_args, six.string_types):
            step_args = [step_args]
        self.step_args = step_args

    def jar(self):
        return self._jar

    def main_class(self):
        # Streaming jobs are driven entirely by command-line arguments.
        return None

    def args(self):
        # Extra args go BEFORE -mapper and -reducer so that e.g. -libjar
        # will work.
        cli = list(self.step_args) if self.step_args else []

        cli += ['-mapper', self.mapper]

        if self.combiner:
            cli += ['-combiner', self.combiner]

        if self.reducer:
            cli += ['-reducer', self.reducer]
        else:
            # No reducer: run a map-only job.
            cli += ['-jobconf', 'mapred.reduce.tasks=0']

        if self.input:
            sources = self.input if isinstance(self.input, list) \
                else [self.input]
            for uri in sources:
                cli += ['-input', uri]
        if self.output:
            cli += ['-output', self.output]

        for cached in self.cache_files or []:
            cli += ['-cacheFile', cached]

        for archive in self.cache_archives or []:
            cli += ['-cacheArchive', archive]

        return cli

    def __repr__(self):
        return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
            self.__class__.__module__, self.__class__.__name__,
            self.name, self.mapper, self.reducer, self.action_on_failure,
            self.cache_files, self.cache_archives, self.step_args,
            self.input, self.output, self._jar)
202 | |
203 | |
class ScriptRunnerStep(JarStep):
    """
    A JarStep that runs a script via the AWS-hosted script-runner jar.
    """

    # Public jar provided by EMR; executes the script named in step_args.
    ScriptRunnerJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'

    def __init__(self, name, **kwargs):
        super(ScriptRunnerStep, self).__init__(name, self.ScriptRunnerJar,
                                               **kwargs)
210 | |
211 | |
class PigBase(ScriptRunnerStep):
    """
    Shared base for Pig-related steps.
    """

    # Pig helper script plus the base path of the AWS-hosted Pig library.
    BaseArgs = [
        's3n://us-east-1.elasticmapreduce/libs/pig/pig-script',
        '--base-path',
        's3n://us-east-1.elasticmapreduce/libs/pig/',
    ]
216 | |
217 | |
class InstallPigStep(PigBase):
    """
    Install pig on emr step.
    """

    InstallPigName = 'Install Pig'

    def __init__(self, pig_versions='latest'):
        # Build the full argument list in one expression: base args,
        # the install flag, then the requested version(s).
        step_args = list(self.BaseArgs) + [
            '--install-pig',
            '--pig-versions', pig_versions,
        ]
        super(InstallPigStep, self).__init__(self.InstallPigName,
                                             step_args=step_args)
231 | |
232 | |
class PigStep(PigBase):
    """
    Pig script step.
    """

    def __init__(self, name, pig_file, pig_versions='latest', pig_args=None):
        """
        :type name: str
        :param name: The name of the step
        :type pig_file: str
        :param pig_file: URI of the Pig script to run
        :type pig_versions: str
        :param pig_versions: Pig version(s) to run against
        :type pig_args: list(str)
        :param pig_args: Extra arguments appended after the script args
        """
        # Fix: the original used a mutable default argument (pig_args=[]),
        # a shared-state pitfall; use a None sentinel instead.
        if pig_args is None:
            pig_args = []
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--pig-versions', pig_versions])
        step_args.extend(['--run-pig-script', '--args', '-f', pig_file])
        step_args.extend(pig_args)
        super(PigStep, self).__init__(name, step_args=step_args)
245 | |
246 | |
class HiveBase(ScriptRunnerStep):
    """
    Shared base for Hive-related steps.
    """

    # Hive helper script plus the base path of the AWS-hosted Hive library.
    BaseArgs = [
        's3n://us-east-1.elasticmapreduce/libs/hive/hive-script',
        '--base-path',
        's3n://us-east-1.elasticmapreduce/libs/hive/',
    ]
251 | |
252 | |
class InstallHiveStep(HiveBase):
    """
    Install Hive on EMR step.
    """
    InstallHiveName = 'Install Hive'

    def __init__(self, hive_versions='latest', hive_site=None):
        args = list(self.BaseArgs)
        args.append('--install-hive')
        args += ['--hive-versions', hive_versions]
        # An explicit hive-site.xml overrides the default configuration.
        if hive_site is not None:
            args.append('--hive-site=%s' % hive_site)
        super(InstallHiveStep, self).__init__(self.InstallHiveName,
                                              step_args=args)
268 | |
269 | |
class HiveStep(HiveBase):
    """
    Hive script step.
    """

    def __init__(self, name, hive_file, hive_versions='latest',
                 hive_args=None):
        args = list(self.BaseArgs)
        args += ['--hive-versions', hive_versions]
        args += ['--run-hive-script', '--args', '-f', hive_file]
        # Optional extra arguments are appended after the script args.
        if hive_args is not None:
            args += hive_args
        super(HiveStep, self).__init__(name, step_args=args)