add STT v2 API infinite streaming example (#11847) · dtest/python-docs-samples@5b6e3d9
Commit 5b6e3d9

add STT v2 API infinite streaming example (GoogleCloudPlatform#11847)
Fixes GoogleCloudPlatform#11596

Signed-off-by: Gang Chen <gangcchen@google.com>
1 parent 4fb6db7 commit 5b6e3d9

File tree

3 files changed: +467 −1 lines changed

speech/microphone/requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-google-cloud-speech==2.20.0
+google-cloud-speech==2.26.0
 pyaudio==0.2.13
 six==1.16.0

speech/microphone/transcribe_streaming_infinite_v2.py

Lines changed: 378 additions & 0 deletions

@@ -0,0 +1,378 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Google Cloud Speech V2 API sample application using the streaming API.

NOTE: This module requires the dependency `pyaudio`. To install using pip:

    pip install pyaudio

Example usage:

    python transcribe_streaming_infinite_v2.py gcp_project_id
"""

# [START speech_transcribe_infinite_streaming_v2]

import argparse
import queue
import re
import sys
import time

from google.cloud.speech_v2 import SpeechClient
from google.cloud.speech_v2.types import cloud_speech as cloud_speech_types
import pyaudio

# Audio recording parameters
STREAMING_LIMIT = 240000  # 4 minutes
SAMPLE_RATE = 16000
CHUNK_SIZE = int(SAMPLE_RATE / 10)  # 100ms
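
# NOTE: The API enforces a time limit on a single streaming session, so this
# sample restarts the stream just before STREAMING_LIMIT is reached and
# replays the tail of the previous session's audio to bridge the gap.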

RED = "\033[0;31m"
GREEN = "\033[0;32m"
YELLOW = "\033[0;33m"


def get_current_time() -> int:
    """Return the current time in milliseconds.

    Returns:
        int: The current time in milliseconds.
    """
    return int(round(time.time() * 1000))


class ResumableMicrophoneStream:
    """Opens a recording stream as a generator yielding the audio chunks."""

    def __init__(
        self: object,
        rate: int,
        chunk_size: int,
    ) -> None:
        """Creates a resumable microphone stream.

        Args:
            self: The class instance.
            rate: The audio stream's sampling rate.
            chunk_size: The audio stream's chunk size.

        Returns: None
        """
        self._rate = rate
        self.chunk_size = chunk_size
        self._num_channels = 1
        self._buff = queue.Queue()
        self.closed = True
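        # Bookkeeping for resuming across stream restarts: `audio_input`
        # collects the chunks sent in the current session, `last_audio_input`
        # keeps the previous session's chunks for partial replay, and
        # `bridging_offset` tracks how far into the replayed audio the new
        # session starts.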
        self.start_time = get_current_time()
        self.restart_counter = 0
        self.audio_input = []
        self.last_audio_input = []
        self.result_end_time = 0
        self.is_final_end_time = 0
        self.final_request_end_time = 0
        self.bridging_offset = 0
        self.last_transcript_was_final = False
        self.new_stream = True
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            channels=self._num_channels,
            rate=self._rate,
            input=True,
            frames_per_buffer=self.chunk_size,
            # Run the audio stream asynchronously to fill the buffer object.
            # This is necessary so that the input device's buffer doesn't
            # overflow while the calling thread makes network requests, etc.
            stream_callback=self._fill_buffer,
        )

    def __enter__(self: object) -> object:
        """Opens the stream.

        Args:
            self: The class instance.

        Returns: The class instance.
        """
        self.closed = False
        return self

    def __exit__(
        self: object,
        type: object,
        value: object,
        traceback: object,
    ) -> object:
        """Closes the stream and releases resources.

        Args:
            self: The class instance.
            type: The exception type.
            value: The exception value.
            traceback: The exception traceback.

        Returns: None
        """
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        # Signal the generator to terminate so that the client's
        # streaming_recognize method will not block the process termination.
        self._buff.put(None)
        self._audio_interface.terminate()

    def _fill_buffer(
        self: object,
        in_data: object,
        *args: object,
        **kwargs: object,
    ) -> object:
        """Continuously collect data from the audio stream into the buffer.

        Args:
            self: The class instance.
            in_data: The audio data as a bytes object.
            args: Additional arguments.
            kwargs: Additional arguments.

        Returns: A tuple signaling PyAudio to continue streaming.
        """
        self._buff.put(in_data)
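        # Returning paContinue tells PyAudio to keep invoking this callback;
        # an input-only stream produces no output frames, hence None.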
        return None, pyaudio.paContinue

    def generator(self: object) -> object:
        """Stream audio from the microphone to the API and to the local buffer.

        Args:
            self: The class instance.

        Yields:
            The data from the audio stream.
        """
        while not self.closed:
            data = []

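            # When a new session starts after a restart, replay the portion of
            # the previous session's audio recorded after the last finalized
            # result, so no speech is lost across the restart. `chunk_time` is
            # the estimated duration of each buffered chunk in milliseconds.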
            if self.new_stream and self.last_audio_input:
                chunk_time = STREAMING_LIMIT / len(self.last_audio_input)

                if chunk_time != 0:
                    if self.bridging_offset < 0:
                        self.bridging_offset = 0

                    if self.bridging_offset > self.final_request_end_time:
                        self.bridging_offset = self.final_request_end_time

                    chunks_from_ms = round(
                        (self.final_request_end_time - self.bridging_offset)
                        / chunk_time
                    )

                    self.bridging_offset = round(
                        (len(self.last_audio_input) - chunks_from_ms) * chunk_time
                    )

                    for i in range(chunks_from_ms, len(self.last_audio_input)):
                        data.append(self.last_audio_input[i])

                self.new_stream = False

            # Use a blocking get() to ensure there's at least one chunk of
            # data, and stop iteration if the chunk is None, indicating the
            # end of the audio stream.
            chunk = self._buff.get()

            if chunk is None:
                return
            data.append(chunk)
            self.audio_input.append(chunk)
            # Now consume whatever other data's still buffered.
            while True:
                try:
                    chunk = self._buff.get(block=False)

                    if chunk is None:
                        return
                    data.append(chunk)
                    self.audio_input.append(chunk)

                except queue.Empty:
                    break

            yield b"".join(data)


def listen_print_loop(responses: object, stream: object) -> None:
    """Iterates through server responses and prints them.

    The `responses` argument is a generator that blocks until a response
    is provided by the server.

    Each response may contain multiple results, and each result may contain
    multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we
    print only the transcription for the top alternative of the top result.

    In this case, responses are provided for interim results as well. If the
    response is an interim one, print a carriage return at the end of it, to
    allow the next result to overwrite it, until the response is a final one.
    For the final one, print a newline to preserve the finalized transcription.

    Args:
        responses: The responses returned from the API.
        stream: The audio stream to be processed.
    """
    for response in responses:
        if get_current_time() - stream.start_time > STREAMING_LIMIT:
            stream.start_time = get_current_time()
            break

        if not response.results:
            continue

        result = response.results[0]

        if not result.alternatives:
            continue

        transcript = result.alternatives[0].transcript

        result_seconds = 0
        result_micros = 0

        # Speech-to-Text V2 results use the attribute result_end_offset
        # instead of result_end_time:
        # https://cloud.google.com/speech-to-text/v2/docs/reference/rest/v2/StreamingRecognitionResult
        if result.result_end_offset.seconds:
            result_seconds = result.result_end_offset.seconds

        if result.result_end_offset.microseconds:
            result_micros = result.result_end_offset.microseconds

        stream.result_end_time = int((result_seconds * 1000) + (result_micros / 1000))

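        # Convert the per-session end offset into an absolute position on the
        # overall timeline: each completed session contributes STREAMING_LIMIT
        # ms, minus the audio replayed to bridge the restart.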
        corrected_time = (
            stream.result_end_time
            - stream.bridging_offset
            + (STREAMING_LIMIT * stream.restart_counter)
        )
        # Display interim results, but with a carriage return at the end of the
        # line, so subsequent lines will overwrite them.

        if result.is_final:
            sys.stdout.write(GREEN)
            sys.stdout.write("\033[K")
            sys.stdout.write(str(corrected_time) + ": " + transcript + "\n")

            stream.is_final_end_time = stream.result_end_time
            stream.last_transcript_was_final = True

            # Exit recognition if any of the transcribed phrases could be
            # one of our keywords.
            if re.search(r"\b(exit|quit)\b", transcript, re.I):
                sys.stdout.write(YELLOW)
                sys.stdout.write("Exiting...\n")
                stream.closed = True
                break
        else:
            sys.stdout.write(RED)
            sys.stdout.write("\033[K")
            sys.stdout.write(str(corrected_time) + ": " + transcript + "\r")

            stream.last_transcript_was_final = False


def main(project_id: str) -> None:
    """Start bidirectional streaming from microphone input to the Speech API."""
    client = SpeechClient()

    recognition_config = cloud_speech_types.RecognitionConfig(
        explicit_decoding_config=cloud_speech_types.ExplicitDecodingConfig(
            sample_rate_hertz=SAMPLE_RATE,
            encoding=cloud_speech_types.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
            audio_channel_count=1,
        ),
        language_codes=["en-US"],
        model="long",
    )
    streaming_config = cloud_speech_types.StreamingRecognitionConfig(
        config=recognition_config,
        streaming_features=cloud_speech_types.StreamingRecognitionFeatures(
            interim_results=True
        ),
    )
    config_request = cloud_speech_types.StreamingRecognizeRequest(
        recognizer=f"projects/{project_id}/locations/global/recognizers/_",
        streaming_config=streaming_config,
    )

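    # The trailing "_" addresses the default (ad hoc) recognizer, so the sample
    # does not need a pre-created recognizer resource.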
    def requests(config: cloud_speech_types.StreamingRecognizeRequest, audio: object) -> object:
        """Helper generator that yields the requests for the streaming API.

        Args:
            config: The initial request carrying the streaming configuration.
            audio: An iterable of audio chunks.
        Yields:
            The requests for the streaming API.
        """
        yield config
        for chunk in audio:
            yield cloud_speech_types.StreamingRecognizeRequest(audio=chunk)

    mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE)
    print(mic_manager.chunk_size)
    sys.stdout.write(YELLOW)
    sys.stdout.write('\nListening, say "Quit" or "Exit" to stop.\n\n')
    sys.stdout.write("End (ms) Transcript Results/Status\n")
    sys.stdout.write("=====================================================\n")

    with mic_manager as stream:
        while not stream.closed:
            sys.stdout.write(YELLOW)
            sys.stdout.write(
                "\n" + str(STREAMING_LIMIT * stream.restart_counter) + ": NEW REQUEST\n"
            )

            stream.audio_input = []
            audio_generator = stream.generator()

            # Transcribes the audio into text
            responses_iterator = client.streaming_recognize(
                requests=requests(config_request, audio_generator)
            )

            listen_print_loop(responses_iterator, stream)

            if stream.result_end_time > 0:
                stream.final_request_end_time = stream.is_final_end_time
            stream.result_end_time = 0
            stream.last_audio_input = stream.audio_input
            stream.audio_input = []
            stream.restart_counter = stream.restart_counter + 1

            if not stream.last_transcript_was_final:
                sys.stdout.write("\n")
            stream.new_stream = True


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("project_id", help="GCP Project ID")
    args = parser.parse_args()
    main(args.project_id)

# [END speech_transcribe_infinite_streaming_v2]
