11from decimal import Decimal
2+
3+ import errno
4+ import logging
25import math
36import time
47import threading
1417from databricks .sql .auth .authenticators import CredentialsProvider
1518from databricks .sql .thrift_api .TCLIService import TCLIService , ttypes
1619from databricks .sql import *
20+ from databricks .sql .thrift_api .TCLIService .TCLIService import (
21+ Client as TCLIServiceClient ,
22+ )
1723from databricks .sql .utils import (
1824 ArrowQueue ,
1925 ExecuteResponse ,
3844 "_retry_delay_max" : (float , 60 , 5 , 3600 ),
3945 "_retry_stop_after_attempts_count" : (int , 30 , 1 , 60 ),
4046 "_retry_stop_after_attempts_duration" : (float , 900 , 1 , 86400 ),
47+ "_retry_delay_default" : (float , 5 , 1 , 60 ),
4148}
4249
4350
@@ -70,6 +77,8 @@ def __init__(
7077 # _retry_delay_min (default: 1)
7178 # _retry_delay_max (default: 60)
7279 # {min,max} pre-retry delay bounds
80+ # _retry_delay_default (default: 5)
81+ # Only used when GetOperationStatus fails due to a TCP/OS Error.
7382 # _retry_stop_after_attempts_count (default: 30)
7483 # total max attempts during retry sequence
7584 # _retry_stop_after_attempts_duration (default: 900)
@@ -160,7 +169,7 @@ def _initialize_retry_args(self, kwargs):
160169 "retry parameter: {} given_or_default {}" .format (key , given_or_default )
161170 )
162171 if bound != given_or_default :
163- logger .warn (
172+ logger .warning (
164173 "Override out of policy retry parameter: "
165174 + "{} given {}, restricted to {}" .format (
166175 key , given_or_default , bound
@@ -245,7 +254,9 @@ def _handle_request_error(self, error_info, attempt, elapsed):
245254 # FUTURE: Consider moving to https://github.com/litl/backoff or
246255 # https://github.com/jd/tenacity for retry logic.
247256 def make_request (self , method , request ):
248- """Execute given request, attempting retries when receiving HTTP 429/503.
257+ """Execute given request, attempting retries when
258+ 1. Receiving HTTP 429/503 from server
259+ 2. OSError is raised during a GetOperationStatus
249260
250261 For delay between attempts, honor the given Retry-After header, but with bounds.
251262 Use lower bound of expontial-backoff based on _retry_delay_min,
@@ -262,17 +273,21 @@ def make_request(self, method, request):
262273 def get_elapsed ():
263274 return time .time () - t0
264275
276+ def bound_retry_delay (attempt , proposed_delay ):
277+ """bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay]"""
278+ delay = int (proposed_delay )
279+ delay = max (delay , self ._retry_delay_min * math .pow (1.5 , attempt - 1 ))
280+ delay = min (delay , self ._retry_delay_max )
281+ return delay
282+
265283 def extract_retry_delay (attempt ):
266284 # encapsulate retry checks, returns None || delay-in-secs
267285 # Retry IFF 429/503 code + Retry-After header set
268286 http_code = getattr (self ._transport , "code" , None )
269287 retry_after = getattr (self ._transport , "headers" , {}).get ("Retry-After" )
270288 if http_code in [429 , 503 ] and retry_after :
271289 # bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay]
272- delay = int (retry_after )
273- delay = max (delay , self ._retry_delay_min * math .pow (1.5 , attempt - 1 ))
274- delay = min (delay , self ._retry_delay_max )
275- return delay
290+ return bound_retry_delay (attempt , int (retry_after ))
276291 return None
277292
278293 def attempt_request (attempt ):
@@ -281,24 +296,57 @@ def attempt_request(attempt):
281296 # - non-None method_return -> success, return and be done
282297 # - non-None retry_delay -> sleep delay before retry
283298 # - error, error_message always set when available
299+
300+ error , error_message , retry_delay = None , None , None
284301 try :
285302 logger .debug ("Sending request: {}" .format (request ))
286303 response = method (request )
287304 logger .debug ("Received response: {}" .format (response ))
288305 return response
289- except Exception as error :
306+ except OSError as err :
307+ error = err
308+ error_message = str (err )
309+
310+ gos_name = TCLIServiceClient .GetOperationStatus .__name__
311+ if method .__name__ == gos_name :
312+ retry_delay = bound_retry_delay (attempt , self ._retry_delay_default )
313+
314+ # fmt: off
315+ # The built-in errno package encapsulates OSError codes, which are OS-specific.
316+ # log.info for errors we believe are not unusual or unexpected. log.warn for
317+ # for others like EEXIST, EBADF, ERANGE which are not expected in this context.
318+ #
319+ # I manually tested this retry behaviour using mitmweb and confirmed that
320+ # GetOperationStatus requests are retried when I forced network connection
321+ # interruptions / timeouts / reconnects. See #24 for more info.
322+ # | Debian | Darwin |
323+ info_errs = [ # |--------|--------|
324+ errno .ESHUTDOWN , # | 32 | 32 |
325+ errno .EAFNOSUPPORT , # | 97 | 47 |
326+ errno .ECONNRESET , # | 104 | 54 |
327+ errno .ETIMEDOUT , # | 110 | 60 |
328+ ]
329+
330+ # fmt: on
331+ log_string = f"{ gos_name } failed with code { err .errno } and will attempt to retry"
332+ if err .errno in info_errs :
333+ logger .info (log_string )
334+ else :
335+ logger .warning (log_string )
336+ except Exception as err :
337+ error = err
290338 retry_delay = extract_retry_delay (attempt )
291339 error_message = ThriftBackend ._extract_error_message_from_headers (
292340 getattr (self ._transport , "headers" , {})
293341 )
294- return RequestErrorInfo (
295- error = error ,
296- error_message = error_message ,
297- retry_delay = retry_delay ,
298- http_code = getattr (self ._transport , "code" , None ),
299- method = method .__name__ ,
300- request = request ,
301- )
342+ return RequestErrorInfo (
343+ error = error ,
344+ error_message = error_message ,
345+ retry_delay = retry_delay ,
346+ http_code = getattr (self ._transport , "code" , None ),
347+ method = method .__name__ ,
348+ request = request ,
349+ )
302350
303351 # The real work:
304352 # - for each available attempt:
0 commit comments