如何将 ParallelIterator 转换回顺序迭代器?

问题描述

我正在遍历数据库中数 GB 的输入项。在每个输入项上,我正在做一些 cpu 密集型处理,产生一个或多个新的输出项,总共数十 GB。然后将输出项存储在另一个数据库表中。

通过使用 Rayon 进行并行处理,我获得了不错的加速。但是,数据库 API 不是线程安全的;它是 Send 但不是 Sync,因此 I/O 必须被序列化。

理想情况下,我只想写:

input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .ser_bridge() // End parallelism. This function does not exist.
    .for_each(|output_item| {
        output_database.write_item(output_item);
    });

基本上我想要par_bridge()的反面;在调用它的线程上运行的东西,从每个线程读取项目,并连续生成它们。但在目前的 Rayon 实现中,这似乎并不存在。我不确定这是因为理论上不可能,还是它不适合库的当前设计。

输出太大,无法先将其全部收集到 Vec 中;它需要直接流式传输到数据库中。

顺便说一下,我没有和 Rayon 结婚;如果有其他更合适的板条箱,我很乐意切换。

解决方法

您可以将输出数据库包装在 Arc<Mutex> 中以防止并行访问:

let output_database = Arc::new (Mutex::new (output_database));
input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .for_each_with (output_database,|output_database,output_item| {
        output_database.lock().write_item(output_item);
    });
,

我认为顺序无关紧要,因此您不需要对输出数据进行排序。

您可以使用 mpsc::channel 将您的数据从 for_each 闭包传输到您的数据库 API,例如

use std::sync::mpsc;

let (tx,rx) = mpsc::channel();

input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .for_each(move |output_item| {
        tx.send(output_item).unwrap();
    });

在第二个线程中,您可以使用 rx 变量接收数据并将其写入数据库。