问题描述
不幸的是,我对Rust中的并发开发存在明显的了解。这个问题源于为解决看似“琐碎”的问题而进行的数周努力。
问题域
开发一个名为twistrs的Rust库,它是域名置换和枚举库。库的目的是提供一个根域(例如google.com
)并生成该域的排列(例如guugle.com
)并丰富该排列(例如解析为{{1} }。
其目标之一是使其性能比Python counterpart快得多。最值得注意的是,网络调用,例如但不限于DNS查找。
当前的设计方案
该库背后的想法(除了作为学习基础之外)是开发一个非常琐碎的安全性库,可以将其实现以满足各种需求。您(作为客户端)可以选择在内部直接与permutation或enrichment模块进行交互,也可以使用提供的异步/并行实现库。
请注意,内部没有没有共享状态。这可能效率很低,但是暂时避免了很多问题,因此暂时没有意义。
当前问题
在内部,DNS查找是同步完成的,并且本质上是阻止的。我在将其转换为并发代码时遇到麻烦。我能得到的最接近的结果是使用tokio mpsc通道,并执行一个tokio任务:
123.123.123.123
也就是说,精明的读者会立即注意到它被阻止了,并有效地以任何一种方式同步运行了DNS查找。
由于在use twistrs::enrich::{Result,DomainMetadata};
use twistrs::permutate::Domain;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let domain = Domain::new("google.com").unwrap();
let _permutations = domain.all().unwrap().collect::<Vec<String>>();
let (mut tx,mut rx) = mpsc::channel(1000);
tokio::spawn(async move {
for (i,v) in _permutations.into_iter().enumerate() {
let domain_metadata = DomainMetadata::new(v.clone());
let dns_resolution = domain_metadata.dns_resolvable();
if let Err(_) = tx.send((i,dns_resolution)).await {
println!("receiver dropped");
return;
}
}
});
while let Some(i) = rx.recv().await {
println!("got: {:?}",i);
}
}
上进行了move
(并且tx
不是隐含tx
),因此无法在for循环中尝试生成Tokio任务:
Copy
删除for (i,v) in _permutations.into_iter().enumerate() {
tokio::spawn(async move {
let domain_metadata = DomainMetadata::new(v.clone());
let dns_resolution = domain_metadata.dns_resolvable();
if let Err(_) = tx.send((i,dns_resolution)).await {
println!("receiver dropped");
return;
}
});
}
当然不会发生任何事情,因为需要轮询生成的任务。我该如何有效地将所有这些同步调用包装到异步任务中,这些任务可以独立运行并最终收敛到集合中?
我遇到的一个类似的Rust项目是batch_resolve,在此方面做得非常好(!)。但是,我发现实现要实现的目标异常复杂(也许我错了)。对于实现此目标的任何帮助或见解,我们深表感谢。
如果您想要一种快速的方法来重现此内容,只需克隆项目并使用本文中的第一个代码段来更新await
。
解决方法
编辑:我误解了您的问题,但没有意识到DNS解析本身不是异步的。以下方法实际上不适用于同步代码,只会由于阻塞代码而导致执行器停顿,但是如果您切换到异步解析方法,我将保留它。如果适合您的需求,我建议使用tokio的异步lookup_host()
。
异步执行器旨在处理大量并行任务,因此您可以尝试使用Semaphore
为每个正在运行的任务创建上限,从而为每个请求生成一个新任务。该代码可能如下所示:
let (mut tx,mut rx) = mpsc::channel(1000);
let semaphore = Arc::new(Semaphore::new(1000)); // allow up to 1000 tasks to run at once
for (i,v) in _permutations.into_iter().enumerate() {
let domain_metadata = DomainMetadata::new(v.clone());
let mut tx = tx.clone(); // every task will have its own copy of the sender
let permit = semaphore.acquire_owned().await; // wait until we have a permit
let dns_resolution = domain_metadata.dns_resolvable();
tokio::spawn(async move {
if let Err(_) = tx.send((i,dns_resolution)).await {
println!("receiver dropped");
return;
}
drop(permit); // explicitly release the permit,to make sure it was moved into this task
}); // note: task spawn results and handle dropped here
}
while let Some(i) = rx.recv().await {
println!("got: {:?}",i);
}
如果事实证明额外任务的开销过大,则可以尝试使用futures
板箱中的FuturesUnordered
之类的工具,将任务合并到一个单一的未来中。这样一来,您就可以提取任意数量的期货,并在单个任务中反复轮询所有期货。