@@ -140,36 +140,10 @@ def from_DSN(dsn, **kwargs):
140140 additional `udp_port` parameter (cf. examples).
141141 """
142142
143- init_args = {}
144- conn_params = urlparse (dsn )
145- scheme_info = conn_params .scheme .split ('+' )
146- if len (scheme_info ) == 1 :
147- scheme = scheme_info [0 ]
148- modifier = None
149- else :
150- modifier , scheme = scheme_info
151-
152- if scheme != 'influxdb' :
153- raise ValueError ('Unknown scheme "{}".' .format (scheme ))
154- if modifier :
155- if modifier == 'udp' :
156- init_args ['use_udp' ] = True
157- elif modifier == 'https' :
158- init_args ['ssl' ] = True
159- else :
160- raise ValueError ('Unknown modifier "{}".' .format (modifier ))
161-
162- if conn_params .hostname :
163- init_args ['host' ] = conn_params .hostname
164- if conn_params .port :
165- init_args ['port' ] = conn_params .port
166- if conn_params .username :
167- init_args ['username' ] = conn_params .username
168- if conn_params .password :
169- init_args ['password' ] = conn_params .password
170- if conn_params .path and len (conn_params .path ) > 1 :
171- init_args ['database' ] = conn_params .path [1 :]
172-
143+ init_args = parse_dsn (dsn )
144+ host , port = init_args .pop ('hosts' )[0 ]
145+ init_args ['host' ] = host
146+ init_args ['port' ] = port
173147 init_args .update (kwargs )
174148
175149 return InfluxDBClient (** init_args )
@@ -732,8 +706,8 @@ def send_packet(self, packet):
732706
733707class InfluxDBClusterClient (object ):
734708 """The :class:`~.InfluxDBClusterClient` is the client for connecting
735- to a cluster of InfluxDB servers. It's basically a proxy to multiple
736- InfluxDBClients .
709+ to a cluster of InfluxDB servers. Each query hits different host from the
710+ list of hosts .
737711
738712 :param hosts: all hosts to be included in the cluster, each of which
739713 should be in the format (address, port),
@@ -743,7 +717,7 @@ class InfluxDBClusterClient(object):
743717 :param shuffle: whether the queries should hit servers evenly(randomly),
744718 defaults to True
745719 :type shuffle: bool
746- :param client_base_class: the base class for all clients in the cluster.
720+ :param client_base_class: the base class for the cluster client .
747721 This parameter is used to enable the support of different client
748722 types. Defaults to :class:`~.InfluxDBClient`
749723 """
@@ -761,26 +735,27 @@ def __init__(self,
761735 shuffle = True ,
762736 client_base_class = InfluxDBClient ,
763737 ):
764- self .clients = []
765- self .bad_clients = [] # Corresponding server has failures in history
738+ self .clients = [self ] # Keep it backwards compatible
739+ self .hosts = hosts
740+ self .bad_hosts = [] # Corresponding server has failures in history
766741 self .shuffle = shuffle
767- for h in hosts :
768- self .clients .append (client_base_class (host = h [0 ], port = h [1 ],
769- username = username ,
770- password = password ,
771- database = database ,
772- ssl = ssl ,
773- verify_ssl = verify_ssl ,
774- timeout = timeout ,
775- use_udp = use_udp ,
776- udp_port = udp_port ))
742+ host , port = self .hosts [0 ]
743+ self ._client = client_base_class (host = host ,
744+ port = port ,
745+ username = username ,
746+ password = password ,
747+ database = database ,
748+ ssl = ssl ,
749+ verify_ssl = verify_ssl ,
750+ timeout = timeout ,
751+ use_udp = use_udp ,
752+ udp_port = udp_port )
777753 for method in dir (client_base_class ):
778- if method .startswith ('_' ):
779- continue
780- orig_func = getattr (client_base_class , method )
781- if not callable (orig_func ):
754+ orig_attr = getattr (client_base_class , method , '' )
755+ if method .startswith ('_' ) or not callable (orig_attr ):
782756 continue
783- setattr (self , method , self ._make_func (orig_func ))
757+
758+ setattr (self , method , self ._make_func (orig_attr ))
784759
785760 @staticmethod
786761 def from_DSN (dsn , client_base_class = InfluxDBClient ,
@@ -803,53 +778,100 @@ def from_DSN(dsn, client_base_class=InfluxDBClient,
803778 @host1:8086,usr:pwd@host2:8086/db_name', timeout=5)
804779 >> type(cluster)
805780 <class 'influxdb.client.InfluxDBClusterClient'>
806- >> cluster.clients
807- [<influxdb.client.InfluxDBClient at 0x7feb480295d0>,
781+ >> cluster.hosts
782+ [('host1', 8086), ('host2', 8086)]
783+ >> cluster._client
808784 <influxdb.client.InfluxDBClient at 0x7feb438ec950>]
809785 """
810- conn_params = urlparse (dsn )
811- netlocs = conn_params .netloc .split (',' )
812- cluster_client = InfluxDBClusterClient (
813- hosts = [],
814- client_base_class = client_base_class ,
815- shuffle = shuffle ,
816- ** kwargs )
817- for netloc in netlocs :
818- single_dsn = '%(scheme)s://%(netloc)s%(path)s' % (
819- {'scheme' : conn_params .scheme ,
820- 'netloc' : netloc ,
821- 'path' : conn_params .path }
822- )
823- cluster_client .clients .append (client_base_class .from_DSN (
824- single_dsn ,
825- ** kwargs ))
786+ init_args = parse_dsn (dsn )
787+ init_args .update (** kwargs )
788+ init_args ['shuffle' ] = shuffle
789+ init_args ['client_base_class' ] = client_base_class
790+ cluster_client = InfluxDBClusterClient (** init_args )
826791 return cluster_client
827792
793+ def _update_client_host (self , host ):
794+ self ._client ._host , self ._client ._port = host
795+ self ._client ._baseurl = "{0}://{1}:{2}" .format (self ._client ._scheme ,
796+ self ._client ._host ,
797+ self ._client ._port )
798+
828799 def _make_func (self , orig_func ):
829800
830801 @wraps (orig_func )
831802 def func (* args , ** kwargs ):
832803 if self .shuffle :
833- random .shuffle (self .clients )
834- clients = self .clients + self .bad_clients
835- for c in clients :
836- bad_client = False
804+ random .shuffle (self .hosts )
805+
806+ hosts = self .hosts + self .bad_hosts
807+ for h in hosts :
808+ bad_host = False
837809 try :
838- return orig_func (c , * args , ** kwargs )
810+ self ._update_client_host (h )
811+ return orig_func (self ._client , * args , ** kwargs )
839812 except InfluxDBClientError as e :
840813 # Errors caused by user's requests, re-raise
841814 raise e
842815 except Exception as e :
843816 # Errors that might caused by server failure, try another
844- bad_client = True
845- if c in self .clients :
846- self .clients .remove (c )
847- self .bad_clients .append (c )
817+ bad_host = True
818+ if h in self .hosts :
819+ self .hosts .remove (h )
820+ self .bad_hosts .append (h )
848821 finally :
849- if not bad_client and c in self .bad_clients :
850- self .bad_clients .remove (c )
851- self .clients .append (c )
822+ if not bad_host and h in self .bad_hosts :
823+ self .bad_hosts .remove (h )
824+ self .hosts .append (h )
852825
853826 raise InfluxDBServerError ("InfluxDB: no viable server!" )
854827
855828 return func
829+
830+
831+ def parse_dsn (dsn ):
832+ conn_params = urlparse (dsn )
833+ init_args = {}
834+ scheme_info = conn_params .scheme .split ('+' )
835+ if len (scheme_info ) == 1 :
836+ scheme = scheme_info [0 ]
837+ modifier = None
838+ else :
839+ modifier , scheme = scheme_info
840+
841+ if scheme != 'influxdb' :
842+ raise ValueError ('Unknown scheme "{}".' .format (scheme ))
843+
844+ if modifier :
845+ if modifier == 'udp' :
846+ init_args ['use_udp' ] = True
847+ elif modifier == 'https' :
848+ init_args ['ssl' ] = True
849+ else :
850+ raise ValueError ('Unknown modifier "{}".' .format (modifier ))
851+
852+ netlocs = conn_params .netloc .split (',' )
853+
854+ init_args ['hosts' ] = []
855+ for netloc in netlocs :
856+ parsed = _parse_netloc (netloc )
857+ init_args ['hosts' ].append ((parsed ['host' ], int (parsed ['port' ])))
858+ init_args ['username' ] = parsed ['username' ]
859+ init_args ['password' ] = parsed ['password' ]
860+
861+ if conn_params .path and len (conn_params .path ) > 1 :
862+ init_args ['database' ] = conn_params .path [1 :]
863+
864+ return init_args
865+
866+
867+ def _parse_netloc (netloc ):
868+ import re
869+ parsed = re .findall (r'(\w*):(\w*)@(\w*):(\d*)' , netloc )
870+ if not parsed :
871+ raise ValueError ('Invalid netloc "{}".' .format (netloc ))
872+
873+ info = parsed [0 ]
874+ return {'username' : info [0 ] or None ,
875+ 'password' : info [1 ] or None ,
876+ 'host' : info [2 ] or 'localhost' ,
877+ 'port' : info [3 ] or 8086 }
0 commit comments