comparison env/lib/python3.7/site-packages/boto/emr/step.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
comparison
equal deleted inserted replaced
4:79f47841a781 5:9b1c78e6ba9c
1 # Copyright (c) 2010 Spotify AB
2 # Copyright (c) 2010-2011 Yelp
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining a
5 # copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish, dis-
8 # tribute, sublicense, and/or sell copies of the Software, and to permit
9 # persons to whom the Software is furnished to do so, subject to the fol-
10 # lowing conditions:
11 #
12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 # IN THE SOFTWARE.
22
23 from boto.compat import six
24
25
class Step(object):
    """
    Jobflow Step base class.

    Abstract interface: subclasses must implement :meth:`jar`,
    :meth:`args` and :meth:`main_class`.
    """
    def jar(self):
        """
        :rtype: str
        :return: URI to the jar
        """
        # Bug fix: the original `raise NotImplemented()` raised a TypeError
        # (NotImplemented is a non-callable constant, not an exception).
        raise NotImplementedError()

    def args(self):
        """
        :rtype: list(str)
        :return: List of arguments for the step
        """
        raise NotImplementedError()

    def main_class(self):
        """
        :rtype: str
        :return: The main class name
        """
        raise NotImplementedError()
50
51
class JarStep(Step):
    """
    Custom jar step.
    """
    def __init__(self, name, jar, main_class=None,
                 action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
        """
        An elastic mapreduce step that executes a jar.

        :type name: str
        :param name: The name of the step
        :type jar: str
        :param jar: S3 URI to the Jar file
        :type main_class: str
        :param main_class: The class to execute in the jar
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs, to
            take on failure.
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        """
        self.name = name
        self.action_on_failure = action_on_failure
        self._jar = jar
        self._main_class = main_class

        # A bare string is promoted to a one-element argument list
        # for caller convenience.
        if isinstance(step_args, six.string_types):
            self.step_args = [step_args]
        else:
            self.step_args = step_args

    def jar(self):
        return self._jar

    def args(self):
        # Return a fresh list so callers may mutate it freely.
        return list(self.step_args) if self.step_args else []

    def main_class(self):
        return self._main_class
96
97
class StreamingStep(Step):
    """
    Hadoop streaming step.
    """
    def __init__(self, name, mapper, reducer=None, combiner=None,
                 action_on_failure='TERMINATE_JOB_FLOW',
                 cache_files=None, cache_archives=None,
                 step_args=None, input=None, output=None,
                 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
        """
        A hadoop streaming elastic mapreduce step.

        :type name: str
        :param name: The name of the step
        :type mapper: str
        :param mapper: The mapper URI
        :type reducer: str
        :param reducer: The reducer URI
        :type combiner: str
        :param combiner: The combiner URI. Only works for Hadoop 0.20
            and later!
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs, to
            take on failure.
        :type cache_files: list(str)
        :param cache_files: A list of cache files to be bundled with the job
        :type cache_archives: list(str)
        :param cache_archives: A list of jar archives to be bundled with
            the job
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        :type input: str or a list of str
        :param input: The input uri
        :type output: str
        :param output: The output uri
        :type jar: str
        :param jar: The hadoop streaming jar. This can be either a local
            path on the master node, or an s3:// URI.
        """
        self.name = name
        self.mapper = mapper
        self.reducer = reducer
        self.combiner = combiner
        self.action_on_failure = action_on_failure
        self.cache_files = cache_files
        self.cache_archives = cache_archives
        self.input = input
        self.output = output
        self._jar = jar

        # A bare string is promoted to a one-element argument list.
        if isinstance(step_args, six.string_types):
            step_args = [step_args]
        self.step_args = step_args

    def jar(self):
        return self._jar

    def main_class(self):
        # Streaming jobs are driven entirely by arguments; no main class.
        return None

    def args(self):
        args = []

        # Extra args go BEFORE -mapper and -reducer so that flags such as
        # -libjar take effect.
        if self.step_args:
            args += self.step_args

        args += ['-mapper', self.mapper]

        if self.combiner:
            args += ['-combiner', self.combiner]

        if self.reducer:
            args += ['-reducer', self.reducer]
        else:
            # No reducer: run map-only by disabling reduce tasks.
            args += ['-jobconf', 'mapred.reduce.tasks=0']

        if self.input:
            # Accept either a single URI or a list of URIs.
            inputs = self.input if isinstance(self.input, list) else [self.input]
            for uri in inputs:
                args += ['-input', uri]

        if self.output:
            args += ['-output', self.output]

        for cache_file in (self.cache_files or []):
            args += ['-cacheFile', cache_file]

        for cache_archive in (self.cache_archives or []):
            args += ['-cacheArchive', cache_archive]

        return args

    def __repr__(self):
        return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
            self.__class__.__module__, self.__class__.__name__,
            self.name, self.mapper, self.reducer, self.action_on_failure,
            self.cache_files, self.cache_archives, self.step_args,
            self.input, self.output, self._jar)
202
203
class ScriptRunnerStep(JarStep):
    """
    A JarStep hard-wired to EMR's script-runner jar, which executes
    the script named by its step arguments.
    """

    # Public, well-known location of the EMR script-runner jar.
    ScriptRunnerJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'

    def __init__(self, name, **kw):
        # Pin the jar; forward all remaining keywords to JarStep.
        super(ScriptRunnerStep, self).__init__(name, self.ScriptRunnerJar, **kw)
210
211
class PigBase(ScriptRunnerStep):
    """
    Base class for Pig-related steps: provides the common script-runner
    arguments pointing at the EMR-hosted pig-script.
    """

    # Common prefix for step_args of every Pig step.
    BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/pig/pig-script',
                '--base-path', 's3n://us-east-1.elasticmapreduce/libs/pig/']
