如何将消息从标准库生成的线程发送到 Tokio 异步任务?

问题描述

我有一个设置,其中我的程序使用 std::thread::spawn 生成多个线程用于 cpu 密集型计算。

我需要一个 GRPC 服务器来处理传入的命令以及由工作线程完成的流输出。我将 tonic 用于 GRPC 服务器,它仅在 Tokio 未来内提供异步实现。

我需要能够从我的“普通”标准库线程向 Tokio 未来发送消息。

我已经将我的代码精简到最低限度:

use std::thread;
use tokio::sync::mpsc; // 1.9.0

fn main() {
    let (tx,mut rx) = mpsc::channel(1);

    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
    tokio_runtime.spawn(async move {
        // the code below starts the GRPC server in reality,here I'm just demonstrating trying to receive a message
        while let Some(v) = rx.recv().await {}
    });

    let h = thread::spawn(move || {
        // do work
        tx.send(1).await; //<------ error occurs here since I can't await in a non-async block
    });

    h.join().unwrap();
}

我的主要工作线程如何与 Tokio 衍生的 GRPC 服务器通信?

解决方法

您可以使用 tokio 的 sync 功能。有两个选项 - UnboundedSenderSender::blocking_send()

无界发送方的问题在于它没有背压,如果您的生产者比消费者快,您的应用程序可能会因内存不足错误而崩溃,或者耗尽您的生产者使用的其他有限资源。

>

作为一般规则,您应该避免使用无界队列,这样我们就有了使用 blocking_send() 的更好选择:

Playground

use std::thread;
use tokio::sync::mpsc; // 1.9.0

fn main() {
    let (tx,mut rx) = mpsc::channel(1);

    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
    tokio_runtime.spawn(async move {
        // the code below starts the GRPC server in reality,here I'm just demonstrating trying to receive a message
        while let Some(v) = rx.recv().await {
            println!("Received: {:?}",v);
        }
    });

    let h = thread::spawn(move || {
        // do work
        tx.blocking_send(1).unwrap();
    });

    h.join().unwrap();
}