planemo/lib/python3.7/site-packages/boto/gs/resumable_upload_handler.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author: guerler
date: Fri, 31 Jul 2020 00:18:57 -0400
# Copyright 2010 Google Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import errno
import httplib
import os
import random
import re
import socket
import time
import urlparse
from hashlib import md5
from boto import config, UserAgent
from boto.connection import AWSAuthConnection
from boto.exception import InvalidUriError
from boto.exception import ResumableTransferDisposition
from boto.exception import ResumableUploadException
from boto.s3.keyfile import KeyFile

"""
Handler for Google Cloud Storage resumable uploads. See
http://code.google.com/apis/storage/docs/developer-guide.html#resumable
for details.

Resumable uploads will retry failed uploads, resuming at the byte
count completed by the last upload attempt. If too many retries happen with
no progress (per the configurable num_retries param), the upload will be
aborted in the current process.

The caller can optionally specify a tracker_file_name param in the
ResumableUploadHandler constructor. If you do this, that file will
save the state needed to allow retrying later, in a separate process
(e.g., in a later run of gsutil).
"""
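
# Example usage (a minimal sketch, not part of this module). It assumes a
# configured Google Cloud Storage connection and an existing bucket; the
# bucket and file names below are hypothetical:
#
#   import boto
#   from boto.gs.resumable_upload_handler import ResumableUploadHandler
#
#   conn = boto.connect_gs()
#   key = conn.get_bucket('my-bucket').new_key('big-file')
#   handler = ResumableUploadHandler(
#       tracker_file_name='/tmp/big-file.tracker', num_retries=6)
#   with open('big-file', 'rb') as fp:
#       key.set_contents_from_file(fp, res_upload_handler=handler)
#
# If the process dies partway through, re-running the same code resumes from
# the tracker URI saved in the tracker file.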


class ResumableUploadHandler(object):

    BUFFER_SIZE = 8192
    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                            socket.gaierror)

    # (start, end) response indicating server has nothing (upload protocol
    # uses inclusive numbering).
    SERVER_HAS_NOTHING = (0, -1)

    def __init__(self, tracker_file_name=None, num_retries=None):
        """
        Constructor. Instantiate once for each uploaded file.

        :type tracker_file_name: string
        :param tracker_file_name: optional file name to save tracker URI.
            If supplied and the current process fails the upload, it can be
            retried in a new process. If called with an existing file
            containing a valid tracker URI, we'll resume the upload from this
            URI; else we'll start a new resumable upload (and write the URI
            to this tracker file).

        :type num_retries: int
        :param num_retries: the number of times we'll re-try a resumable
            upload making no progress. (Count resets every time we get
            progress, so upload can span many more than this number of
            retries.)
        """
        self.tracker_file_name = tracker_file_name
        self.num_retries = num_retries
        self.server_has_bytes = 0  # Byte count at last server check.
        self.tracker_uri = None
        if tracker_file_name:
            self._load_tracker_uri_from_file()
        # Save upload_start_point in instance state so caller can find how
        # much was transferred by this ResumableUploadHandler (across
        # retries).
        self.upload_start_point = None

    def _load_tracker_uri_from_file(self):
        f = None
        try:
            f = open(self.tracker_file_name, 'r')
            uri = f.readline().strip()
            self._set_tracker_uri(uri)
        except IOError as e:
            # Ignore non-existent file (happens first time an upload
            # is attempted on a file), but warn user for other errors.
            if e.errno != errno.ENOENT:
                # Will restart because self.tracker_uri is None.
                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
                      'upload from scratch.' %
                      (self.tracker_file_name, e.strerror))
        except InvalidUriError as e:
            # Warn user, but proceed (will restart because
            # self.tracker_uri is None).
            print('Invalid tracker URI (%s) found in URI tracker file '
                  '(%s). Restarting upload from scratch.' %
                  (uri, self.tracker_file_name))
        finally:
            if f:
                f.close()

    def _save_tracker_uri_to_file(self):
        """
        Saves URI to tracker file if one was passed to constructor.
        """
        if not self.tracker_file_name:
            return
        try:
            with os.fdopen(os.open(self.tracker_file_name,
                                   os.O_WRONLY | os.O_CREAT, 0o600), 'w') as f:
                f.write(self.tracker_uri)
        except IOError as e:
            raise ResumableUploadException(
                'Couldn\'t write URI tracker file (%s): %s.\nThis can happen '
                'if you\'re using an incorrectly configured upload tool\n'
                '(e.g., gsutil configured to save tracker files to an '
                'unwritable directory)' %
                (self.tracker_file_name, e.strerror),
                ResumableTransferDisposition.ABORT)

    def _set_tracker_uri(self, uri):
        """
        Called when we start a new resumable upload or get a new tracker
        URI for the upload. Saves URI and resets upload state.

        Raises InvalidUriError if URI is syntactically invalid.
        """
        parse_result = urlparse.urlparse(uri)
        if (parse_result.scheme.lower() not in ['http', 'https'] or
                not parse_result.netloc):
            raise InvalidUriError('Invalid tracker URI (%s)' % uri)
        self.tracker_uri = uri
        self.tracker_uri_host = parse_result.netloc
        self.tracker_uri_path = '%s?%s' % (
            parse_result.path, parse_result.query)
        self.server_has_bytes = 0

    def get_tracker_uri(self):
        """
        Returns upload tracker URI, or None if the upload has not yet started.
        """
        return self.tracker_uri

    def get_upload_id(self):
        """
        Returns the upload ID for the resumable upload, or None if the upload
        has not yet started.
        """
        # We extract the upload_id from the tracker uri. We could retrieve the
        # upload_id from the headers in the response but this only works for
        # the case where we get the tracker uri from the service. In the case
        # where we get the tracker from the tracking file we need to do this
        # logic anyway.
        delim = '?upload_id='
        if self.tracker_uri and delim in self.tracker_uri:
            return self.tracker_uri[self.tracker_uri.index(delim) + len(delim):]
        else:
            return None

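    # Illustration (hypothetical URI): for a tracker URI of
    # 'https://storage.googleapis.com/bkt/obj?upload_id=ABC123',
    # get_upload_id() returns 'ABC123'.
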
    def _remove_tracker_file(self):
        if (self.tracker_file_name and
                os.path.exists(self.tracker_file_name)):
            os.unlink(self.tracker_file_name)

    def _build_content_range_header(self, range_spec='*', length_spec='*'):
        return 'bytes %s/%s' % (range_spec, length_spec)

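    # Illustration of the header strings the helper above produces, assuming
    # a 4096-byte file:
    #
    #   _build_content_range_header('0-1023', 4096)  ->  'bytes 0-1023/4096'
    #   _build_content_range_header('*', 4096)       ->  'bytes */4096'
    #
    # The '*' range form transfers no bytes; it asks the server to report how
    # much of the upload it already has (see _query_server_state below).
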
    def _query_server_state(self, conn, file_length):
        """
        Queries server to find out state of given upload.

        Note that this method really just makes special case use of the
        fact that the upload server always returns the current start/end
        state whenever a PUT doesn't complete.

        Returns HTTP response from sending request.

        Raises ResumableUploadException if problem querying server.
        """
        # Send an empty PUT so that server replies with this resumable
        # transfer's state.
        put_headers = {}
        put_headers['Content-Range'] = (
            self._build_content_range_header('*', file_length))
        put_headers['Content-Length'] = '0'
        return AWSAuthConnection.make_request(conn, 'PUT',
                                              path=self.tracker_uri_path,
                                              auth_path=self.tracker_uri_path,
                                              headers=put_headers,
                                              host=self.tracker_uri_host)

    def _query_server_pos(self, conn, file_length):
        """
        Queries server to find out what bytes it currently has.

        Returns (server_start, server_end), where the values are inclusive.
        For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.

        Raises ResumableUploadException if problem querying server.
        """
        resp = self._query_server_state(conn, file_length)
        if resp.status == 200:
            # To handle the boundary condition where the server has the
            # complete file, we return (server_start, file_length - 1). That
            # way the calling code can always simply read up through
            # server_end. (If we didn't handle this boundary condition here,
            # the caller would have to check whether server_end == file_length
            # and read one fewer byte in that case.)
            return (0, file_length - 1)  # Completed upload.
        if resp.status != 308:
            # This means the server didn't have any state for the given
            # upload ID, which can happen (for example) if the caller saved
            # the tracker URI to a file and then tried to restart the transfer
            # after that upload ID has gone stale. In that case we need to
            # start a new transfer (and the caller will then save the new
            # tracker URI to the tracker file).
            raise ResumableUploadException(
                'Got non-308 response (%s) from server state query' %
                resp.status, ResumableTransferDisposition.START_OVER)
        got_valid_response = False
        range_spec = resp.getheader('range')
        if range_spec:
            # Parse 'bytes=<from>-<to>' range_spec.
            m = re.search(r'bytes=(\d+)-(\d+)', range_spec)
            if m:
                server_start = long(m.group(1))
                server_end = long(m.group(2))
                got_valid_response = True
        else:
            # No Range header, which means the server does not yet have
            # any bytes. Note that the Range header uses inclusive 'from'
            # and 'to' values. Since Range 0-0 would mean that the server
            # has byte 0, omitting the Range header is used to indicate that
            # the server doesn't have any bytes.
            return self.SERVER_HAS_NOTHING
        if not got_valid_response:
            raise ResumableUploadException(
                'Couldn\'t parse upload server state query response (%s)' %
                str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
        if conn.debug >= 1:
            print('Server has: Range: %d - %d.' % (server_start, server_end))
        return (server_start, server_end)

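    # Illustrative exchange for the state query above: the empty PUT carries
    # 'Content-Range: bytes */1000'; a 308 reply with 'Range: bytes=0-499'
    # parses to (server_start, server_end) == (0, 499), meaning the server
    # holds the first 500 bytes and the upload should resume at byte 500.
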
    def _start_new_resumable_upload(self, key, headers=None):
        """
        Starts a new resumable upload.

        Raises ResumableUploadException if any errors occur.
        """
        conn = key.bucket.connection
        if conn.debug >= 1:
            print('Starting new resumable upload.')
        self.server_has_bytes = 0

        # Start a new resumable upload by sending a POST request with an
        # empty body and the "X-Goog-Resumable: start" header. Include any
        # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
        # (and raise an exception if the caller tried to pass one, since it's
        # a semantic error to specify it at this point, and if we were to
        # include one now it would cause the server to expect that many
        # bytes; the POST doesn't include the actual file bytes). We set
        # the Content-Length in the subsequent PUT, based on the uploaded
        # file size.
        post_headers = {}
        for k in (headers or {}):
            if k.lower() == 'content-length':
                raise ResumableUploadException(
                    'Attempt to specify Content-Length header (disallowed)',
                    ResumableTransferDisposition.ABORT)
            post_headers[k] = headers[k]
        post_headers[conn.provider.resumable_upload_header] = 'start'

        resp = conn.make_request(
            'POST', key.bucket.name, key.name, post_headers)
        # Get tracker URI from response 'Location' header.
        body = resp.read()

        # Check for various status conditions.
        if resp.status in [500, 503]:
            # Retry status 500 and 503 errors after a delay.
            raise ResumableUploadException(
                'Got status %d from attempt to start resumable upload. '
                'Will wait/retry' % resp.status,
                ResumableTransferDisposition.WAIT_BEFORE_RETRY)
        elif resp.status != 200 and resp.status != 201:
            raise ResumableUploadException(
                'Got status %d from attempt to start resumable upload. '
                'Aborting' % resp.status,
                ResumableTransferDisposition.ABORT)

        # Else we got a 200 or 201 response code, indicating the resumable
        # upload was created.
        tracker_uri = resp.getheader('Location')
        if not tracker_uri:
            raise ResumableUploadException(
                'No resumable tracker URI found in resumable initiation '
                'POST response (%s)' % body,
                ResumableTransferDisposition.WAIT_BEFORE_RETRY)
        self._set_tracker_uri(tracker_uri)
        self._save_tracker_uri_to_file()

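    # Wire-level sketch of the initiation step above (illustrative values):
    #
    #   POST /bucket/object HTTP/1.1
    #   X-Goog-Resumable: start
    #
    #   HTTP/1.1 201 Created
    #   Location: https://storage.googleapis.com/bucket/object?upload_id=...
    #
    # The Location value becomes the tracker URI that every subsequent state
    # query and data PUT targets.
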
    def _upload_file_bytes(self, conn, http_conn, fp, file_length,
                           total_bytes_uploaded, cb, num_cb, headers):
        """
        Makes one attempt to upload file bytes, using an existing resumable
        upload connection.

        Returns (etag, generation, metageneration) from server upon success.

        Raises ResumableUploadException if any problems occur.
        """
        buf = fp.read(self.BUFFER_SIZE)
        if cb:
            # The cb_count represents the number of full buffers to send
            # between cb executions.
            if num_cb > 2:
                cb_count = file_length / self.BUFFER_SIZE / (num_cb - 2)
            elif num_cb < 0:
                cb_count = -1
            else:
                cb_count = 0
            i = 0
            cb(total_bytes_uploaded, file_length)

        # Build resumable upload headers for the transfer. Don't send a
        # Content-Range header if the file is 0 bytes long, because the
        # resumable upload protocol uses an *inclusive* end-range (so, sending
        # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
        if not headers:
            put_headers = {}
        else:
            put_headers = headers.copy()
        if file_length:
            if total_bytes_uploaded == file_length:
                range_header = self._build_content_range_header(
                    '*', file_length)
            else:
                range_header = self._build_content_range_header(
                    '%d-%d' % (total_bytes_uploaded, file_length - 1),
                    file_length)
            put_headers['Content-Range'] = range_header
        # Set Content-Length to the total bytes we'll send with this PUT.
        put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
        http_request = AWSAuthConnection.build_base_http_request(
            conn, 'PUT', path=self.tracker_uri_path, auth_path=None,
            headers=put_headers, host=self.tracker_uri_host)
        http_conn.putrequest('PUT', http_request.path)
        for k in put_headers:
            http_conn.putheader(k, put_headers[k])
        http_conn.endheaders()

        # Turn off debug on http connection so upload content isn't included
        # in debug stream.
        http_conn.set_debuglevel(0)
        while buf:
            http_conn.send(buf)
            for alg in self.digesters:
                self.digesters[alg].update(buf)
            total_bytes_uploaded += len(buf)
            if cb:
                i += 1
                if i == cb_count or cb_count == -1:
                    cb(total_bytes_uploaded, file_length)
                    i = 0
            buf = fp.read(self.BUFFER_SIZE)
        http_conn.set_debuglevel(conn.debug)
        if cb:
            cb(total_bytes_uploaded, file_length)
        if total_bytes_uploaded != file_length:
            # Abort (and delete the tracker file) so if the user retries
            # they'll start a new resumable upload rather than potentially
            # attempting to pick back up later where we left off.
            raise ResumableUploadException(
                'File changed during upload: EOF at %d bytes of %d byte file.' %
                (total_bytes_uploaded, file_length),
                ResumableTransferDisposition.ABORT)
        resp = http_conn.getresponse()
        # Restore http connection debug level.
        http_conn.set_debuglevel(conn.debug)

        if resp.status == 200:
            # Success.
            return (resp.getheader('etag'),
                    resp.getheader('x-goog-generation'),
                    resp.getheader('x-goog-metageneration'))
        # Retry timeout (408) and status 500 and 503 errors after a delay.
        elif resp.status in [408, 500, 503]:
            disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
        else:
            # Catch-all for any other error codes.
            disposition = ResumableTransferDisposition.ABORT
        raise ResumableUploadException('Got response code %d while attempting '
                                       'upload (%s)' %
                                       (resp.status, resp.reason), disposition)

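    # Illustrative Content-Range values for the data PUT above, for a
    # 1000-byte file (the end-range is inclusive):
    #
    #   fresh upload:          'bytes 0-999/1000'
    #   resuming at byte 500:  'bytes 500-999/1000'
    #   zero-length file:      no Content-Range header at all
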
    def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
                                  num_cb):
        """
        Attempts a resumable upload.

        Returns (etag, generation, metageneration) from server upon success.

        Raises ResumableUploadException if any problems occur.
        """
        (server_start, server_end) = self.SERVER_HAS_NOTHING
        conn = key.bucket.connection
        if self.tracker_uri:
            # Try to resume existing resumable upload.
            try:
                (server_start, server_end) = (
                    self._query_server_pos(conn, file_length))
                self.server_has_bytes = server_start

                if server_end:
                    # If the server already has some of the content, we need
                    # to update the digesters with the bytes that have already
                    # been uploaded to ensure we get a complete hash in the
                    # end.
                    print('Catching up hash digest(s) for resumed upload')
                    fp.seek(0)
                    # Read local file's bytes through position server has. For
                    # example, if server has (0, 3) we want to read 3-0+1=4
                    # bytes.
                    bytes_to_go = server_end + 1
                    while bytes_to_go:
                        chunk = fp.read(min(key.BufferSize, bytes_to_go))
                        if not chunk:
                            raise ResumableUploadException(
                                'Hit end of file during resumable upload hash '
                                'catchup. This should not happen under\n'
                                'normal circumstances, as it indicates the '
                                'server has more bytes of this transfer\nthan'
                                ' the current file size. Restarting upload.',
                                ResumableTransferDisposition.START_OVER)
                        for alg in self.digesters:
                            self.digesters[alg].update(chunk)
                        bytes_to_go -= len(chunk)

                if conn.debug >= 1:
                    print('Resuming transfer.')
            except ResumableUploadException as e:
                if conn.debug >= 1:
                    print('Unable to resume transfer (%s).' % e.message)
                self._start_new_resumable_upload(key, headers)
        else:
            self._start_new_resumable_upload(key, headers)

        # upload_start_point allows the code that instantiated the
        # ResumableUploadHandler to find out the point from which it started
        # uploading (e.g., so it can correctly compute throughput).
        if self.upload_start_point is None:
            self.upload_start_point = server_end

        total_bytes_uploaded = server_end + 1
        # Corner case: Don't attempt to seek if we've already uploaded the
        # entire file, because if the file is a stream (e.g., the KeyFile
        # wrapper around input key when copying between providers), attempting
        # to seek to the end of file would result in an InvalidRange error.
        if total_bytes_uploaded < file_length:
            fp.seek(total_bytes_uploaded)
        conn = key.bucket.connection

        # Get a new HTTP connection (vs conn.get_http_connection(), which
        # reuses pool connections) because httplib requires a new HTTP
        # connection per transaction. (Without this, calling
        # http_conn.getresponse() would get "ResponseNotReady".)
        http_conn = conn.new_http_connection(self.tracker_uri_host, conn.port,
                                             conn.is_secure)
        http_conn.set_debuglevel(conn.debug)

        # Make sure to close http_conn at the end so that if a local file read
        # failure occurs partway through, the server will terminate the
        # current upload and can report that progress on the next attempt.
        try:
            return self._upload_file_bytes(conn, http_conn, fp, file_length,
                                           total_bytes_uploaded, cb, num_cb,
                                           headers)
        except (ResumableUploadException, socket.error):
            resp = self._query_server_state(conn, file_length)
            if resp.status == 400:
                raise ResumableUploadException('Got 400 response from server '
                    'state query after failed resumable upload attempt. This '
                    'can happen for various reasons, including specifying an '
                    'invalid request (e.g., an invalid canned ACL) or if the '
                    'file size changed between upload attempts',
                    ResumableTransferDisposition.ABORT)
            else:
                raise
        finally:
            http_conn.close()

    def _check_final_md5(self, key, etag):
        """
        Checks that etag from server agrees with md5 computed before upload.
        This is important, since the upload could have spanned a number of
        hours and multiple processes (e.g., gsutil runs), and the user could
        change some of the file and not realize they have inconsistent data.
        """
        if key.bucket.connection.debug >= 1:
            print('Checking md5 against etag.')
        if key.md5 != etag.strip('"\''):
            # Call key.open_read() before attempting to delete the
            # (incorrect-content) key, so we perform that request on a
            # different HTTP connection. This is needed because httplib
            # will return a "Response not ready" error if you try to perform
            # a second transaction on the connection.
            key.open_read()
            key.close()
            key.delete()
            raise ResumableUploadException(
                'File changed during upload: md5 signature doesn\'t match etag '
                '(incorrect uploaded object deleted)',
                ResumableTransferDisposition.ABORT)

    def handle_resumable_upload_exception(self, e, debug):
        if (e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS):
            if debug >= 1:
                print('Caught non-retryable ResumableUploadException (%s); '
                      'aborting but retaining tracker file' % e.message)
            raise
        elif (e.disposition == ResumableTransferDisposition.ABORT):
            if debug >= 1:
                print('Caught non-retryable ResumableUploadException (%s); '
                      'aborting and removing tracker file' % e.message)
            self._remove_tracker_file()
            raise
        else:
            if debug >= 1:
                print('Caught ResumableUploadException (%s) - will retry' %
                      e.message)

    def track_progress_less_iterations(self, server_had_bytes_before_attempt,
                                       roll_back_md5=True, debug=0):
        # At this point we had a retryable failure; see if we made progress.
        if self.server_has_bytes > server_had_bytes_before_attempt:
            self.progress_less_iterations = 0  # If progress, reset counter.
        else:
            self.progress_less_iterations += 1
            if roll_back_md5:
                # Roll back any potential hash updates, as we did not
                # make any progress in this iteration.
                self.digesters = self.digesters_before_attempt

        if self.progress_less_iterations > self.num_retries:
            # Don't retry any longer in the current process.
            raise ResumableUploadException(
                'Too many resumable upload attempts failed without '
                'progress. You might try this upload again later',
                ResumableTransferDisposition.ABORT_CUR_PROCESS)

        # Use binary exponential backoff to desynchronize client requests.
        sleep_time_secs = random.random() * (2 ** self.progress_less_iterations)
        if debug >= 1:
            print('Got retryable failure (%d progress-less in a row).\n'
                  'Sleeping %3.1f seconds before re-trying' %
                  (self.progress_less_iterations, sleep_time_secs))
        time.sleep(sleep_time_secs)

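    # Backoff sketch for the method above: after n consecutive progress-less
    # attempts, the sleep is drawn uniformly from [0, 2**n) seconds, so the
    # waits are bounded by 2, 4, 8, ... seconds until num_retries is
    # exhausted.
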
    def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None):
        """
        Upload a file to a key into a bucket on GS, using GS resumable upload
        protocol.

        :type key: :class:`boto.s3.key.Key` or subclass
        :param key: The Key object to which data is to be uploaded

        :type fp: file-like object
        :param fp: The file pointer to upload

        :type headers: dict
        :param headers: The headers to pass along with the PUT request

        :type cb: function
        :param cb: a callback function that will be called to report progress
            on the upload. The callback should accept two integer parameters,
            the first representing the number of bytes that have been
            successfully transmitted to GS, and the second representing the
            total number of bytes that need to be transmitted.

        :type num_cb: int
        :param num_cb: (optional) If a callback is specified with the cb
            parameter, this parameter determines the granularity of the
            callback by defining the maximum number of times the callback
            will be called during the file transfer. Providing a negative
            integer will cause your callback to be called with each buffer
            read.

        :type hash_algs: dictionary
        :param hash_algs: (optional) Dictionary mapping hash algorithm
            descriptions to corresponding state-ful hashing objects that
            implement update(), digest(), and copy() (e.g. hashlib.md5()).
            Defaults to {'md5': md5()}.

        Raises ResumableUploadException if a problem occurs during the
        transfer.
        """

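        # Example progress callback (illustrative, not part of boto): prints
        # percent complete when passed via the cb param above.
        #
        #   def progress(sent, total):
        #       print('%.1f%% uploaded' % (100.0 * sent / total))
        #
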
        if not headers:
            headers = {}
        # If Content-Type header is present and set to None, remove it.
        # This is gsutil's way of asking boto to refrain from auto-generating
        # that header.
        CT = 'Content-Type'
        if CT in headers and headers[CT] is None:
            del headers[CT]

        headers['User-Agent'] = UserAgent

        # Determine file size different ways for case where fp is actually a
        # wrapper around a Key vs an actual file.
        if isinstance(fp, KeyFile):
            file_length = fp.getkey().size
        else:
            fp.seek(0, os.SEEK_END)
            file_length = fp.tell()
            fp.seek(0)
        debug = key.bucket.connection.debug

        # Compute the MD5 checksum on the fly.
        if hash_algs is None:
            hash_algs = {'md5': md5}
        self.digesters = dict(
            (alg, hash_algs[alg]()) for alg in hash_algs or {})

        # Use num-retries from constructor if one was provided; else check
        # for a value specified in the boto config file; else default to 6.
        if self.num_retries is None:
            self.num_retries = config.getint('Boto', 'num_retries', 6)
        self.progress_less_iterations = 0

        while True:  # Retry as long as we're making progress.
            server_had_bytes_before_attempt = self.server_has_bytes
            self.digesters_before_attempt = dict(
                (alg, self.digesters[alg].copy())
                for alg in self.digesters)
            try:
                # Save generation and metageneration in class state so caller
                # can find these values, for use in preconditions of future
                # operations on the uploaded object.
                (etag, self.generation, self.metageneration) = (
                    self._attempt_resumable_upload(key, fp, file_length,
                                                   headers, cb, num_cb))

                # Get the final digests for the uploaded content.
                for alg in self.digesters:
                    key.local_hashes[alg] = self.digesters[alg].digest()

                # Upload succeeded, so remove the tracker file (if we have
                # one).
                self._remove_tracker_file()
                self._check_final_md5(key, etag)
                key.generation = self.generation
                if debug >= 1:
                    print('Resumable upload complete.')
                return
            except self.RETRYABLE_EXCEPTIONS as e:
                if debug >= 1:
                    print('Caught exception (%s)' % e.__repr__())
                if isinstance(e, IOError) and e.errno == errno.EPIPE:
                    # Broken pipe error causes httplib to immediately
                    # close the socket (http://bugs.python.org/issue5542),
                    # so we need to close the connection before we resume
                    # the upload (which will cause a new connection to be
                    # opened the next time an HTTP request is sent).
                    key.bucket.connection.connection.close()
            except ResumableUploadException as e:
                self.handle_resumable_upload_exception(e, debug)

            self.track_progress_less_iterations(server_had_bytes_before_attempt,
                                                True, debug)