From 8e4ba422e38f1757392fdfe71a39e2efd1b4c947 Mon Sep 17 00:00:00 2001 From: Dinesh N Date: Fri, 11 Mar 2022 16:42:58 +0530 Subject: [PATCH 1/5] Adding patch of connection reset errors --- semantics3/semantics3.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/semantics3/semantics3.py b/semantics3/semantics3.py index c6657e0..eec6a77 100644 --- a/semantics3/semantics3.py +++ b/semantics3/semantics3.py @@ -1,3 +1,4 @@ +import time import json from requests_oauthlib import OAuth1Session from url_normalize import url_normalize @@ -37,9 +38,17 @@ def __init__(self, api_key=None, api_secret=None, endpoint=None, api_base='https self.cache_size = 10 self.api_base = api_base self.timeout = timeout + self.oauth_session_start = int(time.time()) + + def _validate_session(self): + duration = int(time.time()) - self.oauth_session_start + if duration >= 60: + self.oauth.close() + self.__init__() def fetch(self, method, endpoint, params): api_endpoint = url_normalize(self.api_base + endpoint) + self._validate_session() if method.lower() in ['get', 'delete']: content = self.oauth.request( method, From bf3553cd880219ef349913ca27c8d57e170998e4 Mon Sep 17 00:00:00 2001 From: Dinesh N Date: Fri, 11 Mar 2022 23:04:26 +0530 Subject: [PATCH 2/5] Handling connection resets and gateway timeouts elegantly --- semantics3/semantics3.py | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/semantics3/semantics3.py b/semantics3/semantics3.py index eec6a77..4068100 100644 --- a/semantics3/semantics3.py +++ b/semantics3/semantics3.py @@ -1,7 +1,10 @@ import time import json +import logging + from requests_oauthlib import OAuth1Session from url_normalize import url_normalize +from urllib3.exceptions import HTTPError try: import urllib.parse as urllib @@ -40,15 +43,8 @@ def __init__(self, api_key=None, api_secret=None, endpoint=None, api_base='https self.timeout = timeout self.oauth_session_start = int(time.time()) - def _validate_session(self): - duration = int(time.time()) - self.oauth_session_start - if duration >= 60: - self.oauth.close() - self.__init__() - def fetch(self, method, endpoint, params): api_endpoint = url_normalize(self.api_base + endpoint) - self._validate_session() if method.lower() in ['get', 'delete']: content = self.oauth.request( method, @@ -56,7 +52,7 @@ def fetch(self, method, endpoint, params): params = params, headers={'User-Agent':'Semantics3 Python Lib/0.2'}, timeout=self.timeout - ) + ) else: content = self.oauth.request( method, @@ -64,7 +60,7 @@ def fetch(self, method, endpoint, params): data = json.dumps(params), headers={'User-Agent':'Semantics3 Python Lib/0.2', 'Content-Type':'application/json'}, timeout=self.timeout - ) + ) return content def remove(self, endpoint, *fields): @@ -128,7 +124,25 @@ def query(self, method, endpoint, kwargs): params = { 'q' : json.dumps(kwargs) } else: params = kwargs - response = self.fetch(method, endpoint, params) + + #-- Everytime we receive either of the following errors, retry the request + #-- 1. Connection reset by peer + try: + response = self.fetch(method, endpoint, params) + except (ConnectionResetError, ConnectionError) as e: + duration = int(time.time()) - self.oauth_session_start + logging.debug("Encountered connection error, duration since client created: {d}s, Recreating semantics3 client".format(d=duration)) + logging.debug(str(e)) + self.oauth.close() #-- Close connection pool associated with current session + self.__init__() #-- Initiate new client + return self.query(method, endpoint, kwargs) + + #-- 2. Gateway timeout (status code 504) error retry + if response.status_code == 504: + logging.debug("Encountered a gateway timeout error, Sending the request again") + return self.query(method, endpoint, kwargs) + + #-- Check if response received is a malformed json try: response_json = response.json() except: From 5a9425605d6aa85b0023323d506320c510cd08b1 Mon Sep 17 00:00:00 2001 From: Dinesh N Date: Mon, 14 Mar 2022 12:15:07 +0530 Subject: [PATCH 3/5] Added stop over for retries --- semantics3/semantics3.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/semantics3/semantics3.py b/semantics3/semantics3.py index 4068100..f51370a 100644 --- a/semantics3/semantics3.py +++ b/semantics3/semantics3.py @@ -119,28 +119,41 @@ def iter(self): if(offset < total_count): self.run_query() - def query(self, method, endpoint, kwargs): + def query(self, method, endpoint, kwargs, retry_attempt=0): if method.lower() == "get": params = { 'q' : json.dumps(kwargs) } else: params = kwargs + #-- Following is a patch/hack made to circumvent issues encountered during + #-- migrating sem3 infra to k8s sem3stage and sem3prod clusters accordingly + #-- however this doesn't address the root cause, it only bypass the issue by retries on client side + #-- it performs auto retry (upto 1 attempt) whenever we encounter following errors + #-- Everytime we receive either of the following errors, retry the request #-- 1. Connection reset by peer try: response = self.fetch(method, endpoint, params) except (ConnectionResetError, ConnectionError) as e: duration = int(time.time()) - self.oauth_session_start - logging.debug("Encountered connection error, duration since client created: {d}s, Recreating semantics3 client".format(d=duration)) - logging.debug(str(e)) - self.oauth.close() #-- Close connection pool associated with current session - self.__init__() #-- Initiate new client - return self.query(method, endpoint, kwargs) + if retry_attempt <= 1: + logging.debug("Encountered connection error, duration since client created: {d}s, Recreating semantics3 client".format(d=duration)) + logging.debug(str(e)) + self.oauth.close() #-- Close connection pool associated with current session + self.__init__() #-- Initiate new client + return self.query(method, endpoint, kwargs, retry_attempt+1) + else: + logging.debug("Encountered connection error, duration since client created: {d}s, Exceeded max retry attempts, Skipping") + logging.debug(str(e)) + #-- 2. Gateway timeout (status code 504) error retry if response.status_code == 504: - logging.debug("Encountered a gateway timeout error, Sending the request again") - return self.query(method, endpoint, kwargs) + if retry_attempt <= 1: + logging.debug("Encountered a gateway timeout error, Sending the request again") + return self.query(method, endpoint, kwargs, retry_attempt+1) + else: + logging.debug("Encountered a gateway timeout error, Exceeded max retry attempts, Skipping") #-- Check if response received is a malformed json try: From ae0f43be4b4e33a5e93a398c38016eebf2e24abd Mon Sep 17 00:00:00 2001 From: Dinesh N Date: Thu, 17 Mar 2022 14:41:22 +0530 Subject: [PATCH 4/5] Added exception handling for Connection Aborted / RequestExceptions --- semantics3/semantics3.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/semantics3/semantics3.py b/semantics3/semantics3.py index f51370a..af81173 100644 --- a/semantics3/semantics3.py +++ b/semantics3/semantics3.py @@ -1,10 +1,12 @@ +import re import time import json import logging from requests_oauthlib import OAuth1Session from url_normalize import url_normalize -from urllib3.exceptions import HTTPError +#from urllib3.exceptions import HTTPError, ProtocolError +from requests.exceptions import ConnectionError, RequestException try: import urllib.parse as urllib @@ -119,6 +121,12 @@ def iter(self): if(offset < total_count): self.run_query() + def _reinitialize_client(self): + duration = int(time.time()) - self.oauth_session_start + logging.debug("Encountered connection error. Duration since client created: {d}s, Recreating semantics3 client".format(d=duration)) + self.oauth.close() #-- Close connection pool associated with current session + self.__init__() #-- Initiate new client + def query(self, method, endpoint, kwargs, retry_attempt=0): if method.lower() == "get": params = { 'q' : json.dumps(kwargs) } @@ -134,18 +142,17 @@ def query(self, method, endpoint, kwargs, retry_attempt=0): #-- 1. Connection reset by peer try: response = self.fetch(method, endpoint, params) - except (ConnectionResetError, ConnectionError) as e: - duration = int(time.time()) - self.oauth_session_start - if retry_attempt <= 1: - logging.debug("Encountered connection error, duration since client created: {d}s, Recreating semantics3 client".format(d=duration)) - logging.debug(str(e)) - self.oauth.close() #-- Close connection pool associated with current session - self.__init__() #-- Initiate new client + except (ConnectionResetError, ConnectionAbortedError, ConnectionError, RequestException) as e: + err_str = str(e) + if retry_attempt <= 1 and re.search("Connection (?:aborted|reset by peer)", err_str): + logging.debug(err_str) + self._reinitialize_client() return self.query(method, endpoint, kwargs, retry_attempt+1) + elif retry_attempt > 1: + logging.debug("Exceeded max retry attempts, Skipping. Error: {e}".format(e=str(e))) else: - logging.debug("Encountered connection error, duration since client created: {d}s, Exceeded max retry attempts, Skipping") - logging.debug(str(e)) - + logging.debug("Encountered unknown error: {e}".format(e=str(e))) + #-- 2. Gateway timeout (status code 504) error retry if response.status_code == 504: From 369e6c52b460ad44c2bfedb22abcf27785f807cb Mon Sep 17 00:00:00 2001 From: Dinesh N Date: Thu, 17 Mar 2022 16:00:45 +0530 Subject: [PATCH 5/5] Minor bug fixes --- semantics3/semantics3.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/semantics3/semantics3.py b/semantics3/semantics3.py index af81173..a2f57cc 100644 --- a/semantics3/semantics3.py +++ b/semantics3/semantics3.py @@ -125,7 +125,8 @@ def _reinitialize_client(self): duration = int(time.time()) - self.oauth_session_start logging.debug("Encountered connection error. Duration since client created: {d}s, Recreating semantics3 client".format(d=duration)) self.oauth.close() #-- Close connection pool associated with current session - self.__init__() #-- Initiate new client + self.oauth = OAuth1Session(self.api_key, client_secret=self.api_secret) + self.oauth_session_start = int(time.time()) def query(self, method, endpoint, kwargs, retry_attempt=0): if method.lower() == "get": @@ -133,6 +134,8 @@ def query(self, method, endpoint, kwargs, retry_attempt=0): else: params = kwargs + response = None + #-- Following is a patch/hack made to circumvent issues encountered during #-- migrating sem3 infra to k8s sem3stage and sem3prod clusters accordingly #-- however this doesn't address the root cause, it only bypass the issue by retries on client side @@ -148,19 +151,22 @@ def query(self, method, endpoint, kwargs, retry_attempt=0): logging.debug(err_str) self._reinitialize_client() return self.query(method, endpoint, kwargs, retry_attempt+1) - elif retry_attempt > 1: + elif retry_attempt > 1 and re.search("Connection (?:aborted|reset by peer)", err_str): logging.debug("Exceeded max retry attempts, Skipping. Error: {e}".format(e=str(e))) + raise Semantics3Error(500, str(e)) else: logging.debug("Encountered unknown error: {e}".format(e=str(e))) + raise Semantics3Error(500, str(e)) - #-- 2. Gateway timeout (status code 504) error retry + #-- 2. Gateway timeout (status code 504) error if response.status_code == 504: if retry_attempt <= 1: logging.debug("Encountered a gateway timeout error, Sending the request again") return self.query(method, endpoint, kwargs, retry_attempt+1) else: logging.debug("Encountered a gateway timeout error, Exceeded max retry attempts, Skipping") + raise Semantics3Error(504, 'Bad Gateway') #-- Check if response received is a malformed json try: