如何手动将谷歌云语音转文本(Speech-to-Text)的 result.is_final 设置为 true

问题描述

我正在用 Python 编写一个转录电话通话的脚本,使用的是谷歌云语音转文本(Speech-to-Text)API。问题是,谷歌 STT 会把整段对话转录成一行文本,不会在出现静音时把 is_final 标志置为 true;只有在 1 分钟之后 is_final 才会变为 true,因为谷歌 STT 一次最多只能转录 1 分钟,之后会自行重置。

所以我想问的是:

我可以手动将 is_final 设置为 true 吗?

这是我的代码:

#!/usr/bin/env python3
import os,sys,wave,threading

from flask import Flask,request,jsonify
from flask_sockets import Sockets
from google.cloud.speech import RecognitionConfig,StreamingRecognitionConfig
from models.SpeechClientBridge import SpeechClientBridge

# Tell the Google client library where the credentials file lives.
# NOTE(review): PATH is not defined anywhere in the visible snippet; it must
# be assigned before this line runs — confirm where it comes from.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = PATH

# Recognition settings: LINEAR16 audio at 16000 Hz, language code "iw-IL".
config = RecognitionConfig(
    encoding=RecognitionConfig.AudioEncoding.LINEAR16,
    sample_rate_hertz=16000,
    language_code="iw-IL",
)

# Streaming wrapper around the recognition settings, with interim results
# switched on.
streaming_config = StreamingRecognitionConfig(
    config=config,
    interim_results=True,
)

# Flask application plus the flask_sockets extension used for websocket routes.
app = Flask(__name__)
sockets = Sockets(app)


@app.route("/ncco")
def answer_call():
    """Return, as JSON, a one-action NCCO that connects the call to a websocket.

    The websocket URI points at the "/socket" path on the host taken from
    the incoming request.
    """
    websocket_endpoint = {
        "type": "websocket",
        "uri": "wss://{0}/socket".format(request.host),
        "content-type": "audio/l16;rate=16000",
    }
    connect_action = {
        "action": "connect",
        "from": "number",
        "endpoint": [websocket_endpoint],
    }
    return jsonify([connect_action])


# Length of the last interim transcript written to the console, keyed by
# id(ws).  It has to outlive a single call: each response is handled by a
# separate call, and the padding below depends on what the previous call
# printed for the same websocket.
_interim_chars_printed = {}


def on_transcription_response(response,ws):
    """Print the top transcript of a streaming recognition response.

    Interim results are written on one console line ending in a carriage
    return, so the next result overwrites them; a final result is printed
    on its own line with a '==>' prefix.  When the new transcript is shorter
    than the interim text previously printed for the same ``ws``, it is
    padded with spaces so no stale characters remain visible.

    Args:
        response: Object exposing ``results``; only ``results[0]`` and its
            first alternative are used.
        ws: The websocket the response belongs to.  Only its identity is
            used, to keep the printed-length state separate per connection.

    Returns:
        None.  Responses without results or alternatives are ignored.
    """
    if not response.results:
        return

    result = response.results[0]
    if not result.alternatives:
        return

    transcript = result.alternatives[0].transcript

    state_key = id(ws)
    num_chars_printed = _interim_chars_printed.get(state_key, 0)
    # A negative repeat count yields "", so no padding is added when the new
    # transcript is at least as long as the previous one.
    overwrite_chars = " " * (num_chars_printed - len(transcript))

    if not result.is_final:
        sys.stdout.write(transcript + overwrite_chars + "\r")
        sys.stdout.flush()
        _interim_chars_printed[state_key] = len(transcript)
    else:
        print('==>'+transcript + overwrite_chars)
        # The line is complete; the next interim result starts a fresh one.
        _interim_chars_printed.pop(state_key, None)


@app.route("/event",methods=["POST"])
def events():
    """Answer POST requests to /event with the literal body "200"."""
    acknowledgement = "200"
    return acknowledgement


