Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 102 additions & 80 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,36 +132,10 @@ def from_DSN(dsn, **kwargs):
additional `udp_port` parameter (cf. examples).
"""

init_args = {}
conn_params = urlparse(dsn)
scheme_info = conn_params.scheme.split('+')
if len(scheme_info) == 1:
scheme = scheme_info[0]
modifier = None
else:
modifier, scheme = scheme_info

if scheme != 'influxdb':
raise ValueError('Unknown scheme "{}".'.format(scheme))
if modifier:
if modifier == 'udp':
init_args['use_udp'] = True
elif modifier == 'https':
init_args['ssl'] = True
else:
raise ValueError('Unknown modifier "{}".'.format(modifier))

if conn_params.hostname:
init_args['host'] = conn_params.hostname
if conn_params.port:
init_args['port'] = conn_params.port
if conn_params.username:
init_args['username'] = conn_params.username
if conn_params.password:
init_args['password'] = conn_params.password
if conn_params.path and len(conn_params.path) > 1:
init_args['database'] = conn_params.path[1:]

init_args = parse_dsn(dsn)
host, port = init_args.pop('hosts')[0]
init_args['host'] = host
init_args['port'] = port
init_args.update(kwargs)

return InfluxDBClient(**init_args)
Expand Down Expand Up @@ -720,8 +694,8 @@ def send_packet(self, packet):

class InfluxDBClusterClient(object):
"""The :class:`~.InfluxDBClusterClient` is the client for connecting
to a cluster of InfluxDB servers. It's basically a proxy to multiple
InfluxDBClients.
to a cluster of InfluxDB servers. Each query hits different host from the
list of hosts.

:param hosts: all hosts to be included in the cluster, each of which
should be in the format (address, port),
Expand All @@ -731,7 +705,7 @@ class InfluxDBClusterClient(object):
:param shuffle: whether the queries should hit servers evenly(randomly),
defaults to True
:type shuffle: bool
:param client_base_class: the base class for all clients in the cluster.
:param client_base_class: the base class for the cluster client.
This parameter is used to enable the support of different client
types. Defaults to :class:`~.InfluxDBClient`
"""
Expand All @@ -749,26 +723,27 @@ def __init__(self,
shuffle=True,
client_base_class=InfluxDBClient,
):
self.clients = []
self.bad_clients = [] # Corresponding server has failures in history
self.clients = [self] # Keep it backwards compatible
self.hosts = hosts
self.bad_hosts = [] # Corresponding server has failures in history
self.shuffle = shuffle
for h in hosts:
self.clients.append(client_base_class(host=h[0], port=h[1],
username=username,
password=password,
database=database,
ssl=ssl,
verify_ssl=verify_ssl,
timeout=timeout,
use_udp=use_udp,
udp_port=udp_port))
host, port = self.hosts[0]
self._client = client_base_class(host=host,
port=port,
username=username,
password=password,
database=database,
ssl=ssl,
verify_ssl=verify_ssl,
timeout=timeout,
use_udp=use_udp,
udp_port=udp_port)
for method in dir(client_base_class):
if method.startswith('_'):
continue
orig_func = getattr(client_base_class, method)
if not callable(orig_func):
orig_attr = getattr(client_base_class, method, '')
if method.startswith('_') or not callable(orig_attr):
continue
setattr(self, method, self._make_func(orig_func))

setattr(self, method, self._make_func(orig_attr))

@staticmethod
def from_DSN(dsn, client_base_class=InfluxDBClient,
Expand All @@ -791,53 +766,100 @@ def from_DSN(dsn, client_base_class=InfluxDBClient,
@host1:8086,usr:pwd@host2:8086/db_name', timeout=5)
>> type(cluster)
<class 'influxdb.client.InfluxDBClusterClient'>
>> cluster.clients
[<influxdb.client.InfluxDBClient at 0x7feb480295d0>,
>> cluster.hosts
[('host1', 8086), ('host2', 8086)]
>> cluster._client
<influxdb.client.InfluxDBClient at 0x7feb438ec950>]
"""
conn_params = urlparse(dsn)
netlocs = conn_params.netloc.split(',')
cluster_client = InfluxDBClusterClient(
hosts=[],
client_base_class=client_base_class,
shuffle=shuffle,
**kwargs)
for netloc in netlocs:
single_dsn = '%(scheme)s://%(netloc)s%(path)s' % (
{'scheme': conn_params.scheme,
'netloc': netloc,
'path': conn_params.path}
)
cluster_client.clients.append(client_base_class.from_DSN(
single_dsn,
**kwargs))
init_args = parse_dsn(dsn)
init_args.update(**kwargs)
init_args['shuffle'] = shuffle
init_args['client_base_class'] = client_base_class
cluster_client = InfluxDBClusterClient(**init_args)
return cluster_client

def _update_client_host(self, host):
self._client._host, self._client._port = host
self._client._baseurl = "{0}://{1}:{2}".format(self._client._scheme,
self._client._host,
self._client._port)

def _make_func(self, orig_func):

@wraps(orig_func)
def func(*args, **kwargs):
if self.shuffle:
random.shuffle(self.clients)
clients = self.clients + self.bad_clients
for c in clients:
bad_client = False
random.shuffle(self.hosts)

hosts = self.hosts + self.bad_hosts
for h in hosts:
bad_host = False
try:
return orig_func(c, *args, **kwargs)
self._update_client_host(h)
return orig_func(self._client, *args, **kwargs)
except InfluxDBClientError as e:
# Errors caused by user's requests, re-raise
raise e
except Exception as e:
# Errors that might caused by server failure, try another
bad_client = True
if c in self.clients:
self.clients.remove(c)
self.bad_clients.append(c)
bad_host = True
if h in self.hosts:
self.hosts.remove(h)
self.bad_hosts.append(h)
finally:
if not bad_client and c in self.bad_clients:
self.bad_clients.remove(c)
self.clients.append(c)
if not bad_host and h in self.bad_hosts:
self.bad_hosts.remove(h)
self.hosts.append(h)

raise InfluxDBServerError("InfluxDB: no viable server!")

return func


def parse_dsn(dsn):
conn_params = urlparse(dsn)
init_args = {}
scheme_info = conn_params.scheme.split('+')
if len(scheme_info) == 1:
scheme = scheme_info[0]
modifier = None
else:
modifier, scheme = scheme_info

if scheme != 'influxdb':
raise ValueError('Unknown scheme "{}".'.format(scheme))

if modifier:
if modifier == 'udp':
init_args['use_udp'] = True
elif modifier == 'https':
init_args['ssl'] = True
else:
raise ValueError('Unknown modifier "{}".'.format(modifier))

netlocs = conn_params.netloc.split(',')

init_args['hosts'] = []
for netloc in netlocs:
parsed = _parse_netloc(netloc)
init_args['hosts'].append((parsed['host'], int(parsed['port'])))
init_args['username'] = parsed['username']
init_args['password'] = parsed['password']

if conn_params.path and len(conn_params.path) > 1:
init_args['database'] = conn_params.path[1:]

return init_args


def _parse_netloc(netloc):
import re
parsed = re.findall(r'(\w*):(\w*)@(\w*):(\d*)', netloc)
if not parsed:
raise ValueError('Invalid netloc "{}".'.format(netloc))

info = parsed[0]
return {'username': info[0] or None,
'password': info[1] or None,
'host': info[2] or 'localhost',
'port': info[3] or 8086}
Loading