From e25576f324c225ff37fd12b1e190b314b8514b45 Mon Sep 17 00:00:00 2001 From: Georgi Dimitrov Date: Fri, 18 Sep 2015 21:25:58 +0000 Subject: [PATCH 1/2] Refactor in InfluxDBClusterClient. Use a single client and maintain a list of hosts instead of list of clients. Also, to avoid code duplication, parsing DSNs is now moved to a separate function that is called from both InfluxDBClient.fromDSN and InfluxDBClusterClient.fromDSN classmethods --- influxdb/client.py | 182 +++++++++++++++++++--------------- influxdb/tests/client_test.py | 102 +++++++------------ 2 files changed, 138 insertions(+), 146 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 6c180ce4..0897e051 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -132,36 +132,10 @@ def from_DSN(dsn, **kwargs): additional `udp_port` parameter (cf. examples). """ - init_args = {} - conn_params = urlparse(dsn) - scheme_info = conn_params.scheme.split('+') - if len(scheme_info) == 1: - scheme = scheme_info[0] - modifier = None - else: - modifier, scheme = scheme_info - - if scheme != 'influxdb': - raise ValueError('Unknown scheme "{}".'.format(scheme)) - if modifier: - if modifier == 'udp': - init_args['use_udp'] = True - elif modifier == 'https': - init_args['ssl'] = True - else: - raise ValueError('Unknown modifier "{}".'.format(modifier)) - - if conn_params.hostname: - init_args['host'] = conn_params.hostname - if conn_params.port: - init_args['port'] = conn_params.port - if conn_params.username: - init_args['username'] = conn_params.username - if conn_params.password: - init_args['password'] = conn_params.password - if conn_params.path and len(conn_params.path) > 1: - init_args['database'] = conn_params.path[1:] - + init_args = parse_dsn(dsn) + host, port = init_args.pop('hosts')[0] + init_args['host'] = host + init_args['port'] = port init_args.update(kwargs) return InfluxDBClient(**init_args) @@ -720,8 +694,8 @@ def send_packet(self, packet): class InfluxDBClusterClient(object): """The :class:`~.InfluxDBClusterClient` is the client for connecting - to a cluster of InfluxDB servers. It's basically a proxy to multiple - InfluxDBClients. + to a cluster of InfluxDB servers. Each query hits different host from the + list of hosts. :param hosts: all hosts to be included in the cluster, each of which should be in the format (address, port), @@ -731,7 +705,7 @@ class InfluxDBClusterClient(object): :param shuffle: whether the queries should hit servers evenly(randomly), defaults to True :type shuffle: bool - :param client_base_class: the base class for all clients in the cluster. + :param client_base_class: the base class for the cluster client. This parameter is used to enable the support of different client types. Defaults to :class:`~.InfluxDBClient` """ @@ -749,26 +723,27 @@ def __init__(self, shuffle=True, client_base_class=InfluxDBClient, ): - self.clients = [] - self.bad_clients = [] # Corresponding server has failures in history + self.clients = [self] # Keep it backwards compatible + self.hosts = hosts + self.bad_hosts = [] # Corresponding server has failures in history self.shuffle = shuffle - for h in hosts: - self.clients.append(client_base_class(host=h[0], port=h[1], - username=username, - password=password, - database=database, - ssl=ssl, - verify_ssl=verify_ssl, - timeout=timeout, - use_udp=use_udp, - udp_port=udp_port)) + host, port = self.hosts[0] + self._client = client_base_class(host=host, + port=port, + username=username, + password=password, + database=database, + ssl=ssl, + verify_ssl=verify_ssl, + timeout=timeout, + use_udp=use_udp, + udp_port=udp_port) for method in dir(client_base_class): - if method.startswith('_'): - continue - orig_func = getattr(client_base_class, method) - if not callable(orig_func): + orig_attr = getattr(client_base_class, method, '') + if method.startswith('_') or not callable(orig_attr): continue - setattr(self, method, self._make_func(orig_func)) + + setattr(self, method, self._make_func(orig_attr)) @staticmethod def from_DSN(dsn, client_base_class=InfluxDBClient, @@ -791,53 +766,100 @@ def from_DSN(dsn, client_base_class=InfluxDBClient, @host1:8086,usr:pwd@host2:8086/db_name', timeout=5) >> type(cluster) - >> cluster.clients - [, + >> cluster.hosts + [('host1', 8086), ('host2', 8086)] + >> cluster._client ] """ - conn_params = urlparse(dsn) - netlocs = conn_params.netloc.split(',') - cluster_client = InfluxDBClusterClient( - hosts=[], - client_base_class=client_base_class, - shuffle=shuffle, - **kwargs) - for netloc in netlocs: - single_dsn = '%(scheme)s://%(netloc)s%(path)s' % ( - {'scheme': conn_params.scheme, - 'netloc': netloc, - 'path': conn_params.path} - ) - cluster_client.clients.append(client_base_class.from_DSN( - single_dsn, - **kwargs)) + init_args = parse_dsn(dsn) + init_args.update(**kwargs) + init_args['shuffle'] = shuffle + init_args['client_base_class'] = client_base_class + cluster_client = InfluxDBClusterClient(**init_args) return cluster_client + def _update_client_host(self, host): + self._client._host, self._client._port = host + self._client._baseurl = "{0}://{1}:{2}".format(self._client._scheme, + self._client._host, + self._client._port) + def _make_func(self, orig_func): @wraps(orig_func) def func(*args, **kwargs): if self.shuffle: - random.shuffle(self.clients) - clients = self.clients + self.bad_clients - for c in clients: - bad_client = False + random.shuffle(self.hosts) + + hosts = self.hosts + self.bad_hosts + for h in hosts: + bad_host = False try: - return orig_func(c, *args, **kwargs) + self._update_client_host(h) + return orig_func(self._client, *args, **kwargs) except InfluxDBClientError as e: # Errors caused by user's requests, re-raise raise e except Exception as e: # Errors that might caused by server failure, try another - bad_client = True - if c in self.clients: - self.clients.remove(c) - self.bad_clients.append(c) + bad_host = True + if h in self.hosts: + self.hosts.remove(h) + self.bad_hosts.append(h) finally: - if not bad_client and c in self.bad_clients: - self.bad_clients.remove(c) - self.clients.append(c) + if not bad_host and h in self.bad_hosts: + self.bad_hosts.remove(h) + self.hosts.append(h) raise InfluxDBServerError("InfluxDB: no viable server!") return func + + +def parse_dsn(dsn): + conn_params = urlparse(dsn) + init_args = {} + scheme_info = conn_params.scheme.split('+') + if len(scheme_info) == 1: + scheme = scheme_info[0] + modifier = None + else: + modifier, scheme = scheme_info + + if scheme != 'influxdb': + raise ValueError('Unknown scheme "{}".'.format(scheme)) + + if modifier: + if modifier == 'udp': + init_args['use_udp'] = True + elif modifier == 'https': + init_args['ssl'] = True + else: + raise ValueError('Unknown modifier "{}".'.format(modifier)) + + netlocs = conn_params.netloc.split(',') + + init_args['hosts'] = [] + for netloc in netlocs: + parsed = _parse_netloc(netloc) + init_args['hosts'].append((parsed['host'], int(parsed['port']))) + init_args['username'] = parsed['username'] + init_args['password'] = parsed['password'] + + if conn_params.path and len(conn_params.path) > 1: + init_args['database'] = conn_params.path[1:] + + return init_args + + +def _parse_netloc(netloc): + import re + parsed = re.findall(r'(\w*):(\w*)@(\w*):(\d*)', netloc) + if not parsed: + raise ValueError('Invalid netloc "{}".'.format(netloc)) + + info = parsed[0] + return {'username': info[0] or None, + 'password': info[1] or None, + 'host': info[2] or 'localhost', + 'port': info[3] or 8086} diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 4f09970d..7fc912cb 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -715,7 +715,9 @@ def test_revoke_privilege_invalid(self): class FakeClient(InfluxDBClient): - fail = False + + def __init__(self, *args, **kwargs): + super(FakeClient, self).__init__(*args, **kwargs) def query(self, query, @@ -724,9 +726,10 @@ def query(self, database=None): if query == 'Fail': raise Exception("Fail") - - if self.fail: - raise Exception("Fail") + elif query == 'Fail once' and self._host == 'host1': + raise Exception("Fail Once") + elif query == 'Fail twice' and self._host in 'host1 host2': + raise Exception("Fail Twice") else: return "Success" @@ -747,32 +750,28 @@ def test_init(self): database='database', shuffle=False, client_base_class=FakeClient) - self.assertEqual(3, len(cluster.clients)) - self.assertEqual(0, len(cluster.bad_clients)) - for idx, client in enumerate(cluster.clients): - self.assertEqual(self.hosts[idx][0], client._host) - self.assertEqual(self.hosts[idx][1], client._port) + self.assertEqual(3, len(cluster.hosts)) + self.assertEqual(0, len(cluster.bad_hosts)) + self.assertIn((cluster._client._host, + cluster._client._port), cluster.hosts) def test_one_server_fails(self): cluster = InfluxDBClusterClient(hosts=self.hosts, database='database', shuffle=False, client_base_class=FakeClient) - cluster.clients[0].fail = True - self.assertEqual('Success', cluster.query('')) - self.assertEqual(2, len(cluster.clients)) - self.assertEqual(1, len(cluster.bad_clients)) + self.assertEqual('Success', cluster.query('Fail once')) + self.assertEqual(2, len(cluster.hosts)) + self.assertEqual(1, len(cluster.bad_hosts)) def test_two_servers_fail(self): cluster = InfluxDBClusterClient(hosts=self.hosts, database='database', shuffle=False, client_base_class=FakeClient) - cluster.clients[0].fail = True - cluster.clients[1].fail = True - self.assertEqual('Success', cluster.query('')) - self.assertEqual(1, len(cluster.clients)) - self.assertEqual(2, len(cluster.bad_clients)) + self.assertEqual('Success', cluster.query('Fail twice')) + self.assertEqual(1, len(cluster.hosts)) + self.assertEqual(2, len(cluster.bad_hosts)) def test_all_fail(self): cluster = InfluxDBClusterClient(hosts=self.hosts, @@ -781,8 +780,8 @@ def test_all_fail(self): client_base_class=FakeClient) with self.assertRaises(InfluxDBServerError): cluster.query('Fail') - self.assertEqual(0, len(cluster.clients)) - self.assertEqual(3, len(cluster.bad_clients)) + self.assertEqual(0, len(cluster.hosts)) + self.assertEqual(3, len(cluster.bad_hosts)) def test_all_good(self): cluster = InfluxDBClusterClient(hosts=self.hosts, @@ -790,8 +789,8 @@ def test_all_good(self): shuffle=True, client_base_class=FakeClient) self.assertEqual('Success', cluster.query('')) - self.assertEqual(3, len(cluster.clients)) - self.assertEqual(0, len(cluster.bad_clients)) + self.assertEqual(3, len(cluster.hosts)) + self.assertEqual(0, len(cluster.bad_hosts)) def test_recovery(self): cluster = InfluxDBClusterClient(hosts=self.hosts, @@ -801,68 +800,39 @@ def test_recovery(self): with self.assertRaises(InfluxDBServerError): cluster.query('Fail') self.assertEqual('Success', cluster.query('')) - self.assertEqual(1, len(cluster.clients)) - self.assertEqual(2, len(cluster.bad_clients)) + self.assertEqual(1, len(cluster.hosts)) + self.assertEqual(2, len(cluster.bad_hosts)) def test_dsn(self): cli = InfluxDBClusterClient.from_DSN(self.dsn_string) - self.assertEqual(2, len(cli.clients)) - self.assertEqual('http://host1:8086', cli.clients[0]._baseurl) - self.assertEqual('uSr', cli.clients[0]._username) - self.assertEqual('pWd', cli.clients[0]._password) - self.assertEqual('db', cli.clients[0]._database) - self.assertFalse(cli.clients[0].use_udp) - self.assertEqual('http://host2:8086', cli.clients[1]._baseurl) - self.assertEqual('uSr', cli.clients[1]._username) - self.assertEqual('pWd', cli.clients[1]._password) - self.assertEqual('db', cli.clients[1]._database) - self.assertFalse(cli.clients[1].use_udp) + self.assertEqual([('host1', 8086), ('host2', 8086)], cli.hosts) + self.assertEqual('http://host1:8086', cli._client._baseurl) + self.assertEqual('uSr', cli._client._username) + self.assertEqual('pWd', cli._client._password) + self.assertEqual('db', cli._client._database) + self.assertFalse(cli._client.use_udp) cli = InfluxDBClusterClient.from_DSN('udp+' + self.dsn_string) - self.assertTrue(cli.clients[0].use_udp) - self.assertTrue(cli.clients[1].use_udp) + self.assertTrue(cli._client.use_udp) cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string) - self.assertEqual('https://host1:8086', cli.clients[0]._baseurl) - self.assertEqual('https://host2:8086', cli.clients[1]._baseurl) + self.assertEqual('https://host1:8086', cli._client._baseurl) cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string, **{'ssl': False}) - self.assertEqual('http://host1:8086', cli.clients[0]._baseurl) - self.assertEqual('http://host2:8086', cli.clients[1]._baseurl) - - def test_dsn_single_client(self): - cli = InfluxDBClusterClient.from_DSN('influxdb://usr:pwd@host:8086/db') - self.assertEqual('http://host:8086', cli.clients[0]._baseurl) - self.assertEqual('usr', cli.clients[0]._username) - self.assertEqual('pwd', cli.clients[0]._password) - self.assertEqual('db', cli.clients[0]._database) - self.assertFalse(cli.clients[0].use_udp) - - cli = InfluxDBClusterClient.from_DSN( - 'udp+influxdb://usr:pwd@host:8086/db') - self.assertTrue(cli.clients[0].use_udp) - - cli = InfluxDBClusterClient.from_DSN( - 'https+influxdb://usr:pwd@host:8086/db') - self.assertEqual('https://host:8086', cli.clients[0]._baseurl) - - cli = InfluxDBClusterClient.from_DSN( - 'https+influxdb://usr:pwd@host:8086/db', - **{'ssl': False}) - self.assertEqual('http://host:8086', cli.clients[0]._baseurl) + self.assertEqual('http://host1:8086', cli._client._baseurl) def test_dsn_password_caps(self): cli = InfluxDBClusterClient.from_DSN( 'https+influxdb://usr:pWd@host:8086/db') - self.assertEqual('pWd', cli.clients[0]._password) + self.assertEqual('pWd', cli._client._password) def test_dsn_mixed_scheme_case(self): cli = InfluxDBClusterClient.from_DSN( 'hTTps+inFLUxdb://usr:pWd@host:8086/db') - self.assertEqual('pWd', cli.clients[0]._password) - self.assertEqual('https://host:8086', cli.clients[0]._baseurl) + self.assertEqual('pWd', cli._client._password) + self.assertEqual('https://host:8086', cli._client._baseurl) cli = InfluxDBClusterClient.from_DSN( 'uDP+influxdb://usr:pwd@host1:8086,usr:pwd@host2:8086/db') - self.assertTrue(cli.clients[0].use_udp) + self.assertTrue(cli._client.use_udp) From 2b427757ec0c0e63ee440fa67ac23ed6343f2806 Mon Sep 17 00:00:00 2001 From: Georgi Dimitrov Date: Sun, 13 Sep 2015 15:14:26 +0000 Subject: [PATCH 2/2] Fix PEP8 warning in client tests --- influxdb/tests/client_test.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 7fc912cb..27f2c6f3 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -337,12 +337,13 @@ def test_write_points_with_precision_fails(self): cli.write_points_with_precision([]) def test_query(self): - example_response = \ - '{"results": [{"series": [{"measurement": "sdfsdfsdf", ' \ - '"columns": ["time", "value"], "values": ' \ - '[["2009-11-10T23:00:00Z", 0.64]]}]}, {"series": ' \ - '[{"measurement": "cpu_load_short", "columns": ["time", "value"], ' \ + example_response = ( + '{"results": [{"series": [{"measurement": "sdfsdfsdf", ' + '"columns": ["time", "value"], "values": ' + '[["2009-11-10T23:00:00Z", 0.64]]}]}, {"series": ' + '[{"measurement": "cpu_load_short", "columns": ["time", "value"], ' '"values": [["2009-11-10T23:00:00Z", 0.64]]}]}]}' + ) with requests_mock.Mocker() as m: m.register_uri(