@sockets.route("/socket",methods=["GET"])
def transcript(ws):
    """Forward audio received on the websocket to a SpeechClientBridge.

    A bridge is started on a separate thread.  Every non-string message
    received from ``ws`` is queued on the bridge as audio; string messages
    are only printed.  The loop stops when ``ws.receive()`` returns None or
    the socket reports itself closed.

    However the loop ends (including an exception from ``ws.receive()``),
    the bridge is handed the ``None`` end-of-stream marker and terminated.
    Without the marker the bridge's generator would stay blocked waiting on
    its queue and the worker thread would never finish.

    Args:
        ws: The websocket connection for this request.
    """
    print ('req:'+ str(request.data))
    bridge = SpeechClientBridge(streaming_config,on_transcription_response,ws)
    worker = threading.Thread(target=bridge.start)
    worker.start()

    try:
        while not ws.closed:
            message = ws.receive()
            if message is None:
                break
            if isinstance(message, str):
                print(message)
            else:
                bridge.add_request(message)
    finally:
        # Always unblock and stop the bridge, whichever way the loop exited.
        bridge.add_request(None)
        bridge.terminate()
        print("WS connection closed")


if __name__ == "__main__":
    # Run the Flask app under gevent's WSGI server with websocket support.
    from gevent import pywsgi
    from geventwebsocket.handler import WebSocketHandler

    listen_address = ("", 3000)
    server = pywsgi.WSGIServer(
        listen_address,
        app,
        handler_class=WebSocketHandler,
    )
    print('server is Up')
    server.serve_forever()

SpeechClientBridge 类:

import queue

from google.cloud import speech


class SpeechClientBridge:
    """Bridge between queued audio chunks and a streaming recognition call.

    Audio is pushed in with ``add_request``; ``start`` drains the queue
    through ``generator``, sends it to ``client.streaming_recognize`` and
    passes every response to the ``on_response`` callback together with the
    websocket given at construction.
    """

    def __init__(self,streaming_config,on_response,ws):
        """Store the configuration, callback and websocket.

        Args:
            streaming_config: Passed to ``streaming_recognize`` by ``start``.
            on_response: Callable invoked as ``on_response(response, ws)``.
            ws: Websocket handed back to ``on_response`` with each response.
        """
        self._on_response = on_response
        self._ws = ws # webSocket
        self._queue = queue.Queue()
        self._ended = False
        self.streaming_config = streaming_config

    def start(self):
        """Run the streaming recognition call and process its responses.

        Blocks until the response stream is exhausted or the bridge has been
        terminated.
        """
        client = speech.SpeechClient()
        stream = self.generator()
        requests = (
            speech.StreamingRecognizeRequest(audio_content=content)
            for content in stream
        )
        responses = client.streaming_recognize(self.streaming_config,requests)
        self.process_responses_loop(responses)

    def terminate(self):
        """Mark the bridge as ended and wake up a blocked ``generator``.

        Setting the flag alone is not enough: ``generator`` may be blocked in
        ``queue.get()`` and would never look at the flag again, so the
        ``None`` end-of-stream marker is queued as well.  Calling this more
        than once is harmless.
        """
        self._ended = True
        self._queue.put(None, block=False)

    def add_request(self,buffer):
        """Queue an audio chunk, or ``None`` to signal the end of the stream.

        Args:
            buffer: A bytes-like chunk (stored as ``bytes``) or ``None``.
        """
        if buffer is None:
            self._queue.put(None, block=False)
        else:
            self._queue.put(bytes(buffer), block=False)

    def process_responses_loop(self,responses):
        """Print each response and pass it to the callback until terminated.

        Args:
            responses: Iterable of responses from the recognition call.
        """
        for response in responses:
            print(response)
            self._on_response(response,self._ws)

            if self._ended:
                break

    def generator(self):
        """Yield queued audio, concatenating everything currently buffered.

        Stops when the bridge has ended or when the ``None`` end-of-stream
        marker is read from the queue.  Chunks collected in the same pass as
        the marker are not yielded.
        """
        while not self._ended:
            # Use a blocking get() to ensure there's at least one chunk of
            # data, and stop iteration if the chunk is None, indicating the
            # end of the audio stream.
            chunk = self._queue.get()
            if chunk is None:
                return
            data = [chunk]

            # Now consume whatever other data's still buffered.
            while True:
                try:
                    chunk = self._queue.get(block=False)
                except queue.Empty:
                    break
                if chunk is None:
                    return
                data.append(chunk)

            yield b"".join(data)

解决方法

暂未找到可以解决该程序问题的有效方法,小编正在努力寻找和整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)