Skip to content

Commit 1589592

Browse files
authored
Removing Pub / Sub HelperThreadRegistry. (googleapis#4537)
I'll likely put back in **some** of the features (especially the concept of the namedtuple that knows about the thread **AND** the queue / threading.Event).
1 parent a2cc7af commit 1589592

5 files changed

Lines changed: 27 additions & 214 deletions

File tree

pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@
130130

131131

132132
_LOGGER = logging.getLogger(__name__)
133-
_BIDIRECTIONAL_CONSUMER_NAME = 'ConsumeBidirectionalStream'
133+
_BIDIRECTIONAL_CONSUMER_NAME = 'Thread-ConsumeBidirectionalStream'
134134

135135

136136
class Consumer(object):
@@ -188,10 +188,7 @@ def __init__(self, policy):
188188
self._put_lock = threading.Lock()
189189

190190
self.active = False
191-
self.helper_threads = _helper_threads.HelperThreadRegistry()
192-
""":class:`_helper_threads.HelperThreads`: manages the helper threads.
193-
The policy may use this to schedule its own helper threads.
194-
"""
191+
self._consumer_thread = None
195192

196193
def send_request(self, request):
197194
"""Queue a request to be sent to gRPC.
@@ -330,14 +327,20 @@ def start_consuming(self):
330327
"""Start consuming the stream."""
331328
self.active = True
332329
self._exiting.clear()
333-
self.helper_threads.start(
334-
_BIDIRECTIONAL_CONSUMER_NAME,
335-
self.send_request,
336-
self._blocking_consume,
330+
thread = threading.Thread(
331+
name=_BIDIRECTIONAL_CONSUMER_NAME,
332+
target=self._blocking_consume,
337333
)
334+
thread.daemon = True
335+
thread.start()
336+
_LOGGER.debug('Started helper thread %s', thread.name)
337+
self._consumer_thread = thread
338338

339339
def stop_consuming(self):
340340
"""Signal the stream to stop and block until it completes."""
341341
self.active = False
342342
self._exiting.set()
343-
self.helper_threads.stop_all()
343+
_LOGGER.debug('Stopping helper thread %s', self._consumer_thread.name)
344+
self.send_request(_helper_threads.STOP)
345+
self._consumer_thread.join()
346+
self._consumer_thread = None

pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py

Lines changed: 0 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,17 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import collections
1615
import logging
17-
import threading
1816
import uuid
1917

20-
import six
2118

2219
__all__ = (
23-
'HelperThreadRegistry',
2420
'QueueCallbackWorker',
2521
'STOP',
2622
)
2723

2824
_LOGGER = logging.getLogger(__name__)
2925

30-
_HelperThread = collections.namedtuple(
31-
'HelperThreads',
32-
['name', 'thread', 'queue_put'],
33-
)
34-
3526

3627
# Helper thread stop indicator. This could be a sentinel object or None,
3728
# but the sentinel object's ID can change if the process is forked, and
@@ -40,88 +31,6 @@
4031
STOP = uuid.uuid4()
4132

4233

43-
def _current_thread():
44-
"""Get the currently active thread.
45-
46-
This is provided as a test helper so that it can be mocked easily.
47-
Mocking ``threading.current_thread()`` directly may have unintended
48-
consequences on code that relies on it.
49-
50-
Returns:
51-
threading.Thread: The current thread.
52-
"""
53-
return threading.current_thread()
54-
55-
56-
class HelperThreadRegistry(object):
57-
def __init__(self):
58-
self._helper_threads = {}
59-
60-
def start(self, name, queue_put, target):
61-
"""Create and start a helper thread.
62-
63-
Args:
64-
name (str): The name of the helper thread.
65-
queue_put (Callable): The ``put()`` method for a
66-
concurrency-safe queue.
67-
target (Callable): The target of the thread.
68-
69-
Returns:
70-
threading.Thread: The created thread.
71-
"""
72-
# Create and start the helper thread.
73-
thread = threading.Thread(
74-
name='Thread-ConsumerHelper-{}'.format(name),
75-
target=target,
76-
)
77-
thread.daemon = True
78-
thread.start()
79-
80-
# Keep track of the helper thread, so we are able to stop it.
81-
self._helper_threads[name] = _HelperThread(name, thread, queue_put)
82-
_LOGGER.debug('Started helper thread %s', name)
83-
return thread
84-
85-
def stop(self, name):
86-
"""Stops a helper thread.
87-
88-
Sends the stop message and blocks until the thread joins.
89-
90-
Args:
91-
name (str): The name of the thread.
92-
"""
93-
# Attempt to retrieve the thread; if it is gone already, no-op.
94-
helper_thread = self._helper_threads.get(name)
95-
if helper_thread is None:
96-
return
97-
98-
if helper_thread.thread is _current_thread():
99-
# The current thread cannot ``join()`` itself but it can
100-
# still send a signal to stop.
101-
_LOGGER.debug('Cannot stop current thread %s', name)
102-
helper_thread.queue_put(STOP)
103-
# We return and stop short of ``pop()``-ing so that the
104-
# thread that invoked the current helper can properly stop
105-
# it.
106-
return
107-
108-
# Join the thread if it is still alive.
109-
if helper_thread.thread.is_alive():
110-
_LOGGER.debug('Stopping helper thread %s', name)
111-
helper_thread.queue_put(STOP)
112-
helper_thread.thread.join()
113-
114-
# Remove the thread from our tracking.
115-
self._helper_threads.pop(name, None)
116-
117-
def stop_all(self):
118-
"""Stop all helper threads."""
119-
# This could be more efficient by sending the stop signal to all
120-
# threads before joining any of them.
121-
for name in list(six.iterkeys(self._helper_threads)):
122-
self.stop(name)
123-
124-
12534
class QueueCallbackWorker(object):
12635
"""A helper that executes a callback for every item in the queue.
12736

pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,12 @@ def test_blocking_consume_on_exception():
9696
policy.on_response.side_effect = exc
9797

9898
consumer = _consumer.Consumer(policy=policy)
99+
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
99100
policy.on_exception.side_effect = OnException()
100101

101102
# Establish that we get responses until we are sent the exiting event.
102103
consumer._blocking_consume()
104+
assert consumer._consumer_thread is None
103105

104106
# Check mocks.
105107
policy.call_rpc.assert_called_once()
@@ -118,10 +120,12 @@ def test_blocking_consume_two_exceptions():
118120
policy.on_response.side_effect = (exc1, exc2)
119121

120122
consumer = _consumer.Consumer(policy=policy)
123+
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
121124
policy.on_exception.side_effect = OnException(acceptable=exc1)
122125

123126
# Establish that we get responses until we are sent the exiting event.
124127
consumer._blocking_consume()
128+
assert consumer._consumer_thread is None
125129

126130
# Check mocks.
127131
assert policy.call_rpc.call_count == 2
@@ -133,16 +137,16 @@ def test_blocking_consume_two_exceptions():
133137

134138
def test_start_consuming():
135139
consumer = create_consumer()
136-
helper_threads = consumer.helper_threads
137-
with mock.patch.object(helper_threads, 'start', autospec=True) as start:
140+
with mock.patch.object(threading, 'Thread', autospec=True) as Thread:
138141
consumer.start_consuming()
139-
assert consumer._exiting.is_set() is False
140-
assert consumer.active is True
141-
start.assert_called_once_with(
142-
'ConsumeBidirectionalStream',
143-
consumer.send_request,
144-
consumer._blocking_consume,
145-
)
142+
143+
assert consumer._exiting.is_set() is False
144+
assert consumer.active is True
145+
Thread.assert_called_once_with(
146+
name=_consumer._BIDIRECTIONAL_CONSUMER_NAME,
147+
target=consumer._blocking_consume,
148+
)
149+
assert consumer._consumer_thread is Thread.return_value
146150

147151

148152
def basic_queue_generator(queue, received):

pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py

Lines changed: 0 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -12,111 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import threading
16-
1715
import mock
1816
from six.moves import queue
1917

2018
from google.cloud.pubsub_v1.subscriber import _helper_threads
2119

2220

23-
def test_start():
24-
registry = _helper_threads.HelperThreadRegistry()
25-
queue_ = queue.Queue()
26-
target = mock.Mock(spec=())
27-
with mock.patch.object(threading.Thread, 'start', autospec=True) as start:
28-
registry.start('foo', queue_, target)
29-
assert start.called
30-
31-
32-
def test_stop_noop():
33-
registry = _helper_threads.HelperThreadRegistry()
34-
assert len(registry._helper_threads) == 0
35-
registry.stop('foo')
36-
assert len(registry._helper_threads) == 0
37-
38-
39-
@mock.patch.object(
40-
_helper_threads, '_current_thread', return_value=mock.sentinel.thread)
41-
def test_stop_current_thread(_current_thread):
42-
registry = _helper_threads.HelperThreadRegistry()
43-
queue_ = mock.Mock(spec=('put',))
44-
45-
name = 'here'
46-
registry._helper_threads[name] = _helper_threads._HelperThread(
47-
name=name,
48-
queue_put=queue_.put,
49-
thread=_current_thread.return_value,
50-
)
51-
assert list(registry._helper_threads.keys()) == [name]
52-
registry.stop(name)
53-
# Make sure it hasn't been removed from the registry ...
54-
assert list(registry._helper_threads.keys()) == [name]
55-
# ... but it did receive the STOP signal.
56-
queue_.put.assert_called_once_with(_helper_threads.STOP)
57-
58-
# Verify that our mock was only called once.
59-
_current_thread.assert_called_once_with()
60-
61-
62-
def test_stop_dead_thread():
63-
registry = _helper_threads.HelperThreadRegistry()
64-
registry._helper_threads['foo'] = _helper_threads._HelperThread(
65-
name='foo',
66-
queue_put=None,
67-
thread=threading.Thread(target=lambda: None),
68-
)
69-
assert len(registry._helper_threads) == 1
70-
registry.stop('foo')
71-
assert len(registry._helper_threads) == 0
72-
73-
74-
@mock.patch.object(queue.Queue, 'put')
75-
@mock.patch.object(threading.Thread, 'is_alive')
76-
@mock.patch.object(threading.Thread, 'join')
77-
def test_stop_alive_thread(join, is_alive, put):
78-
is_alive.return_value = True
79-
80-
# Set up a registry with a helper thread in it.
81-
registry = _helper_threads.HelperThreadRegistry()
82-
queue_ = queue.Queue()
83-
registry._helper_threads['foo'] = _helper_threads._HelperThread(
84-
name='foo',
85-
queue_put=queue_.put,
86-
thread=threading.Thread(target=lambda: None),
87-
)
88-
89-
# Assert that the helper thread is present, and removed correctly
90-
# on stop.
91-
assert len(registry._helper_threads) == 1
92-
registry.stop('foo')
93-
assert len(registry._helper_threads) == 0
94-
95-
# Assert that all of our mocks were called in the expected manner.
96-
is_alive.assert_called_once_with()
97-
join.assert_called_once_with()
98-
put.assert_called_once_with(_helper_threads.STOP)
99-
100-
101-
def test_stop_all():
102-
registry = _helper_threads.HelperThreadRegistry()
103-
registry._helper_threads['foo'] = _helper_threads._HelperThread(
104-
name='foo',
105-
queue_put=None,
106-
thread=threading.Thread(target=lambda: None),
107-
)
108-
assert len(registry._helper_threads) == 1
109-
registry.stop_all()
110-
assert len(registry._helper_threads) == 0
111-
112-
113-
def test_stop_all_noop():
114-
registry = _helper_threads.HelperThreadRegistry()
115-
assert len(registry._helper_threads) == 0
116-
registry.stop_all()
117-
assert len(registry._helper_threads) == 0
118-
119-
12021
def test_queue_callback_worker():
12122
queue_ = queue.Queue()
12223
callback = mock.Mock(spec=())

pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from google.auth import credentials
2222
import mock
2323
import pytest
24-
import six
2524
from six.moves import queue
2625

2726
from google.cloud.pubsub_v1 import subscriber
@@ -105,10 +104,7 @@ def test_open():
105104
assert policy._dispatch_thread is threads[0]
106105
threads[0].start.assert_called_once_with()
107106

108-
threads_dict = consumer.helper_threads._helper_threads
109-
assert len(threads_dict) == 1
110-
helper_thread = next(six.itervalues(threads_dict))
111-
assert helper_thread.thread is threads[1]
107+
assert consumer._consumer_thread is threads[1]
112108
threads[1].start.assert_called_once_with()
113109

114110
assert policy._leases_thread is threads[2]

0 commit comments

Comments
 (0)