问题描述
我正在使用Rust和Tokio编写多线程并发的Kafka生产者。该项目有2种模式,一种是在无限循环中运行的交互模式,另一种是将文件作为参数然后读取文件并通过多个线程将这些消息发送给Kafka的文件模式。互动模式效果很好!但是文件模式有问题。
为实现这一目标,我最初是从Rayon开始的,但后来切换到了更灵活的运行时。东京现在,我能够并行化在tokio中通过指定数量的线程发送数据的任务,但是,似乎在生成所有消息之前,运行时已被丢弃。这是我的代码:
pub fn worker(brokers: String,f: File,t: usize,topic: Arc<String>) {
let reader = BufReader::new(f);
let mut rt = runtime::Builder::new()
.threaded_scheduler()
.core_threads(t)
.build()
.unwrap();
let producers: Arc<Vec<Mutex<BaseProducer>>> = Arc::new(
(0..t)
.map(|_| get_producer(&brokers))
.collect::<Vec<Mutex<BaseProducer>>>(),);
let acounter = atomic::AtomicUsize::new(0);
let _results: Vec<_> = reader
.lines()
.map(|line| line.unwrap())
.map(move |line| {
let prods = producers.clone();
let tp = topic.clone();
let cnt = acounter.swap(
(acounter.load(atomic::Ordering::SeqCst) + 1) % t,atomic::Ordering::SeqCst,);
rt.block_on(async move {
match prods[cnt]
.lock()
.unwrap()
.send(BaseRecord::to(&(*tp)).payload(&line).key(""))
{
Ok(_) => (),Err(e) => eprintln!("{:?}",e),};
})
})
.collect();
}
fn get_producer(brokers: &String) -> Mutex<BaseProducer> {
Mutex::new(
BaseProducer::from_config(
ClientConfig::new()
.set("bootstrap.servers",&brokers)
.set("message.timeout.ms","5000"),)
.expect("Producer creation error"),)
}
作为高级演练:我创建可变的生产者,使其等于指定的线程数,并且该线程中的每个任务都将使用这些生产者之一。依次逐行读取文件,然后将每一行移入封闭文件,将其作为消息发送给Kafka。
在大多数情况下,代码可以正常工作,但是存在一些问题,即使我在运行时使用block_on
函数,也无法完成所有任务而退出运行时。应该阻塞直到将来完成为止(在我的例子中,异步阻塞在这里)。
我认为问题在于,运行时被丢弃,而Tokio中的所有线程均未成功退出。
我尝试用这种方法读取文件,将100,000条记录记录在一个线程上,我能够产生28,000条记录。在2个线程上,接近46,000条记录。并且在利用我的cpu的所有8个内核时,我不确定地收到99,000-100,000条消息。
我已经检查了关于SO的几个答案,但对我来说没有帮助。我还阅读了tokio :: runtime :: Runtime here的文档,并尝试使用spawn然后使用futures :: future :: join,但这都不起作用。
感谢您的帮助!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)