def _reinitialize_client(self):
    """Swap the current OAuth session for a freshly created one.

    Logs at debug level how many whole seconds have passed since
    ``self.oauth_session_start``, closes the existing session, builds a
    new ``OAuth1Session`` from the stored API key and secret, and resets
    ``self.oauth_session_start`` to the current time.
    """
    session_age = int(time.time()) - self.oauth_session_start
    message = "Encountered connection error. Duration since client created: {d}s, Recreating semantics3 client".format(d=session_age)
    logging.debug(message)

    # Close the connection pool associated with the outgoing session
    # before the reference to it is replaced.
    stale_session = self.oauth
    stale_session.close()

    self.oauth = OAuth1Session(self.api_key, client_secret=self.api_secret)
    # Restart the clock used to report the session's age on the next reset.
    self.oauth_session_start = int(time.time())
Gateway timeout (status code 504) error + if response.status_code == 504: + if retry_attempt <= 1: + logging.debug("Encountered a gateway timeout error, Sending the request again") + return self.query(method, endpoint, kwargs, retry_attempt+1) + else: + logging.debug("Encountered a gateway timeout error, Exceeded max retry attempts, Skipping") + raise Semantics3Error(504, 'Bad Gateway') + + #-- Check if response received is a malformed json try: response_json = response.json() except: