Skip to content

Commit 29b67c6

Browse files
committed
Merge pull request #1523 from dhermes/happybase-conn-pool-finish
Completing implementation of HappyBase connection pool.
2 parents 2c223e7 + 306b7c9 commit 29b67c6

File tree

2 files changed

+180
-0
lines changed

2 files changed

+180
-0
lines changed

gcloud/bigtable/happybase/pool.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Google Cloud Bigtable HappyBase pool module."""
1616

1717

18+
import contextlib
1819
import threading
1920

2021
import six
@@ -27,6 +28,14 @@
2728
"""Minimum allowable size of a connection pool."""
2829

2930

31+
class NoConnectionsAvailable(RuntimeError):
32+
"""Exception raised when no connections are available.
33+
34+
This happens if a timeout was specified when obtaining a connection,
35+
and no connection became available within the specified timeout.
36+
"""
37+
38+
3039
class ConnectionPool(object):
3140
"""Thread-safe connection pool.
3241
@@ -71,3 +80,72 @@ def __init__(self, size, **kwargs):
7180
for _ in six.moves.range(size):
7281
connection = Connection(**connection_kwargs)
7382
self._queue.put(connection)
83+
84+
def _acquire_connection(self, timeout=None):
85+
"""Acquire a connection from the pool.
86+
87+
:type timeout: int
88+
:param timeout: (Optional) Time (in seconds) to wait for a connection
89+
to open.
90+
91+
:rtype: :class:`.Connection`
92+
:returns: An active connection from the queue stored on the pool.
93+
:raises: :class:`NoConnectionsAvailable` if ``Queue.get`` fails
94+
before the ``timeout`` (only if a timeout is specified).
95+
"""
96+
try:
97+
return self._queue.get(block=True, timeout=timeout)
98+
except six.moves.queue.Empty:
99+
raise NoConnectionsAvailable('No connection available from pool '
100+
'within specified timeout')
101+
102+
@contextlib.contextmanager
103+
def connection(self, timeout=None):
104+
"""Obtain a connection from the pool.
105+
106+
Must be used as a context manager, for example::
107+
108+
with pool.connection() as connection:
109+
pass # do something with the connection
110+
111+
If ``timeout`` is omitted, this method waits forever for a connection
112+
to become available.
113+
114+
:type timeout: int
115+
:param timeout: (Optional) Time (in seconds) to wait for a connection
116+
to open.
117+
118+
:rtype: :class:`.Connection`
119+
:returns: An active connection from the pool.
120+
:raises: :class:`NoConnectionsAvailable` if no connection can be
121+
retrieved from the pool before the ``timeout`` (only if
122+
a timeout is specified).
123+
"""
124+
connection = getattr(self._thread_connections, 'current', None)
125+
126+
retrieved_new_cnxn = False
127+
if connection is None:
128+
# In this case we need to actually grab a connection from the
129+
# pool. After retrieval, the connection is stored on a thread
130+
# local so that nested connection requests from the same
131+
# thread can re-use the same connection instance.
132+
#
133+
# NOTE: This code acquires a lock before assigning to the
134+
# thread local; see
135+
# ('https://emptysqua.re/blog/'
136+
# 'another-thing-about-pythons-threadlocals/')
137+
retrieved_new_cnxn = True
138+
connection = self._acquire_connection(timeout)
139+
with self._lock:
140+
self._thread_connections.current = connection
141+
142+
# This is a no-op for connections that have already been opened
143+
# since they just call Client.start().
144+
connection.open()
145+
yield connection
146+
147+
# Remove thread local reference after the outermost 'with' block
148+
# ends. Afterwards the thread no longer owns the connection.
149+
if retrieved_new_cnxn:
150+
del self._thread_connections.current
151+
self._queue.put(connection)

gcloud/bigtable/happybase/test_pool.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,83 @@ def test_constructor_non_positive_size(self):
137137
with self.assertRaises(ValueError):
138138
self._makeOne(size)
139139

140+
def _makeOneWithMockQueue(self, queue_return):
141+
from gcloud._testing import _Monkey
142+
from gcloud.bigtable.happybase import pool as MUT
143+
144+
# We are going to use a fake queue, so we don't want any connections
145+
# or clusters to be created in the constructor.
146+
size = -1
147+
cluster = object()
148+
with _Monkey(MUT, _MIN_POOL_SIZE=size):
149+
pool = self._makeOne(size, cluster=cluster)
150+
151+
pool._queue = _Queue(queue_return)
152+
return pool
153+
154+
def test__acquire_connection(self):
155+
queue_return = object()
156+
pool = self._makeOneWithMockQueue(queue_return)
157+
158+
timeout = 432
159+
connection = pool._acquire_connection(timeout=timeout)
160+
self.assertTrue(connection is queue_return)
161+
self.assertEqual(pool._queue._get_calls, [(True, timeout)])
162+
self.assertEqual(pool._queue._put_calls, [])
163+
164+
def test__acquire_connection_failure(self):
165+
from gcloud.bigtable.happybase.pool import NoConnectionsAvailable
166+
167+
pool = self._makeOneWithMockQueue(None)
168+
timeout = 1027
169+
with self.assertRaises(NoConnectionsAvailable):
170+
pool._acquire_connection(timeout=timeout)
171+
self.assertEqual(pool._queue._get_calls, [(True, timeout)])
172+
self.assertEqual(pool._queue._put_calls, [])
173+
174+
def test_connection_is_context_manager(self):
175+
import contextlib
176+
import six
177+
178+
queue_return = _Connection()
179+
pool = self._makeOneWithMockQueue(queue_return)
180+
cnxn_context = pool.connection()
181+
if six.PY3: # pragma: NO COVER
182+
self.assertTrue(isinstance(cnxn_context,
183+
contextlib._GeneratorContextManager))
184+
else:
185+
self.assertTrue(isinstance(cnxn_context,
186+
contextlib.GeneratorContextManager))
187+
188+
def test_connection_no_current_cnxn(self):
189+
queue_return = _Connection()
190+
pool = self._makeOneWithMockQueue(queue_return)
191+
timeout = 55
192+
193+
self.assertFalse(hasattr(pool._thread_connections, 'current'))
194+
with pool.connection(timeout=timeout) as connection:
195+
self.assertEqual(pool._thread_connections.current, queue_return)
196+
self.assertTrue(connection is queue_return)
197+
self.assertFalse(hasattr(pool._thread_connections, 'current'))
198+
199+
self.assertEqual(pool._queue._get_calls, [(True, timeout)])
200+
self.assertEqual(pool._queue._put_calls,
201+
[(queue_return, None, None)])
202+
203+
def test_connection_with_current_cnxn(self):
204+
current_cnxn = _Connection()
205+
queue_return = _Connection()
206+
pool = self._makeOneWithMockQueue(queue_return)
207+
pool._thread_connections.current = current_cnxn
208+
timeout = 8001
209+
210+
with pool.connection(timeout=timeout) as connection:
211+
self.assertTrue(connection is current_cnxn)
212+
213+
self.assertEqual(pool._queue._get_calls, [])
214+
self.assertEqual(pool._queue._put_calls, [])
215+
self.assertEqual(pool._thread_connections.current, current_cnxn)
216+
140217

141218
class _Client(object):
142219

@@ -147,6 +224,12 @@ def stop(self):
147224
self.stop_calls += 1
148225

149226

227+
class _Connection(object):
228+
229+
def open(self):
230+
pass
231+
232+
150233
class _Cluster(object):
151234

152235
def __init__(self, copies=()):
@@ -161,3 +244,22 @@ def copy(self):
161244
return result
162245
else:
163246
return self
247+
248+
249+
class _Queue(object):
250+
251+
def __init__(self, result=None):
252+
self.result = result
253+
self._get_calls = []
254+
self._put_calls = []
255+
256+
def get(self, block=None, timeout=None):
257+
self._get_calls.append((block, timeout))
258+
if self.result is None:
259+
import six
260+
raise six.moves.queue.Empty
261+
else:
262+
return self.result
263+
264+
def put(self, item, block=None, timeout=None):
265+
self._put_calls.append((item, block, timeout))

0 commit comments

Comments
 (0)