Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/boto/gs/resumable_upload_handler.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:d30785e31577 |
|---|---|
| 1 # Copyright 2010 Google Inc. | |
| 2 # | |
| 3 # Permission is hereby granted, free of charge, to any person obtaining a | |
| 4 # copy of this software and associated documentation files (the | |
| 5 # "Software"), to deal in the Software without restriction, including | |
| 6 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
| 7 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
| 8 # persons to whom the Software is furnished to do so, subject to the fol- | |
| 9 # lowing conditions: | |
| 10 # | |
| 11 # The above copyright notice and this permission notice shall be included | |
| 12 # in all copies or substantial portions of the Software. | |
| 13 # | |
| 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
| 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
| 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
| 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
| 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
| 20 # IN THE SOFTWARE. | |
| 21 import errno | |
| 22 import httplib | |
| 23 import os | |
| 24 import random | |
| 25 import re | |
| 26 import socket | |
| 27 import time | |
| 28 import urlparse | |
| 29 from hashlib import md5 | |
| 30 from boto import config, UserAgent | |
| 31 from boto.connection import AWSAuthConnection | |
| 32 from boto.exception import InvalidUriError | |
| 33 from boto.exception import ResumableTransferDisposition | |
| 34 from boto.exception import ResumableUploadException | |
| 35 from boto.s3.keyfile import KeyFile | |
| 36 | |
| 37 """ | |
| 38 Handler for Google Cloud Storage resumable uploads. See | |
| 39 http://code.google.com/apis/storage/docs/developer-guide.html#resumable | |
| 40 for details. | |
| 41 | |
| 42 Resumable uploads will retry failed uploads, resuming at the byte | |
| 43 count completed by the last upload attempt. If too many retries happen with | |
| 44 no progress (per configurable num_retries param), the upload will be | |
| 45 aborted in the current process. | |
| 46 | |
| 47 The caller can optionally specify a tracker_file_name param in the | |
| 48 ResumableUploadHandler constructor. If you do this, that file will | |
| 49 save the state needed to allow retrying later, in a separate process | |
| 50 (e.g., in a later run of gsutil). | |
| 51 """ | |
| 52 | |
| 53 | |
| 54 class ResumableUploadHandler(object): | |
| 55 | |
| 56 BUFFER_SIZE = 8192 | |
| 57 RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error, | |
| 58 socket.gaierror) | |
| 59 | |
| 60 # (start, end) response indicating server has nothing (upload protocol uses | |
| 61 # inclusive numbering). | |
| 62 SERVER_HAS_NOTHING = (0, -1) | |
| 63 | |
| 64 def __init__(self, tracker_file_name=None, num_retries=None): | |
| 65 """ | |
| 66 Constructor. Instantiate once for each uploaded file. | |
| 67 | |
| 68 :type tracker_file_name: string | |
| 69 :param tracker_file_name: optional file name to save tracker URI. | |
| 70 If supplied and the current process fails the upload, it can be | |
| 71 retried in a new process. If called with an existing file containing | |
| 72 a valid tracker URI, we'll resume the upload from this URI; else | |
| 73 we'll start a new resumable upload (and write the URI to this | |
| 74 tracker file). | |
| 75 | |
| 76 :type num_retries: int | |
| 77 :param num_retries: the number of times we'll re-try a resumable upload | |
| 78 making no progress. (Count resets every time we get progress, so | |
| 79 upload can span many more than this number of retries.) | |
| 80 """ | |
| 81 self.tracker_file_name = tracker_file_name | |
| 82 self.num_retries = num_retries | |
| 83 self.server_has_bytes = 0 # Byte count at last server check. | |
| 84 self.tracker_uri = None | |
| 85 if tracker_file_name: | |
| 86 self._load_tracker_uri_from_file() | |
| 87 # Save upload_start_point in instance state so caller can find how | |
| 88 # much was transferred by this ResumableUploadHandler (across retries). | |
| 89 self.upload_start_point = None | |
| 90 | |
| 91 def _load_tracker_uri_from_file(self): | |
| 92 f = None | |
| 93 try: | |
| 94 f = open(self.tracker_file_name, 'r') | |
| 95 uri = f.readline().strip() | |
| 96 self._set_tracker_uri(uri) | |
| 97 except IOError as e: | |
| 98 # Ignore non-existent file (happens first time an upload | |
| 99 # is attempted on a file), but warn user for other errors. | |
| 100 if e.errno != errno.ENOENT: | |
| 101 # Will restart because self.tracker_uri is None. | |
| 102 print('Couldn\'t read URI tracker file (%s): %s. Restarting ' | |
| 103 'upload from scratch.' % | |
| 104 (self.tracker_file_name, e.strerror)) | |
| 105 except InvalidUriError as e: | |
| 106 # Warn user, but proceed (will restart because | |
| 107 # self.tracker_uri is None). | |
| 108 print('Invalid tracker URI (%s) found in URI tracker file ' | |
| 109 '(%s). Restarting upload from scratch.' % | |
| 110 (uri, self.tracker_file_name)) | |
| 111 finally: | |
| 112 if f: | |
| 113 f.close() | |
| 114 | |
| 115 def _save_tracker_uri_to_file(self): | |
| 116 """ | |
| 117 Saves URI to tracker file if one was passed to constructor. | |
| 118 """ | |
| 119 if not self.tracker_file_name: | |
| 120 return | |
| 121 f = None | |
| 122 try: | |
| 123 with os.fdopen(os.open(self.tracker_file_name, | |
| 124 os.O_WRONLY | os.O_CREAT, 0o600), 'w') as f: | |
| 125 f.write(self.tracker_uri) | |
| 126 except IOError as e: | |
| 127 raise ResumableUploadException( | |
| 128 'Couldn\'t write URI tracker file (%s): %s.\nThis can happen' | |
| 129 'if you\'re using an incorrectly configured upload tool\n' | |
| 130 '(e.g., gsutil configured to save tracker files to an ' | |
| 131 'unwritable directory)' % | |
| 132 (self.tracker_file_name, e.strerror), | |
| 133 ResumableTransferDisposition.ABORT) | |
| 134 | |
| 135 def _set_tracker_uri(self, uri): | |
| 136 """ | |
| 137 Called when we start a new resumable upload or get a new tracker | |
| 138 URI for the upload. Saves URI and resets upload state. | |
| 139 | |
| 140 Raises InvalidUriError if URI is syntactically invalid. | |
| 141 """ | |
| 142 parse_result = urlparse.urlparse(uri) | |
| 143 if (parse_result.scheme.lower() not in ['http', 'https'] or | |
| 144 not parse_result.netloc): | |
| 145 raise InvalidUriError('Invalid tracker URI (%s)' % uri) | |
| 146 self.tracker_uri = uri | |
| 147 self.tracker_uri_host = parse_result.netloc | |
| 148 self.tracker_uri_path = '%s?%s' % ( | |
| 149 parse_result.path, parse_result.query) | |
| 150 self.server_has_bytes = 0 | |
| 151 | |
| 152 def get_tracker_uri(self): | |
| 153 """ | |
| 154 Returns upload tracker URI, or None if the upload has not yet started. | |
| 155 """ | |
| 156 return self.tracker_uri | |
| 157 | |
| 158 def get_upload_id(self): | |
| 159 """ | |
| 160 Returns the upload ID for the resumable upload, or None if the upload | |
| 161 has not yet started. | |
| 162 """ | |
| 163 # We extract the upload_id from the tracker uri. We could retrieve the | |
| 164 # upload_id from the headers in the response but this only works for | |
| 165 # the case where we get the tracker uri from the service. In the case | |
| 166 # where we get the tracker from the tracking file we need to do this | |
| 167 # logic anyway. | |
| 168 delim = '?upload_id=' | |
| 169 if self.tracker_uri and delim in self.tracker_uri: | |
| 170 return self.tracker_uri[self.tracker_uri.index(delim) + len(delim):] | |
| 171 else: | |
| 172 return None | |
| 173 | |
| 174 def _remove_tracker_file(self): | |
| 175 if (self.tracker_file_name and | |
| 176 os.path.exists(self.tracker_file_name)): | |
| 177 os.unlink(self.tracker_file_name) | |
| 178 | |
| 179 def _build_content_range_header(self, range_spec='*', length_spec='*'): | |
| 180 return 'bytes %s/%s' % (range_spec, length_spec) | |
| 181 | |
| 182 def _query_server_state(self, conn, file_length): | |
| 183 """ | |
| 184 Queries server to find out state of given upload. | |
| 185 | |
| 186 Note that this method really just makes special case use of the | |
| 187 fact that the upload server always returns the current start/end | |
| 188 state whenever a PUT doesn't complete. | |
| 189 | |
| 190 Returns HTTP response from sending request. | |
| 191 | |
| 192 Raises ResumableUploadException if problem querying server. | |
| 193 """ | |
| 194 # Send an empty PUT so that server replies with this resumable | |
| 195 # transfer's state. | |
| 196 put_headers = {} | |
| 197 put_headers['Content-Range'] = ( | |
| 198 self._build_content_range_header('*', file_length)) | |
| 199 put_headers['Content-Length'] = '0' | |
| 200 return AWSAuthConnection.make_request(conn, 'PUT', | |
| 201 path=self.tracker_uri_path, | |
| 202 auth_path=self.tracker_uri_path, | |
| 203 headers=put_headers, | |
| 204 host=self.tracker_uri_host) | |
| 205 | |
| 206 def _query_server_pos(self, conn, file_length): | |
| 207 """ | |
| 208 Queries server to find out what bytes it currently has. | |
| 209 | |
| 210 Returns (server_start, server_end), where the values are inclusive. | |
| 211 For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2. | |
| 212 | |
| 213 Raises ResumableUploadException if problem querying server. | |
| 214 """ | |
| 215 resp = self._query_server_state(conn, file_length) | |
| 216 if resp.status == 200: | |
| 217 # To handle the boundary condition where the server has the complete | |
| 218 # file, we return (server_start, file_length-1). That way the | |
| 219 # calling code can always simply read up through server_end. (If we | |
| 220 # didn't handle this boundary condition here, the caller would have | |
| 221 # to check whether server_end == file_length and read one fewer byte | |
| 222 # in that case.) | |
| 223 return (0, file_length - 1) # Completed upload. | |
| 224 if resp.status != 308: | |
| 225 # This means the server didn't have any state for the given | |
| 226 # upload ID, which can happen (for example) if the caller saved | |
| 227 # the tracker URI to a file and then tried to restart the transfer | |
| 228 # after that upload ID has gone stale. In that case we need to | |
| 229 # start a new transfer (and the caller will then save the new | |
| 230 # tracker URI to the tracker file). | |
| 231 raise ResumableUploadException( | |
| 232 'Got non-308 response (%s) from server state query' % | |
| 233 resp.status, ResumableTransferDisposition.START_OVER) | |
| 234 got_valid_response = False | |
| 235 range_spec = resp.getheader('range') | |
| 236 if range_spec: | |
| 237 # Parse 'bytes=<from>-<to>' range_spec. | |
| 238 m = re.search('bytes=(\d+)-(\d+)', range_spec) | |
| 239 if m: | |
| 240 server_start = long(m.group(1)) | |
| 241 server_end = long(m.group(2)) | |
| 242 got_valid_response = True | |
| 243 else: | |
| 244 # No Range header, which means the server does not yet have | |
| 245 # any bytes. Note that the Range header uses inclusive 'from' | |
| 246 # and 'to' values. Since Range 0-0 would mean that the server | |
| 247 # has byte 0, omitting the Range header is used to indicate that | |
| 248 # the server doesn't have any bytes. | |
| 249 return self.SERVER_HAS_NOTHING | |
| 250 if not got_valid_response: | |
| 251 raise ResumableUploadException( | |
| 252 'Couldn\'t parse upload server state query response (%s)' % | |
| 253 str(resp.getheaders()), ResumableTransferDisposition.START_OVER) | |
| 254 if conn.debug >= 1: | |
| 255 print('Server has: Range: %d - %d.' % (server_start, server_end)) | |
| 256 return (server_start, server_end) | |
| 257 | |
| 258 def _start_new_resumable_upload(self, key, headers=None): | |
| 259 """ | |
| 260 Starts a new resumable upload. | |
| 261 | |
| 262 Raises ResumableUploadException if any errors occur. | |
| 263 """ | |
| 264 conn = key.bucket.connection | |
| 265 if conn.debug >= 1: | |
| 266 print('Starting new resumable upload.') | |
| 267 self.server_has_bytes = 0 | |
| 268 | |
| 269 # Start a new resumable upload by sending a POST request with an | |
| 270 # empty body and the "X-Goog-Resumable: start" header. Include any | |
| 271 # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length | |
| 272 # (and raise an exception if they tried to pass one, since it's | |
| 273 # a semantic error to specify it at this point, and if we were to | |
| 274 # include one now it would cause the server to expect that many | |
| 275 # bytes; the POST doesn't include the actual file bytes). We set | |
| 276 # the Content-Length in the subsequent PUT, based on the uploaded | |
| 277 # file size. | |
| 278 post_headers = {} | |
| 279 for k in headers: | |
| 280 if k.lower() == 'content-length': | |
| 281 raise ResumableUploadException( | |
| 282 'Attempt to specify Content-Length header (disallowed)', | |
| 283 ResumableTransferDisposition.ABORT) | |
| 284 post_headers[k] = headers[k] | |
| 285 post_headers[conn.provider.resumable_upload_header] = 'start' | |
| 286 | |
| 287 resp = conn.make_request( | |
| 288 'POST', key.bucket.name, key.name, post_headers) | |
| 289 # Get tracker URI from response 'Location' header. | |
| 290 body = resp.read() | |
| 291 | |
| 292 # Check for various status conditions. | |
| 293 if resp.status in [500, 503]: | |
| 294 # Retry status 500 and 503 errors after a delay. | |
| 295 raise ResumableUploadException( | |
| 296 'Got status %d from attempt to start resumable upload. ' | |
| 297 'Will wait/retry' % resp.status, | |
| 298 ResumableTransferDisposition.WAIT_BEFORE_RETRY) | |
| 299 elif resp.status != 200 and resp.status != 201: | |
| 300 raise ResumableUploadException( | |
| 301 'Got status %d from attempt to start resumable upload. ' | |
| 302 'Aborting' % resp.status, | |
| 303 ResumableTransferDisposition.ABORT) | |
| 304 | |
| 305 # Else we got 200 or 201 response code, indicating the resumable | |
| 306 # upload was created. | |
| 307 tracker_uri = resp.getheader('Location') | |
| 308 if not tracker_uri: | |
| 309 raise ResumableUploadException( | |
| 310 'No resumable tracker URI found in resumable initiation ' | |
| 311 'POST response (%s)' % body, | |
| 312 ResumableTransferDisposition.WAIT_BEFORE_RETRY) | |
| 313 self._set_tracker_uri(tracker_uri) | |
| 314 self._save_tracker_uri_to_file() | |
| 315 | |
| 316 def _upload_file_bytes(self, conn, http_conn, fp, file_length, | |
| 317 total_bytes_uploaded, cb, num_cb, headers): | |
| 318 """ | |
| 319 Makes one attempt to upload file bytes, using an existing resumable | |
| 320 upload connection. | |
| 321 | |
| 322 Returns (etag, generation, metageneration) from server upon success. | |
| 323 | |
| 324 Raises ResumableUploadException if any problems occur. | |
| 325 """ | |
| 326 buf = fp.read(self.BUFFER_SIZE) | |
| 327 if cb: | |
| 328 # The cb_count represents the number of full buffers to send between | |
| 329 # cb executions. | |
| 330 if num_cb > 2: | |
| 331 cb_count = file_length / self.BUFFER_SIZE / (num_cb-2) | |
| 332 elif num_cb < 0: | |
| 333 cb_count = -1 | |
| 334 else: | |
| 335 cb_count = 0 | |
| 336 i = 0 | |
| 337 cb(total_bytes_uploaded, file_length) | |
| 338 | |
| 339 # Build resumable upload headers for the transfer. Don't send a | |
| 340 # Content-Range header if the file is 0 bytes long, because the | |
| 341 # resumable upload protocol uses an *inclusive* end-range (so, sending | |
| 342 # 'bytes 0-0/1' would actually mean you're sending a 1-byte file). | |
| 343 if not headers: | |
| 344 put_headers = {} | |
| 345 else: | |
| 346 put_headers = headers.copy() | |
| 347 if file_length: | |
| 348 if total_bytes_uploaded == file_length: | |
| 349 range_header = self._build_content_range_header( | |
| 350 '*', file_length) | |
| 351 else: | |
| 352 range_header = self._build_content_range_header( | |
| 353 '%d-%d' % (total_bytes_uploaded, file_length - 1), | |
| 354 file_length) | |
| 355 put_headers['Content-Range'] = range_header | |
| 356 # Set Content-Length to the total bytes we'll send with this PUT. | |
| 357 put_headers['Content-Length'] = str(file_length - total_bytes_uploaded) | |
| 358 http_request = AWSAuthConnection.build_base_http_request( | |
| 359 conn, 'PUT', path=self.tracker_uri_path, auth_path=None, | |
| 360 headers=put_headers, host=self.tracker_uri_host) | |
| 361 http_conn.putrequest('PUT', http_request.path) | |
| 362 for k in put_headers: | |
| 363 http_conn.putheader(k, put_headers[k]) | |
| 364 http_conn.endheaders() | |
| 365 | |
| 366 # Turn off debug on http connection so upload content isn't included | |
| 367 # in debug stream. | |
| 368 http_conn.set_debuglevel(0) | |
| 369 while buf: | |
| 370 http_conn.send(buf) | |
| 371 for alg in self.digesters: | |
| 372 self.digesters[alg].update(buf) | |
| 373 total_bytes_uploaded += len(buf) | |
| 374 if cb: | |
| 375 i += 1 | |
| 376 if i == cb_count or cb_count == -1: | |
| 377 cb(total_bytes_uploaded, file_length) | |
| 378 i = 0 | |
| 379 buf = fp.read(self.BUFFER_SIZE) | |
| 380 http_conn.set_debuglevel(conn.debug) | |
| 381 if cb: | |
| 382 cb(total_bytes_uploaded, file_length) | |
| 383 if total_bytes_uploaded != file_length: | |
| 384 # Abort (and delete the tracker file) so if the user retries | |
| 385 # they'll start a new resumable upload rather than potentially | |
| 386 # attempting to pick back up later where we left off. | |
| 387 raise ResumableUploadException( | |
| 388 'File changed during upload: EOF at %d bytes of %d byte file.' % | |
| 389 (total_bytes_uploaded, file_length), | |
| 390 ResumableTransferDisposition.ABORT) | |
| 391 resp = http_conn.getresponse() | |
| 392 # Restore http connection debug level. | |
| 393 http_conn.set_debuglevel(conn.debug) | |
| 394 | |
| 395 if resp.status == 200: | |
| 396 # Success. | |
| 397 return (resp.getheader('etag'), | |
| 398 resp.getheader('x-goog-generation'), | |
| 399 resp.getheader('x-goog-metageneration')) | |
| 400 # Retry timeout (408) and status 500 and 503 errors after a delay. | |
| 401 elif resp.status in [408, 500, 503]: | |
| 402 disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY | |
| 403 else: | |
| 404 # Catch all for any other error codes. | |
| 405 disposition = ResumableTransferDisposition.ABORT | |
| 406 raise ResumableUploadException('Got response code %d while attempting ' | |
| 407 'upload (%s)' % | |
| 408 (resp.status, resp.reason), disposition) | |
| 409 | |
| 410 def _attempt_resumable_upload(self, key, fp, file_length, headers, cb, | |
| 411 num_cb): | |
| 412 """ | |
| 413 Attempts a resumable upload. | |
| 414 | |
| 415 Returns (etag, generation, metageneration) from server upon success. | |
| 416 | |
| 417 Raises ResumableUploadException if any problems occur. | |
| 418 """ | |
| 419 (server_start, server_end) = self.SERVER_HAS_NOTHING | |
| 420 conn = key.bucket.connection | |
| 421 if self.tracker_uri: | |
| 422 # Try to resume existing resumable upload. | |
| 423 try: | |
| 424 (server_start, server_end) = ( | |
| 425 self._query_server_pos(conn, file_length)) | |
| 426 self.server_has_bytes = server_start | |
| 427 | |
| 428 if server_end: | |
| 429 # If the server already has some of the content, we need to | |
| 430 # update the digesters with the bytes that have already been | |
| 431 # uploaded to ensure we get a complete hash in the end. | |
| 432 print('Catching up hash digest(s) for resumed upload') | |
| 433 fp.seek(0) | |
| 434 # Read local file's bytes through position server has. For | |
| 435 # example, if server has (0, 3) we want to read 3-0+1=4 bytes. | |
| 436 bytes_to_go = server_end + 1 | |
| 437 while bytes_to_go: | |
| 438 chunk = fp.read(min(key.BufferSize, bytes_to_go)) | |
| 439 if not chunk: | |
| 440 raise ResumableUploadException( | |
| 441 'Hit end of file during resumable upload hash ' | |
| 442 'catchup. This should not happen under\n' | |
| 443 'normal circumstances, as it indicates the ' | |
| 444 'server has more bytes of this transfer\nthan' | |
| 445 ' the current file size. Restarting upload.', | |
| 446 ResumableTransferDisposition.START_OVER) | |
| 447 for alg in self.digesters: | |
| 448 self.digesters[alg].update(chunk) | |
| 449 bytes_to_go -= len(chunk) | |
| 450 | |
| 451 if conn.debug >= 1: | |
| 452 print('Resuming transfer.') | |
| 453 except ResumableUploadException as e: | |
| 454 if conn.debug >= 1: | |
| 455 print('Unable to resume transfer (%s).' % e.message) | |
| 456 self._start_new_resumable_upload(key, headers) | |
| 457 else: | |
| 458 self._start_new_resumable_upload(key, headers) | |
| 459 | |
| 460 # upload_start_point allows the code that instantiated the | |
| 461 # ResumableUploadHandler to find out the point from which it started | |
| 462 # uploading (e.g., so it can correctly compute throughput). | |
| 463 if self.upload_start_point is None: | |
| 464 self.upload_start_point = server_end | |
| 465 | |
| 466 total_bytes_uploaded = server_end + 1 | |
| 467 # Corner case: Don't attempt to seek if we've already uploaded the | |
| 468 # entire file, because if the file is a stream (e.g., the KeyFile | |
| 469 # wrapper around input key when copying between providers), attempting | |
| 470 # to seek to the end of file would result in an InvalidRange error. | |
| 471 if file_length < total_bytes_uploaded: | |
| 472 fp.seek(total_bytes_uploaded) | |
| 473 conn = key.bucket.connection | |
| 474 | |
| 475 # Get a new HTTP connection (vs conn.get_http_connection(), which reuses | |
| 476 # pool connections) because httplib requires a new HTTP connection per | |
| 477 # transaction. (Without this, calling http_conn.getresponse() would get | |
| 478 # "ResponseNotReady".) | |
| 479 http_conn = conn.new_http_connection(self.tracker_uri_host, conn.port, | |
| 480 conn.is_secure) | |
| 481 http_conn.set_debuglevel(conn.debug) | |
| 482 | |
| 483 # Make sure to close http_conn at end so if a local file read | |
| 484 # failure occurs partway through server will terminate current upload | |
| 485 # and can report that progress on next attempt. | |
| 486 try: | |
| 487 return self._upload_file_bytes(conn, http_conn, fp, file_length, | |
| 488 total_bytes_uploaded, cb, num_cb, | |
| 489 headers) | |
| 490 except (ResumableUploadException, socket.error): | |
| 491 resp = self._query_server_state(conn, file_length) | |
| 492 if resp.status == 400: | |
| 493 raise ResumableUploadException('Got 400 response from server ' | |
| 494 'state query after failed resumable upload attempt. This ' | |
| 495 'can happen for various reasons, including specifying an ' | |
| 496 'invalid request (e.g., an invalid canned ACL) or if the ' | |
| 497 'file size changed between upload attempts', | |
| 498 ResumableTransferDisposition.ABORT) | |
| 499 else: | |
| 500 raise | |
| 501 finally: | |
| 502 http_conn.close() | |
| 503 | |
| 504 def _check_final_md5(self, key, etag): | |
| 505 """ | |
| 506 Checks that etag from server agrees with md5 computed before upload. | |
| 507 This is important, since the upload could have spanned a number of | |
| 508 hours and multiple processes (e.g., gsutil runs), and the user could | |
| 509 change some of the file and not realize they have inconsistent data. | |
| 510 """ | |
| 511 if key.bucket.connection.debug >= 1: | |
| 512 print('Checking md5 against etag.') | |
| 513 if key.md5 != etag.strip('"\''): | |
| 514 # Call key.open_read() before attempting to delete the | |
| 515 # (incorrect-content) key, so we perform that request on a | |
| 516 # different HTTP connection. This is needed because httplib | |
| 517 # will return a "Response not ready" error if you try to perform | |
| 518 # a second transaction on the connection. | |
| 519 key.open_read() | |
| 520 key.close() | |
| 521 key.delete() | |
| 522 raise ResumableUploadException( | |
| 523 'File changed during upload: md5 signature doesn\'t match etag ' | |
| 524 '(incorrect uploaded object deleted)', | |
| 525 ResumableTransferDisposition.ABORT) | |
| 526 | |
| 527 def handle_resumable_upload_exception(self, e, debug): | |
| 528 if (e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS): | |
| 529 if debug >= 1: | |
| 530 print('Caught non-retryable ResumableUploadException (%s); ' | |
| 531 'aborting but retaining tracker file' % e.message) | |
| 532 raise | |
| 533 elif (e.disposition == ResumableTransferDisposition.ABORT): | |
| 534 if debug >= 1: | |
| 535 print('Caught non-retryable ResumableUploadException (%s); ' | |
| 536 'aborting and removing tracker file' % e.message) | |
| 537 self._remove_tracker_file() | |
| 538 raise | |
| 539 else: | |
| 540 if debug >= 1: | |
| 541 print('Caught ResumableUploadException (%s) - will retry' % | |
| 542 e.message) | |
| 543 | |
| 544 def track_progress_less_iterations(self, server_had_bytes_before_attempt, | |
| 545 roll_back_md5=True, debug=0): | |
| 546 # At this point we had a re-tryable failure; see if we made progress. | |
| 547 if self.server_has_bytes > server_had_bytes_before_attempt: | |
| 548 self.progress_less_iterations = 0 # If progress, reset counter. | |
| 549 else: | |
| 550 self.progress_less_iterations += 1 | |
| 551 if roll_back_md5: | |
| 552 # Rollback any potential hash updates, as we did not | |
| 553 # make any progress in this iteration. | |
| 554 self.digesters = self.digesters_before_attempt | |
| 555 | |
| 556 if self.progress_less_iterations > self.num_retries: | |
| 557 # Don't retry any longer in the current process. | |
| 558 raise ResumableUploadException( | |
| 559 'Too many resumable upload attempts failed without ' | |
| 560 'progress. You might try this upload again later', | |
| 561 ResumableTransferDisposition.ABORT_CUR_PROCESS) | |
| 562 | |
| 563 # Use binary exponential backoff to desynchronize client requests. | |
| 564 sleep_time_secs = random.random() * (2**self.progress_less_iterations) | |
| 565 if debug >= 1: | |
| 566 print('Got retryable failure (%d progress-less in a row).\n' | |
| 567 'Sleeping %3.1f seconds before re-trying' % | |
| 568 (self.progress_less_iterations, sleep_time_secs)) | |
| 569 time.sleep(sleep_time_secs) | |
| 570 | |
| 571 def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None): | |
| 572 """ | |
| 573 Upload a file to a key into a bucket on GS, using GS resumable upload | |
| 574 protocol. | |
| 575 | |
| 576 :type key: :class:`boto.s3.key.Key` or subclass | |
| 577 :param key: The Key object to which data is to be uploaded | |
| 578 | |
| 579 :type fp: file-like object | |
| 580 :param fp: The file pointer to upload | |
| 581 | |
| 582 :type headers: dict | |
| 583 :param headers: The headers to pass along with the PUT request | |
| 584 | |
| 585 :type cb: function | |
| 586 :param cb: a callback function that will be called to report progress on | |
| 587 the upload. The callback should accept two integer parameters, the | |
| 588 first representing the number of bytes that have been successfully | |
| 589 transmitted to GS, and the second representing the total number of | |
| 590 bytes that need to be transmitted. | |
| 591 | |
| 592 :type num_cb: int | |
| 593 :param num_cb: (optional) If a callback is specified with the cb | |
| 594 parameter, this parameter determines the granularity of the callback | |
| 595 by defining the maximum number of times the callback will be called | |
| 596 during the file transfer. Providing a negative integer will cause | |
| 597 your callback to be called with each buffer read. | |
| 598 | |
| 599 :type hash_algs: dictionary | |
| 600 :param hash_algs: (optional) Dictionary mapping hash algorithm | |
| 601 descriptions to zero-argument constructors of state-ful hashing | |
| 602 objects that implement update(), digest(), and copy() (e.g. | |
| 603 hashlib.md5). Defaults to {'md5': md5}. | |
| 604 | |
| 605 Raises ResumableUploadException if a problem occurs during the transfer. | |
| 606 """ | |
| 607 | |
| 608 if not headers: | |
| 609 headers = {} | |
| 610 # If Content-Type header is present and set to None, remove it. | |
| 611 # This is gsutil's way of asking boto to refrain from auto-generating | |
| 612 # that header. | |
| 613 CT = 'Content-Type' | |
| 614 if CT in headers and headers[CT] is None: | |
| 615 del headers[CT] | |
| 616 | |
| 617 headers['User-Agent'] = UserAgent | |
| 618 | |
| 619 # Determine file size different ways for case where fp is actually a | |
| 620 # wrapper around a Key vs an actual file. | |
| 621 if isinstance(fp, KeyFile): | |
| 622 file_length = fp.getkey().size | |
| 623 else: | |
| 624 fp.seek(0, os.SEEK_END) | |
| 625 file_length = fp.tell() | |
| 626 fp.seek(0) | |
| 627 debug = key.bucket.connection.debug | |
| 628 | |
| 629 # Compute the MD5 checksum on the fly. | |
| 630 if hash_algs is None: | |
| 631 hash_algs = {'md5': md5} | |
| 632 self.digesters = dict( | |
| 633 (alg, hash_algs[alg]()) for alg in hash_algs or {}) | |
| 634 | |
| 635 # Use num-retries from constructor if one was provided; else check | |
| 636 # for a value specified in the boto config file; else default to 6. | |
| 637 if self.num_retries is None: | |
| 638 self.num_retries = config.getint('Boto', 'num_retries', 6) | |
| 639 self.progress_less_iterations = 0 | |
| 640 | |
| 641 while True: # Retry as long as we're making progress. | |
| 642 server_had_bytes_before_attempt = self.server_has_bytes | |
| 643 self.digesters_before_attempt = dict( | |
| 644 (alg, self.digesters[alg].copy()) | |
| 645 for alg in self.digesters) | |
| 646 try: | |
| 647 # Save generation and metageneration in class state so caller | |
| 648 # can find these values, for use in preconditions of future | |
| 649 # operations on the uploaded object. | |
| 650 (etag, self.generation, self.metageneration) = ( | |
| 651 self._attempt_resumable_upload(key, fp, file_length, | |
| 652 headers, cb, num_cb)) | |
| 653 | |
| 654 # Get the final digests for the uploaded content. | |
| 655 for alg in self.digesters: | |
| 656 key.local_hashes[alg] = self.digesters[alg].digest() | |
| 657 | |
| 658 # Upload succeeded, so remove the tracker file (if we have one). | |
| 659 self._remove_tracker_file() | |
| 660 self._check_final_md5(key, etag) | |
| 661 key.generation = self.generation | |
| 662 if debug >= 1: | |
| 663 print('Resumable upload complete.') | |
| 664 return | |
| 665 except self.RETRYABLE_EXCEPTIONS as e: | |
| 666 if debug >= 1: | |
| 667 print('Caught exception (%s)' % e.__repr__()) | |
| 668 if isinstance(e, IOError) and e.errno == errno.EPIPE: | |
| 669 # Broken pipe error causes httplib to immediately | |
| 670 # close the socket (http://bugs.python.org/issue5542), | |
| 671 # so we need to close the connection before we resume | |
| 672 # the upload (which will cause a new connection to be | |
| 673 # opened the next time an HTTP request is sent). | |
| 674 key.bucket.connection.connection.close() | |
| 675 except ResumableUploadException as e: | |
| 676 self.handle_resumable_upload_exception(e, debug) | |
| 677 | |
| 678 self.track_progress_less_iterations(server_had_bytes_before_attempt, | |
| 679 True, debug) |
