@@ -134,6 +134,7 @@ def __init__(self, batch_settings=(), **kwargs):
134134 # messages. One batch exists for each topic.
135135 self ._batch_lock = self ._batch_class .make_lock ()
136136 self ._batches = {}
137+ self ._is_stopped = False
137138
138139 @classmethod
139140 def from_service_account_file (cls , filename , batch_settings = (), ** kwargs ):
@@ -187,20 +188,19 @@ def _batch(self, topic, create=False, autocommit=True):
187188 """
188189 # If there is no matching batch yet, then potentially create one
189190 # and place it on the batches dictionary.
190- with self ._batch_lock :
191- if not create :
192- batch = self ._batches .get (topic )
193- if batch is None :
194- create = True
195-
196- if create :
197- batch = self ._batch_class (
198- autocommit = autocommit ,
199- client = self ,
200- settings = self .batch_settings ,
201- topic = topic ,
202- )
203- self ._batches [topic ] = batch
191+ if not create :
192+ batch = self ._batches .get (topic )
193+ if batch is None :
194+ create = True
195+
196+ if create :
197+ batch = self ._batch_class (
198+ autocommit = autocommit ,
199+ client = self ,
200+ settings = self .batch_settings ,
201+ topic = topic ,
202+ )
203+ self ._batches [topic ] = batch
204204
205205 return batch
206206
@@ -242,12 +242,17 @@ def publish(self, topic, data, **attrs):
242242 instance that conforms to Python Standard library's
243243 :class:`~concurrent.futures.Future` interface (but not an
244244 instance of that class).
245+
246+ Raises:
247+ RuntimeError:
248+ If called after publisher has been stopped
249+ by a `stop()` method call.
245250 """
246251 # Sanity check: Is the data being sent as a bytestring?
247252 # If it is literally anything else, complain loudly about it.
248253 if not isinstance (data , six .binary_type ):
249254 raise TypeError (
250- "Data being published to Pub/Sub must be sent " " as a bytestring."
255+ "Data being published to Pub/Sub must be sent as a bytestring."
251256 )
252257
253258 # Coerce all attributes to text strings.
@@ -266,11 +271,38 @@ def publish(self, topic, data, **attrs):
266271 message = types .PubsubMessage (data = data , attributes = attrs )
267272
268273 # Delegate the publishing to the batch.
269- batch = self ._batch (topic )
270- future = None
271- while future is None :
272- future = batch .publish (message )
273- if future is None :
274- batch = self ._batch (topic , create = True )
274+ with self ._batch_lock :
275+ if self ._is_stopped :
276+ raise RuntimeError ("Cannot publish on a stopped publisher." )
277+
278+ batch = self ._batch (topic )
279+ future = None
280+ while future is None :
281+ future = batch .publish (message )
282+ if future is None :
283+ batch = self ._batch (topic , create = True )
275284
276285 return future
286+
287+ def stop (self ):
288+ """Immediately publish all outstanding messages.
289+
290+ Asynchronously sends all outstanding messages and
291+ prevents future calls to `publish()`. Method should
292+ be invoked prior to deleting this `Client()` object
293+ in order to ensure that no pending messages are lost.
294+
295+ .. note::
296+
297+ This method is non-blocking. Use `Future()` objects
298+ returned by `publish()` to make sure all publish
299+ requests completed, either in success or error.
300+ """
301+ with self ._batch_lock :
302+ if self ._is_stopped :
303+ raise RuntimeError ("Cannot stop a publisher already stopped." )
304+
305+ self ._is_stopped = True
306+
307+ for batch in self ._batches .values ():
308+ batch .commit ()
0 commit comments