如何使用 Tonic 对 gRPC 双向流执行半关闭?

问题描述

我正在使用 Tonic 库为 Google's Dialogflow 流检测意图实现 Rust 库。

流媒体工作正常,但是一旦所有音频数据都被推送到流中,我需要发送一个 half-close 调用。如果没有这个,检测意图流将不会使用最终文本调用意图检测,并且会超时抱怨没有提供音频数据超过 11 秒。

我如何通过 Tonic 发送半关闭?我需要类似 this Golang example 的东西。

我的防锈代码如下。注意:明确删除发件人似乎没有任何区别。结果将是相同的(超时,API 永远不会识别出不会有更多数据到达,因此它应该已经执行 NLP)。

use google_cognitive_apis::api::grpc::google::cloud::dialogflow::v2beta1::{
    query_input::Input,InputAudioConfig,QueryInput,StreamingDetectIntentRequest,};
use google_cognitive_apis::dialogflow::sessions_client::SessionsClient;

use log::*;
use std::env;
use std::fs::{self,File};
use std::io::Read;

#[tokio::main]
async fn main() {
    env::set_var("RUST_LOG","info");
    env_logger::init();
    info!("sessions_client_streaming_detect_intent example");

    let credentials = fs::read_to_string("/tmp/gcp-cred.json").unwrap();

    let guid = "10db5977-7f28-4a57-92fb-88459ff8c239";
    let session_id = SessionsClient::get_session_string("gcp-proj-id",guid);

    #[allow(deprecated)]
    let streaming_detect_intent_req = StreamingDetectIntentRequest {
        session: session_id.to_owned(),query_params: None,query_input: Some(QueryInput {
            input: Some(Input::AudioConfig(InputAudioConfig {
                audio_encoding: 1,// linear16
                sample_rate_hertz: 8000,language_code: "en".to_owned(),enable_word_info: false,phrase_hints: vec![],speech_contexts: vec![],model: "".to_string(),model_variant: 0,single_utterance: false,disable_no_speech_recognized_event: false,})),}),output_audio_config: None,output_audio_config_mask: None,input_audio: vec![],};

    let mut sessions_client =
        SessionsClient::create_async(credentials,streaming_detect_intent_req,None)
            .await
            .unwrap();

    let audio_sender = sessions_client.get_audio_sink().unwrap();

    let mut result_receiver = sessions_client.get_streaming_result_receiver(None);

    tokio::spawn(async move {
        let recognition_result = sessions_client.streaming_detect_intent().await;

        match recognition_result {
            Err(err) => error!("streaming_detect_intent error {:?}",err),Ok(_) => info!("streaming_detect_intent ok!"),}
    });

    tokio::spawn(async move {
        let mut file = File::open("/tmp/hello_rust_8.wav").unwrap();
        let chunk_size = 1024;

        loop {
            let mut chunk = Vec::with_capacity(chunk_size);
            let n = file
                .by_ref()
                .take(chunk_size as u64)
                .read_to_end(&mut chunk)
                .unwrap();
            if n == 0 {
                break;
            }

            let streaming_request =
                SessionsClient::streaming_request_from_bytes(session_id.to_string(),chunk);

            audio_sender.send(streaming_request).await.unwrap();

            if n < chunk_size {
                drop(audio_sender);
                info!("audio sender dropped");
                break;
            }
        }
    });

    while let Some(reco_result) = result_receiver.recv().await {
        info!("recognition result {:?}",reco_result);
    }
}

示例日志:

[2021-05-21T10:15:15Z INFO session_client_streaming_detect_intent] session_client_streaming_detect_intent 示例

[2021-05-21T10:15:16Z INFO sessions_client_streaming_detect_intent] 音频发送器掉线

[2021-05-21T10:15:16Z INFO sessions_client_streaming_detect_intent] 识别结果 StreamingDetectIntentResponse { response_id: "",kNowledge_result: Some(StreamingRecognitionResult { message_type: Transcript,transcript: "he llo”,is_final:false,置信度:0.0,稳定性:0.01,speech_word_info:[],speech_end_offset:一些(持续时间{秒:0,纳米:480000000}),dtmf_digits:无}),query_result:无,alternative_query_results:[],webhook_stat 我们:无,输出音频:[],输出音频配置:无}

[2021-05-21T10:15:17Z INFO sessions_client_streaming_detect_intent] 识别结果 StreamingDetectIntentResponse { response_id: "",transcript: "he llo Ray”,is_final:false,置信度:0.0,稳定性:0.01,speech_word_info:[],speech_end_offset:一些(持续时间{秒:0,纳米:930000000}),dtmf_digits:无}),query_result:无,alternative_query_results:[ ],网络钩子_ 状态:无,输出音频:[],输出音频配置:无}

[2021-05-21T10:15:17Z INFO sessions_client_streaming_detect_intent] 识别结果 StreamingDetectIntentResponse { response_id: "",transcript: "he llo Russ”,is_final:false,置信度:0.0,稳定性:0.01,speech_word_info:[],speech_end_offset:一些(持续时间{秒:0,纳米:960000000}),dtmf_digits:无}),query_result:无,alternative_query_results:[] ],网络钩子 _status: 无,output_audio: [],output_audio_config: None } [2021-05-21T10:15:17Z INFO sessions_client_streaming_detect_intent] 识别结果 StreamingDetectIntentResponse { response_id: "",script: "he

llo Russ”,is_final:false,置信度:0.0,稳定性:0.01,speech_word_info:[],speech_end_offset:Some(Duration { seconds: 1,nanos: 50000000 }),dtmf_digits: None }),query_result: None,替代查询结果:[],webhook_ 状态:无,输出音频:[],输出音频配置:无}

[2021-05-21T10:15:17Z INFO sessions_client_streaming_detect_intent] 识别结果 StreamingDetectIntentResponse { response_id: "",transcript: "he llo rust”,is_final:false,置信度:0.0,稳定性:0.01,speech_word_info:[],speech_end_offset:一些(持续时间{秒:1,纳米:260000000}),dtmf_digits:无}),query_result:无,alternative_query_results:[] ],网络钩子 _status: None,output_audio_config: None }

[2021-05-21T10:15:27Z INFO sessions_client_streaming_detect_intent] 识别结果 StreamingDetectIntentResponse { response_id: "",transcript: "he llo rust”,is_final:true,信心:0.60010684,稳定性:0.0,speech_word_info:[],speech_end_offset:一些(持续时间{秒:1,纳米:380000000}),dtmf_digits:无}),query_result:无,替代], 我们 bhook_status:无,output_audio:[],output_audio_config:无}

[2021-05-21T10:15:27Z ERROR session_client_streaming_detect_intent] streaming_detect_intent 错误错误 { message: "status: OutOfRange,message: "while call Cloud Speech API: Audio Timeout Error: Long duration elapsed without a 音频。音频应该接近实时发送。”,详细信息:[],元数据:MetadataMap { headers:{“grpc-server-stats-bin”:“AAAhxq1zAgAAAA”} }”,代码:无}>

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)