2626import copy
2727import gzip
2828import httplib2
29+ import logging
2930import mimeparse
3031import mimetypes
3132import os
33+ import random
3234import sys
35+ import time
3336import urllib
3437import urlparse
3538import uuid
@@ -508,9 +511,20 @@ def __init__(self, fd, request, chunksize=DEFAULT_CHUNK_SIZE):
508511 self ._original_follow_redirects = request .http .follow_redirects
509512 request .http .follow_redirects = False
510513
511- def next_chunk (self ):
514+ # Stubs for testing.
515+ self ._sleep = time .sleep
516+ self ._rand = random .random
517+
518+ @util .positional (1 )
519+ def next_chunk (self , num_retries = 0 ):
512520 """Get the next chunk of the download.
513521
522+ Args:
523+ num_retries: Integer, number of times to retry 500's with randomized
524+ exponential backoff. If all retries fail, the raised HttpError
525+ represents the last request. If zero (default), we attempt the
526+ request only once.
527+
514528 Returns:
515529 (status, done): (MediaDownloadStatus, boolean)
516530 The value of 'done' will be True when the media has been fully
@@ -526,7 +540,17 @@ def next_chunk(self):
526540 }
527541 http = self ._request .http
528542
529- resp , content = http .request (self ._uri , headers = headers )
543+ for retry_num in xrange (num_retries + 1 ):
544+ if retry_num > 0 :
545+ self ._sleep (self ._rand () * 2 ** retry_num )
546+ logging .warning (
547+ 'Retry #%d for media download: GET %s, following status: %d'
548+ % (retry_num , self ._uri , resp .status ))
549+
550+ resp , content = http .request (self ._uri , headers = headers )
551+ if resp .status < 500 :
552+ break
553+
530554 if resp .status in [301 , 302 , 303 , 307 , 308 ] and 'location' in resp :
531555 self ._uri = resp ['location' ]
532556 resp , content = http .request (self ._uri , headers = headers )
@@ -635,13 +659,21 @@ def __init__(self, http, postproc, uri,
635659 # The bytes that have been uploaded.
636660 self .resumable_progress = 0
637661
662+ # Stubs for testing.
663+ self ._rand = random .random
664+ self ._sleep = time .sleep
665+
638666 @util .positional (1 )
639- def execute (self , http = None ):
667+ def execute (self , http = None , num_retries = 0 ):
640668 """Execute the request.
641669
642670 Args:
643671 http: httplib2.Http, an http object to be used in place of the
644672 one the HttpRequest request object was constructed with.
673+ num_retries: Integer, number of times to retry 500's with randomized
674+ exponential backoff. If all retries fail, the raised HttpError
675+ represents the last request. If zero (default), we attempt the
676+ request only once.
645677
646678 Returns:
647679 A deserialized object model of the response body as determined
@@ -653,33 +685,46 @@ def execute(self, http=None):
653685 """
654686 if http is None :
655687 http = self .http
688+
656689 if self .resumable :
657690 body = None
658691 while body is None :
659- _ , body = self .next_chunk (http = http )
692+ _ , body = self .next_chunk (http = http , num_retries = num_retries )
660693 return body
661- else :
662- if 'content-length' not in self .headers :
663- self .headers ['content-length' ] = str (self .body_size )
664- # If the request URI is too long then turn it into a POST request.
665- if len (self .uri ) > MAX_URI_LENGTH and self .method == 'GET' :
666- self .method = 'POST'
667- self .headers ['x-http-method-override' ] = 'GET'
668- self .headers ['content-type' ] = 'application/x-www-form-urlencoded'
669- parsed = urlparse .urlparse (self .uri )
670- self .uri = urlparse .urlunparse (
671- (parsed .scheme , parsed .netloc , parsed .path , parsed .params , None ,
672- None )
673- )
674- self .body = parsed .query
675- self .headers ['content-length' ] = str (len (self .body ))
694+
695+ # Non-resumable case.
696+
697+ if 'content-length' not in self .headers :
698+ self .headers ['content-length' ] = str (self .body_size )
699+ # If the request URI is too long then turn it into a POST request.
700+ if len (self .uri ) > MAX_URI_LENGTH and self .method == 'GET' :
701+ self .method = 'POST'
702+ self .headers ['x-http-method-override' ] = 'GET'
703+ self .headers ['content-type' ] = 'application/x-www-form-urlencoded'
704+ parsed = urlparse .urlparse (self .uri )
705+ self .uri = urlparse .urlunparse (
706+ (parsed .scheme , parsed .netloc , parsed .path , parsed .params , None ,
707+ None )
708+ )
709+ self .body = parsed .query
710+ self .headers ['content-length' ] = str (len (self .body ))
711+
712+ # Handle retries for server-side errors.
713+ for retry_num in xrange (num_retries + 1 ):
714+ if retry_num > 0 :
715+ self ._sleep (self ._rand () * 2 ** retry_num )
716+ logging .warning ('Retry #%d for request: %s %s, following status: %d'
717+ % (retry_num , self .method , self .uri , resp .status ))
676718
677719 resp , content = http .request (str (self .uri ), method = str (self .method ),
678720 body = self .body , headers = self .headers )
679- for callback in self .response_callbacks :
680- callback (resp )
681- if resp .status >= 300 :
682- raise HttpError (resp , content , uri = self .uri )
721+ if resp .status < 500 :
722+ break
723+
724+ for callback in self .response_callbacks :
725+ callback (resp )
726+ if resp .status >= 300 :
727+ raise HttpError (resp , content , uri = self .uri )
683728 return self .postproc (resp , content )
684729
685730 @util .positional (2 )
@@ -695,7 +740,7 @@ def cb(resp):
695740 self .response_callbacks .append (cb )
696741
697742 @util .positional (1 )
698- def next_chunk (self , http = None ):
743+ def next_chunk (self , http = None , num_retries = 0 ):
699744 """Execute the next step of a resumable upload.
700745
701746 Can only be used if the method being executed supports media uploads and
@@ -717,6 +762,14 @@ def next_chunk(self, http=None):
717762 print "Upload %d%% complete." % int(status.progress() * 100)
718763
719764
765+ Args:
766+ http: httplib2.Http, an http object to be used in place of the
767+ one the HttpRequest request object was constructed with.
768+ num_retries: Integer, number of times to retry 500's with randomized
769+ exponential backoff. If all retries fail, the raised HttpError
770+ represents the last request. If zero (default), we attempt the
771+ request only once.
772+
720773 Returns:
721774 (status, body): (ResumableMediaStatus, object)
722775 The body will be None until the resumable media is fully uploaded.
@@ -740,9 +793,19 @@ def next_chunk(self, http=None):
740793 start_headers ['X-Upload-Content-Length' ] = size
741794 start_headers ['content-length' ] = str (self .body_size )
742795
743- resp , content = http .request (self .uri , method = self .method ,
744- body = self .body ,
745- headers = start_headers )
796+ for retry_num in xrange (num_retries + 1 ):
797+ if retry_num > 0 :
798+ self ._sleep (self ._rand () * 2 ** retry_num )
799+ logging .warning (
800+ 'Retry #%d for resumable URI request: %s %s, following status: %d'
801+ % (retry_num , self .method , self .uri , resp .status ))
802+
803+ resp , content = http .request (self .uri , method = self .method ,
804+ body = self .body ,
805+ headers = start_headers )
806+ if resp .status < 500 :
807+ break
808+
746809 if resp .status == 200 and 'location' in resp :
747810 self .resumable_uri = resp ['location' ]
748811 else :
@@ -794,13 +857,23 @@ def next_chunk(self, http=None):
794857 # calculate the size when working with _StreamSlice.
795858 'Content-Length' : str (chunk_end - self .resumable_progress + 1 )
796859 }
797- try :
798- resp , content = http .request (self .resumable_uri , method = 'PUT' ,
799- body = data ,
800- headers = headers )
801- except :
802- self ._in_error_state = True
803- raise
860+
861+ for retry_num in xrange (num_retries + 1 ):
862+ if retry_num > 0 :
863+ self ._sleep (self ._rand () * 2 ** retry_num )
864+ logging .warning (
865+ 'Retry #%d for media upload: %s %s, following status: %d'
866+ % (retry_num , self .method , self .uri , resp .status ))
867+
868+ try :
869+ resp , content = http .request (self .resumable_uri , method = 'PUT' ,
870+ body = data ,
871+ headers = headers )
872+ except :
873+ self ._in_error_state = True
874+ raise
875+ if resp .status < 500 :
876+ break
804877
805878 return self ._process_response (resp , content )
806879
@@ -841,6 +914,8 @@ def to_json(self):
841914 d ['resumable' ] = self .resumable .to_json ()
842915 del d ['http' ]
843916 del d ['postproc' ]
917+ del d ['_sleep' ]
918+ del d ['_rand' ]
844919
845920 return simplejson .dumps (d )
846921
0 commit comments