Skip to content

Commit 1a37fce

Browse files
authored
Making it impossible to call Policy.open() on an already opened policy. (googleapis#4606)
Similar with `Policy.close()`. Fixes googleapis#4488. Also returning the future from `Policy.close()`.
1 parent 78bda88 commit 1a37fce

File tree

2 files changed

+71
-8
lines changed

2 files changed

+71
-8
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,26 @@ def _get_executor(executor):
147147
return executor
148148

149149
def close(self):
150-
"""Close the existing connection."""
150+
"""Close the existing connection.
151+
152+
.. warning::
153+
154+
This method is not thread-safe. For example, if this method is
155+
called while another thread is executing :meth:`open`, then the
156+
policy could end up in an undefined state. The **same** policy
157+
instance is not intended to be used by multiple workers (though
158+
each policy instance **does** have a thread-safe private queue).
159+
160+
Returns:
161+
~google.api_core.future.Future: The future that **was** attached
162+
to the subscription.
163+
164+
Raises:
165+
ValueError: If the policy has not been opened yet.
166+
"""
167+
if self._future is None:
168+
raise ValueError('This policy has not been opened yet.')
169+
151170
# Stop consuming messages.
152171
self._request_queue.put(_helper_threads.STOP)
153172
self._dispatch_thread.join() # Wait until stopped.
@@ -159,9 +178,11 @@ def close(self):
159178

160179
# The subscription is closing cleanly; resolve the future if it is not
161180
# resolved already.
162-
if self._future is not None and not self._future.done():
181+
if not self._future.done():
163182
self._future.set_result(None)
183+
future = self._future
164184
self._future = None
185+
return future
165186

166187
def _start_dispatch(self):
167188
"""Start a thread to dispatch requests queued up by callbacks.
@@ -213,6 +234,14 @@ def _start_lease_worker(self):
213234
def open(self, callback):
214235
"""Open a streaming pull connection and begin receiving messages.
215236
237+
.. warning::
238+
239+
This method is not thread-safe. For example, if this method is
240+
called while another thread is executing :meth:`close`, then the
241+
policy could end up in an undefined state. The **same** policy
242+
instance is not intended to be used by multiple workers (though
243+
each policy instance **does** have a thread-safe private queue).
244+
216245
For each message received, the ``callback`` function is fired with
217246
a :class:`~.pubsub_v1.subscriber.message.Message` as its only
218247
argument.
@@ -222,9 +251,15 @@ def open(self, callback):
222251
223252
Returns:
224253
~google.api_core.future.Future: A future that provides
225-
an interface to block on the subscription if desired, and
226-
handle errors.
254+
an interface to block on the subscription if desired, and
255+
handle errors.
256+
257+
Raises:
258+
ValueError: If the policy has already been opened.
227259
"""
260+
if self._future is not None:
261+
raise ValueError('This policy has already been opened.')
262+
228263
# Create the Future that this method will return.
229264
# This future is the main thread's interface to handle exceptions,
230265
# block on the subscription, etc.

pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,35 @@ def test_close():
5555
policy = create_policy()
5656
policy._dispatch_thread = dispatch_thread
5757
policy._leases_thread = leases_thread
58+
future = mock.Mock(spec=('done',))
59+
future.done.return_value = True
60+
policy._future = future
61+
5862
consumer = policy._consumer
5963
with mock.patch.object(consumer, 'stop_consuming') as stop_consuming:
60-
policy.close()
64+
closed_fut = policy.close()
6165
stop_consuming.assert_called_once_with()
6266

6367
assert policy._dispatch_thread is None
6468
dispatch_thread.join.assert_called_once_with()
6569
assert policy._leases_thread is None
6670
leases_thread.join.assert_called_once_with()
71+
assert closed_fut is future
72+
assert policy._future is None
73+
future.done.assert_called_once_with()
74+
75+
76+
def test_close_without_future():
77+
policy = create_policy()
78+
assert policy._future is None
79+
80+
with pytest.raises(ValueError) as exc_info:
81+
policy.close()
6782

83+
assert exc_info.value.args == ('This policy has not been opened yet.',)
6884

69-
def test_close_with_future():
85+
86+
def test_close_with_unfinished_future():
7087
dispatch_thread = mock.Mock(spec=threading.Thread)
7188
leases_thread = mock.Mock(spec=threading.Thread)
7289

@@ -77,14 +94,15 @@ def test_close_with_future():
7794
consumer = policy._consumer
7895
with mock.patch.object(consumer, 'stop_consuming') as stop_consuming:
7996
future = policy.future
80-
policy.close()
97+
closed_fut = policy.close()
8198
stop_consuming.assert_called_once_with()
8299

83100
assert policy._dispatch_thread is None
84101
dispatch_thread.join.assert_called_once_with()
85102
assert policy._leases_thread is None
86103
leases_thread.join.assert_called_once_with()
87-
assert policy.future != future
104+
assert policy._future is None
105+
assert closed_fut is future
88106
assert future.result() is None
89107

90108

@@ -111,6 +129,16 @@ def test_open():
111129
threads[2].start.assert_called_once_with()
112130

113131

132+
def test_open_already_open():
133+
policy = create_policy()
134+
policy._future = mock.sentinel.future
135+
136+
with pytest.raises(ValueError) as exc_info:
137+
policy.open(None)
138+
139+
assert exc_info.value.args == ('This policy has already been opened.',)
140+
141+
114142
def test_dispatch_callback_valid_actions():
115143
policy = create_policy()
116144
kwargs = {'foo': 10, 'bar': 13.37}

0 commit comments

Comments
 (0)