216
217
class InstallPigStep(PigBase):
    """
    Install pig on emr step.
    """

    # Fixed, human-readable step name shown in the EMR console.
    InstallPigName = 'Install Pig'

    def __init__(self, pig_versions='latest'):
        # Assemble the script-runner argument list in a single expression.
        step_args = list(self.BaseArgs) + [
            '--install-pig',
            '--pig-versions', pig_versions,
        ]
        super(InstallPigStep, self).__init__(self.InstallPigName,
                                             step_args=step_args)
231
232
class PigStep(PigBase):
    """
    Pig script step.
    """

    def __init__(self, name, pig_file, pig_versions='latest', pig_args=None):
        """
        :type name: str
        :param name: The name of the step
        :type pig_file: str
        :param pig_file: URI of the Pig script to run
        :type pig_versions: str
        :param pig_versions: Pig version(s) to run against
        :type pig_args: list(str)
        :param pig_args: Extra arguments appended after the script flags
        """
        # Fix: the original used a mutable default argument (pig_args=[]),
        # an anti-pattern; use a None sentinel instead, consistent with
        # HiveStep's hive_args parameter. Behavior is unchanged for all
        # callers.
        if pig_args is None:
            pig_args = []
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--pig-versions', pig_versions])
        step_args.extend(['--run-pig-script', '--args', '-f', pig_file])
        step_args.extend(pig_args)
        super(PigStep, self).__init__(name, step_args=step_args)
245
246
class HiveBase(ScriptRunnerStep):
    """
    Base class for Hive-related steps: provides the common script-runner
    arguments pointing at the EMR-hosted hive-script.
    """

    # Common prefix for step_args of every Hive step.
    BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/hive/hive-script',
                '--base-path', 's3n://us-east-1.elasticmapreduce/libs/hive/']
251
252
class InstallHiveStep(HiveBase):
    """
    Install Hive on EMR step.
    """
    # Fixed, human-readable step name shown in the EMR console.
    InstallHiveName = 'Install Hive'

    def __init__(self, hive_versions='latest', hive_site=None):
        args = list(self.BaseArgs)
        args.append('--install-hive')
        args += ['--hive-versions', hive_versions]
        if hive_site is not None:
            # hive-site is passed as a single "--hive-site=<uri>" token.
            args.append('--hive-site=%s' % hive_site)
        super(InstallHiveStep, self).__init__(self.InstallHiveName,
                                              step_args=args)
268
269
class HiveStep(HiveBase):
    """
    Hive script step.
    """

    def __init__(self, name, hive_file, hive_versions='latest',
                 hive_args=None):
        args = list(self.BaseArgs)
        args += ['--hive-versions', hive_versions]
        args += ['--run-hive-script', '--args', '-f', hive_file]
        # Optional extra arguments are appended after the script flags.
        if hive_args is not None:
            args += hive_args
        super(HiveStep, self).__init__(name, step_args=args)