如何使用Tokio轻松包装同步网络I / O?

问题描述

不幸的是,我对Rust中的并发开发存在明显的了解。这个问题源于为解决看似“琐碎”的问题而进行的数周努力。


问题域

开发一个名为twistrs的Rust库,它是域名置换和枚举库。库的目的是提供一个根域(例如google.com)并生成该域的排列(例如guugle.com)并丰富该排列(例如解析为{{1} }。

其目标之一是使其性能比Python counterpart快得多。最值得注意的是,网络调用,例如但不限于DNS查找。

当前的设计方案

该库背后的想法(除了作为学习基础之外)是开发一个非常琐碎的安全性库,可以将其实现以满足各种需求。您(作为客户端)可以选择在内部直接与permutationenrichment模块进行交互,也可以使用提供的异步/并行实现库。

Twistrs proposed architecture

请注意,内部没有没有共享状态。这可能效率很低,但是暂时避免了很多问题,因此暂时没有意义。

当前问题

在内部,DNS查找是同步完成的,并且本质上是阻止的。我在将其转换为并发代码时遇到麻烦。我能得到的最接近的结果是使用tokio mpsc通道,并执行一个tokio任务:

123.123.123.123

也就是说,精明的读者会立即注意到它被阻止了,并有效地以任何一种方式同步运行了DNS查找。

由于在use twistrs::enrich::{Result,DomainMetadata}; use twistrs::permutate::Domain; use tokio::sync::mpsc; #[tokio::main] async fn main() { let domain = Domain::new("google.com").unwrap(); let _permutations = domain.all().unwrap().collect::<Vec<String>>(); let (mut tx,mut rx) = mpsc::channel(1000); tokio::spawn(async move { for (i,v) in _permutations.into_iter().enumerate() { let domain_metadata = DomainMetadata::new(v.clone()); let dns_resolution = domain_metadata.dns_resolvable(); if let Err(_) = tx.send((i,dns_resolution)).await { println!("receiver dropped"); return; } } }); while let Some(i) = rx.recv().await { println!("got: {:?}",i); } } 上进行了move(并且tx不是隐含tx),因此无法在for循环中尝试生成Tokio任务:

Copy

删除for (i,v) in _permutations.into_iter().enumerate() { tokio::spawn(async move { let domain_metadata = DomainMetadata::new(v.clone()); let dns_resolution = domain_metadata.dns_resolvable(); if let Err(_) = tx.send((i,dns_resolution)).await { println!("receiver dropped"); return; } }); } 当然不会发生任何事情,因为需要轮询生成的任务。我该如何有效地将所有这些同步调用包装到异步任务中,这些任务可以独立运行并最终收敛到集合中?

我遇到的一个类似的Rust项目是batch_resolve,在此方面做得非常好(!)。但是,我发现实现要实现的目标异常复杂(也许我错了)。对于实现此目标的任何帮助或见解,我们深表感谢。

如果您想要一种快速的方法来重现此内容,只需克隆项目并使用本文中的第一个代码段来更新await

解决方法

编辑:我误解了您的问题,但没有意识到DNS解析本身不是异步的。以下方法实际上不适用于同步代码,只会由于阻塞代码而导致执行器停顿,但是如果您切换到异步解析方法,我将保留它。如果适合您的需求,我建议使用tokio的异步lookup_host()


异步执行器旨在处理大量并行任务,因此您可以尝试使用Semaphore为每个正在运行的任务创建上限,从而为每个请求生成一个新任务。该代码可能如下所示:

let (mut tx,mut rx) = mpsc::channel(1000);
let semaphore = Arc::new(Semaphore::new(1000)); // allow up to 1000 tasks to run at once

for (i,v) in _permutations.into_iter().enumerate() {
    let domain_metadata = DomainMetadata::new(v.clone());
    let mut tx = tx.clone(); // every task will have its own copy of the sender
    let permit = semaphore.acquire_owned().await; // wait until we have a permit

    let dns_resolution = domain_metadata.dns_resolvable();
    tokio::spawn(async move {
        if let Err(_) = tx.send((i,dns_resolution)).await {
            println!("receiver dropped");
            return;
        }
        drop(permit); // explicitly release the permit,to make sure it was moved into this task
    }); // note: task spawn results and handle dropped here
}

while let Some(i) = rx.recv().await {
    println!("got: {:?}",i);
}

如果事实证明额外任务的开销过大,则可以尝试使用futures板箱中的FuturesUnordered之类的工具,将任务合并到一个单一的未来中。这样一来,您就可以提取任意数量的期货,并在单个任务中反复轮询所有期货。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...