问题描述
我想修改一个库,该库上载异步文件,以便它可以在什么迭代中“公开”或“通知”,我的目标是防止直接从库中“打印/写入”,而是从库中执行使用该库的方法,以便通过使用“公开的”数据可以执行某些操作。我用于在每次交互中测试任务的代码是:
use futures::stream::TryStreamExt;
use tokio_util::codec::{BytesCodec,FramedRead};
let stream = FramedRead::new(file,BytesCodec::new())
.inspect_ok(|chunk| {
// do X with chunk...
});
// reqwest
let body = Body::wrap_stream(stream);
client.put(url).body(body)
我正在考虑使用一个通道来“共享状态”(如果可以这样称呼),这样,如果我收到一个Sender
作为自变量,则可以从该方法写入拨打电话,使用Receiver
做某事(例如显示进度栏),这就是我正在尝试的:
use tokio::sync::mpsc;
fn upload(file: String,sender: Option<mpsc::Sender<usize>) {
let stream = if let Some(mut tx) = sender {
FramedRead::new(file,BytesCodec::new()).inspect_ok(move |chunk|
tx.send(chunk.len())
)
} else {
FramedRead::new(file,BytesCodec::new())
};
// reqwest
let body = Body::wrap_stream(stream);
client.put(url).body(body)
let body = Body::wrap_stream(stream);
}
我得到的错误是:
`if` and `else` have incompatible types
我也尝试过:
let stream = FramedRead::new(file,BytesCodec::new());
if let Some(mut tx) = sender {
stream.inspect_ok(move |chunk|
tx.send(chunk.len())
);
}
但是得到mismatched types
如何实现?有更好的渠道方法吗?
例如,我想使用箱子indicatif这样的东西:
use tokio::sync::mpsc;
use indicatif::ProgressBar;
use mylibrary::upload;
let (mut tx,mut rx) = mpsc::channel(100);
let bar = ProgressBar::new(1000);
// here pass the sender so that in the receiver i could increment the progress bar
let response = upload("file",Some(tx));
while let Some(i) = rx.recv().await {
bar.inc(1);
}
bar.finish();
解决方法
在if
内生成主体,以使两个手臂都键入Body
:
use tokio::sync::mpsc;
fn upload(file: String,sender: Option<mpsc::Sender<usize>) {
let stream = FramedRead::new(file,BytesCodec::new());
let body = if let Some(mut tx) = sender {
Body::wrap_stream(stream
.inspect_ok(move |chunk| tx.send(chunk.len()))
)
} else {
Body::wrap_stream(stream)
};
// not sure where `client` or `url` are defined?
client.put(url).body(body)
}
或者,根据@SvenMarnach's suggestion:
use tokio::sync::mpsc;
use futures::future::Either;
fn upload(file: String,BytesCodec::new());
let stream = if let Some(mut tx) = sender {
Either::Left(stream
.inspect_ok(move |chunk| tx.send(chunk.len()))
)
} else {
Either::Right(stream)
};
let body = Body::wrap_stream(stream);
// not sure where `client` or `url` are defined?
client.put(url).body(body)
}