Skip to content

Commit 08b4da6

Browse files
puneithdpebot
authored andcommitted
streaming code used to hang when it was asked to "exit" - fixed now (GoogleCloudPlatform#635)
1 parent 7bf6833 commit 08b4da6

File tree

1 file changed

+30
-15
lines changed

1 file changed

+30
-15
lines changed

speech/grpc/transcribe_streaming.py

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,10 @@ def _audio_data_generator(buff):
7373
A chunk of data that is the aggregate of all chunks of data in `buff`.
7474
The function will block until at least one data chunk is available.
7575
"""
76-
while True:
77-
# Use a blocking get() to ensure there's at least one chunk of data
76+
stop = False
77+
while not stop:
78+
# Use a blocking get() to ensure there's at least one chunk of data.
7879
chunk = buff.get()
79-
if not chunk:
80-
# A falsey value indicates the stream is closed.
81-
break
8280
data = [chunk]
8381

8482
# Now consume whatever other data's still buffered.
@@ -87,22 +85,31 @@ def _audio_data_generator(buff):
8785
data.append(buff.get(block=False))
8886
except queue.Empty:
8987
break
88+
89+
# If `_fill_buffer` adds `None` to the buffer, the audio stream is
90+
# closed. Yield the final bit of the buffer and exit the loop.
91+
if None in data:
92+
stop = True
93+
data.remove(None)
9094
yield b''.join(data)
9195

9296

93-
def _fill_buffer(audio_stream, buff, chunk):
97+
def _fill_buffer(audio_stream, buff, chunk, stoprequest):
9498
"""Continuously collect data from the audio stream, into the buffer."""
9599
try:
96-
while True:
100+
while not stoprequest.is_set():
97101
buff.put(audio_stream.read(chunk))
98102
except IOError:
99-
# This happens when the stream is closed. Signal that we're done.
103+
pass
104+
finally:
105+
# Add `None` to the buff, indicating that a stop request is made.
106+
# This will signal `_audio_data_generator` to exit.
100107
buff.put(None)
101108

102109

103110
# [START audio_stream]
104111
@contextlib.contextmanager
105-
def record_audio(rate, chunk):
112+
def record_audio(rate, chunk, stoprequest):
106113
"""Opens a recording stream in a context manager."""
107114
audio_interface = pyaudio.PyAudio()
108115
audio_stream = audio_interface.open(
@@ -120,14 +127,13 @@ def record_audio(rate, chunk):
120127
# This is necessary so that the input device's buffer doesn't overflow
121128
# while the calling thread makes network requests, etc.
122129
fill_buffer_thread = threading.Thread(
123-
target=_fill_buffer, args=(audio_stream, buff, chunk))
130+
target=_fill_buffer, args=(audio_stream, buff, chunk, stoprequest))
124131
fill_buffer_thread.start()
125132

126133
yield _audio_data_generator(buff)
127134

128-
audio_stream.stop_stream()
129-
audio_stream.close()
130135
fill_buffer_thread.join()
136+
audio_stream.close()
131137
audio_interface.terminate()
132138
# [END audio_stream]
133139

@@ -166,7 +172,7 @@ def request_stream(data_stream, rate, interim_results=True):
166172
yield cloud_speech.StreamingRecognizeRequest(audio_content=data)
167173

168174

169-
def listen_print_loop(recognize_stream):
175+
def listen_print_loop(recognize_stream, stoprequest):
170176
num_chars_printed = 0
171177
for resp in recognize_stream:
172178
if resp.error.code != code_pb2.OK:
@@ -198,6 +204,7 @@ def listen_print_loop(recognize_stream):
198204
# one of our keywords.
199205
if re.search(r'\b(exit|quit)\b', transcript, re.I):
200206
print('Exiting..')
207+
stoprequest.set()
201208
break
202209

203210
num_chars_printed = 0
@@ -206,9 +213,17 @@ def listen_print_loop(recognize_stream):
206213
def main():
207214
with cloud_speech.beta_create_Speech_stub(
208215
make_channel('speech.googleapis.com', 443)) as service:
216+
217+
# stoprequest is event object which is set in `listen_print_loop`
218+
# to indicate that the trancsription should be stopped.
219+
#
220+
# The `_fill_buffer` thread checks this object, and closes
221+
# the `audio_stream` once it's set.
222+
stoprequest = threading.Event()
223+
209224
# For streaming audio from the microphone, there are three threads.
210225
# First, a thread that collects audio data as it comes in
211-
with record_audio(RATE, CHUNK) as buffered_audio_data:
226+
with record_audio(RATE, CHUNK, stoprequest) as buffered_audio_data:
212227
# Second, a thread that sends requests with that data
213228
requests = request_stream(buffered_audio_data, RATE)
214229
# Third, a thread that listens for transcription responses
@@ -220,7 +235,7 @@ def main():
220235

221236
# Now, put the transcription responses to use.
222237
try:
223-
listen_print_loop(recognize_stream)
238+
listen_print_loop(recognize_stream, stoprequest)
224239

225240
recognize_stream.cancel()
226241
except face.CancellationError:

0 commit comments

Comments
 (0)