66from functools import wraps
77import json
88import socket
9+ import threading
910import random
1011import requests
1112import requests .exceptions
@@ -73,7 +74,7 @@ def __init__(self,
7374 proxies = None ,
7475 ):
7576 """Construct a new InfluxDBClient object."""
76- self ._host = host
77+ self .__host = host
7778 self ._port = port
7879 self ._username = username
7980 self ._password = password
@@ -98,7 +99,7 @@ def __init__(self,
9899 else :
99100 self ._proxies = proxies
100101
101- self ._baseurl = "{0}://{1}:{2}" .format (
102+ self .__baseurl = "{0}://{1}:{2}" .format (
102103 self ._scheme ,
103104 self ._host ,
104105 self ._port )
@@ -108,6 +109,22 @@ def __init__(self,
108109 'Accept' : 'text/plain'
109110 }
110111
112+ # _baseurl and _host are properties to allow InfluxDBClusterClient
113+ # to override them with thread-local variables
114+ @property
115+ def _baseurl (self ):
116+ return self ._get_baseurl ()
117+
118+ def _get_baseurl (self ):
119+ return self .__baseurl
120+
121+ @property
122+ def _host (self ):
123+ return self ._get_host ()
124+
125+ def _get_host (self ):
126+ return self .__host
127+
111128 @staticmethod
112129 def from_DSN (dsn , ** kwargs ):
113130 """Return an instance of :class:`~.InfluxDBClient` from the provided
@@ -740,6 +757,8 @@ def __init__(self,
740757 self .bad_hosts = [] # Corresponding server has failures in history
741758 self .shuffle = shuffle
742759 host , port = self .hosts [0 ]
760+ self ._hosts_lock = threading .Lock ()
761+ self ._thread_local = threading .local ()
743762 self ._client = client_base_class (host = host ,
744763 port = port ,
745764 username = username ,
@@ -757,6 +776,10 @@ def __init__(self,
757776
758777 setattr (self , method , self ._make_func (orig_attr ))
759778
779+ self ._client ._get_host = self ._get_host
780+ self ._client ._get_baseurl = self ._get_baseurl
781+ self ._update_client_host (self .hosts [0 ])
782+
760783 @staticmethod
761784 def from_DSN (dsn , client_base_class = InfluxDBClient ,
762785 shuffle = True , ** kwargs ):
@@ -791,19 +814,29 @@ def from_DSN(dsn, client_base_class=InfluxDBClient,
791814 return cluster_client
792815
793816 def _update_client_host (self , host ):
794- self ._client ._host , self ._client ._port = host
795- self ._client ._baseurl = "{0}://{1}:{2}" .format (self ._client ._scheme ,
796- self ._client ._host ,
797- self ._client ._port )
817+ self ._thread_local .host , self ._thread_local .port = host
818+ self ._thread_local .baseurl = "{0}://{1}:{2}" .format (
819+ self ._client ._scheme ,
820+ self ._client ._host ,
821+ self ._client ._port
822+ )
823+
824+ def _get_baseurl (self ):
825+ return self ._thread_local .baseurl
826+
827+ def _get_host (self ):
828+ return self ._thread_local .host
798829
799830 def _make_func (self , orig_func ):
800831
801832 @wraps (orig_func )
802833 def func (* args , ** kwargs ):
803- if self .shuffle :
804- random .shuffle (self .hosts )
834+ with self ._hosts_lock :
835+ if self .shuffle :
836+ random .shuffle (self .hosts )
837+
838+ hosts = self .hosts + self .bad_hosts
805839
806- hosts = self .hosts + self .bad_hosts
807840 for h in hosts :
808841 bad_host = False
809842 try :
@@ -815,13 +848,15 @@ def func(*args, **kwargs):
815848 except Exception as e :
816849 # Errors that might caused by server failure, try another
817850 bad_host = True
818- if h in self .hosts :
819- self .hosts .remove (h )
820- self .bad_hosts .append (h )
851+ with self ._hosts_lock :
852+ if h in self .hosts :
853+ self .hosts .remove (h )
854+ self .bad_hosts .append (h )
821855 finally :
822- if not bad_host and h in self .bad_hosts :
823- self .bad_hosts .remove (h )
824- self .hosts .append (h )
856+ with self ._hosts_lock :
857+ if not bad_host and h in self .bad_hosts :
858+ self .bad_hosts .remove (h )
859+ self .hosts .append (h )
825860
826861 raise InfluxDBServerError ("InfluxDB: no viable server!" )
827862
0 commit comments