Skip to content

Commit e3f45fd

Browse files
committed
Merge pull request #1516 from dhermes/happybase-connection-pool
Adding HappyBase connection pool.
2 parents 317d66d + 0f0f010 commit e3f45fd

4 files changed

Lines changed: 240 additions & 2 deletions

File tree

gcloud/bigtable/happybase/pool.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Copyright 2016 Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Google Cloud Bigtable HappyBase pool module."""
16+
17+
18+
import threading
19+
20+
import six
21+
22+
from gcloud.bigtable.happybase.connection import Connection
23+
from gcloud.bigtable.happybase.connection import _get_cluster
24+
25+
26+
_MIN_POOL_SIZE = 1
27+
"""Minimum allowable size of a connection pool."""
28+
29+
30+
class ConnectionPool(object):
31+
"""Thread-safe connection pool.
32+
33+
.. note::
34+
35+
All keyword arguments are passed unmodified to the
36+
:class:`.Connection` constructor **except** for ``autoconnect``.
37+
This is because the ``open`` / ``closed`` status of a connection
38+
is managed by the pool. In addition, if ``cluster`` is not passed,
39+
the default / inferred cluster is determined by the pool and then
40+
passed to each :class:`.Connection` that is created.
41+
42+
:type size: int
43+
:param size: The maximum number of concurrently open connections.
44+
45+
:type kwargs: dict
46+
:param kwargs: Keyword arguments passed to :class:`.Connection`
47+
constructor.
48+
49+
:raises: :class:`TypeError <exceptions.TypeError>` if ``size``
50+
is non an integer.
51+
:class:`ValueError <exceptions.ValueError>` if ``size``
52+
is not positive.
53+
"""
54+
def __init__(self, size, **kwargs):
55+
if not isinstance(size, six.integer_types):
56+
raise TypeError('Pool size arg must be an integer')
57+
58+
if size < _MIN_POOL_SIZE:
59+
raise ValueError('Pool size must be positive')
60+
61+
self._lock = threading.Lock()
62+
self._queue = six.moves.queue.LifoQueue(maxsize=size)
63+
self._thread_connections = threading.local()
64+
65+
connection_kwargs = kwargs
66+
connection_kwargs['autoconnect'] = False
67+
if 'cluster' not in connection_kwargs:
68+
connection_kwargs['cluster'] = _get_cluster(
69+
timeout=kwargs.get('timeout'))
70+
71+
for _ in six.moves.range(size):
72+
connection = Connection(**connection_kwargs)
73+
self._queue.put(connection)
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
# Copyright 2016 Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
import unittest2
17+
18+
19+
class TestConnectionPool(unittest2.TestCase):
20+
21+
def _getTargetClass(self):
22+
from gcloud.bigtable.happybase.pool import ConnectionPool
23+
return ConnectionPool
24+
25+
def _makeOne(self, *args, **kwargs):
26+
return self._getTargetClass()(*args, **kwargs)
27+
28+
def test_constructor_defaults(self):
29+
import six
30+
import threading
31+
from gcloud.bigtable.happybase.connection import Connection
32+
33+
size = 11
34+
cluster_copy = _Cluster()
35+
all_copies = [cluster_copy] * size
36+
cluster = _Cluster(copies=all_copies) # Avoid implicit environ check.
37+
pool = self._makeOne(size, cluster=cluster)
38+
39+
self.assertTrue(isinstance(pool._lock, type(threading.Lock())))
40+
self.assertTrue(isinstance(pool._thread_connections, threading.local))
41+
self.assertEqual(pool._thread_connections.__dict__, {})
42+
43+
queue = pool._queue
44+
self.assertTrue(isinstance(queue, six.moves.queue.LifoQueue))
45+
self.assertTrue(queue.full())
46+
self.assertEqual(queue.maxsize, size)
47+
for connection in queue.queue:
48+
self.assertTrue(isinstance(connection, Connection))
49+
self.assertTrue(connection._cluster is cluster_copy)
50+
51+
def test_constructor_passes_kwargs(self):
52+
table_prefix = 'foo'
53+
table_prefix_separator = '<>'
54+
cluster = _Cluster() # Avoid implicit environ check.
55+
56+
size = 1
57+
pool = self._makeOne(size, table_prefix=table_prefix,
58+
table_prefix_separator=table_prefix_separator,
59+
cluster=cluster)
60+
61+
for connection in pool._queue.queue:
62+
self.assertEqual(connection.table_prefix, table_prefix)
63+
self.assertEqual(connection.table_prefix_separator,
64+
table_prefix_separator)
65+
66+
def test_constructor_ignores_autoconnect(self):
67+
from gcloud._testing import _Monkey
68+
from gcloud.bigtable.happybase.connection import Connection
69+
from gcloud.bigtable.happybase import pool as MUT
70+
71+
class ConnectionWithOpen(Connection):
72+
73+
_open_called = False
74+
75+
def open(self):
76+
self._open_called = True
77+
78+
# First make sure the custom Connection class does as expected.
79+
cluster_copy1 = _Cluster()
80+
cluster_copy2 = _Cluster()
81+
cluster_copy3 = _Cluster()
82+
cluster = _Cluster(
83+
copies=[cluster_copy1, cluster_copy2, cluster_copy3])
84+
connection = ConnectionWithOpen(autoconnect=False, cluster=cluster)
85+
self.assertFalse(connection._open_called)
86+
self.assertTrue(connection._cluster is cluster_copy1)
87+
connection = ConnectionWithOpen(autoconnect=True, cluster=cluster)
88+
self.assertTrue(connection._open_called)
89+
self.assertTrue(connection._cluster is cluster_copy2)
90+
91+
# Then make sure autoconnect=True is ignored in a pool.
92+
size = 1
93+
with _Monkey(MUT, Connection=ConnectionWithOpen):
94+
pool = self._makeOne(size, autoconnect=True, cluster=cluster)
95+
96+
for connection in pool._queue.queue:
97+
self.assertTrue(isinstance(connection, ConnectionWithOpen))
98+
self.assertTrue(connection._cluster is cluster_copy3)
99+
self.assertFalse(connection._open_called)
100+
101+
def test_constructor_infers_cluster(self):
102+
from gcloud._testing import _Monkey
103+
from gcloud.bigtable.happybase.connection import Connection
104+
from gcloud.bigtable.happybase import pool as MUT
105+
106+
size = 1
107+
cluster_copy = _Cluster()
108+
all_copies = [cluster_copy] * size
109+
cluster = _Cluster(copies=all_copies)
110+
get_cluster_calls = []
111+
112+
def mock_get_cluster(timeout=None):
113+
get_cluster_calls.append(timeout)
114+
return cluster
115+
116+
with _Monkey(MUT, _get_cluster=mock_get_cluster):
117+
pool = self._makeOne(size)
118+
119+
for connection in pool._queue.queue:
120+
self.assertTrue(isinstance(connection, Connection))
121+
# We know that the Connection() constructor will
122+
# call cluster.copy().
123+
self.assertTrue(connection._cluster is cluster_copy)
124+
125+
self.assertEqual(get_cluster_calls, [None])
126+
127+
def test_constructor_non_integer_size(self):
128+
size = None
129+
with self.assertRaises(TypeError):
130+
self._makeOne(size)
131+
132+
def test_constructor_non_positive_size(self):
133+
size = -10
134+
with self.assertRaises(ValueError):
135+
self._makeOne(size)
136+
size = 0
137+
with self.assertRaises(ValueError):
138+
self._makeOne(size)
139+
140+
141+
class _Client(object):
142+
143+
def __init__(self):
144+
self.stop_calls = 0
145+
146+
def stop(self):
147+
self.stop_calls += 1
148+
149+
150+
class _Cluster(object):
151+
152+
def __init__(self, copies=()):
153+
self.copies = list(copies)
154+
# Included to support Connection.__del__
155+
self._client = _Client()
156+
157+
def copy(self):
158+
if self.copies:
159+
result = self.copies[0]
160+
self.copies[:] = self.copies[1:]
161+
return result
162+
else:
163+
return self

scripts/run_pylint.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
PRODUCTION_RC = os.path.join(SCRIPTS_DIR, 'pylintrc_default')
4444
TEST_RC = os.path.join(SCRIPTS_DIR, 'pylintrc_reduced')
4545
TEST_DISABLED_MESSAGES = [
46+
'abstract-method',
47+
'arguments-differ',
48+
'assignment-from-no-return',
4649
'attribute-defined-outside-init',
4750
'exec-used',
4851
'import-error',
@@ -55,8 +58,6 @@
5558
'too-many-locals',
5659
'too-many-public-methods',
5760
'unbalanced-tuple-unpacking',
58-
'arguments-differ',
59-
'assignment-from-no-return',
6061
]
6162
TEST_RC_ADDITIONS = {
6263
'MESSAGES CONTROL': {

scripts/verify_included_modules.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
'gcloud.bigtable.cluster',
3535
'gcloud.bigtable.column_family',
3636
'gcloud.bigtable.happybase.connection',
37+
'gcloud.bigtable.happybase.pool',
3738
'gcloud.bigtable.happybase.table',
3839
'gcloud.bigtable.row',
3940
'gcloud.bigtable.row_data',

0 commit comments

Comments
 (0)