如何以线程安全的方式使用 Rayon 的 par_iter 读取和修改变量?

问题描述

代码

use rayon::prelude::*; // 1.5.0

fn main() {
    let mut items = Vec::new();
    items.push("hello");
    items.push("foo");
    items.push("bar");
    items.push("ipsum");

    let mut counter = 0;

    let results = items.par_iter().map(|item| {
        // do something time consuming with item
        counter += 1;
        print!("completed {} items\r",counter);
        0
    });
}

产生错误

warning: unused variable: `item`
  --> src/main.rs:12:41
   |
12 |     let results = items.par_iter().map(|item| {
   |                                         ^^^^ help: if this is intentional,prefix it with an underscore: `_item`
   |
   = note: `#[warn(unused_variables)]` on by default

warning: unused variable: `results`
  --> src/main.rs:12:9
   |
12 |     let results = items.par_iter().map(|item| {
   |         ^^^^^^^ help: if this is intentional,prefix it with an underscore: `_results`

error[E0594]: cannot assign to `counter`,as it is a captured variable in a `Fn` closure
  --> src/main.rs:14:9
   |
14 |         counter += 1;
   |         ^^^^^^^^^^^^ cannot assign

解决方法

Rust 通过从两个不同的线程写入相同的变量来防止您在这里发生数据竞争。您有几个选项可以解决这个问题。这真的要视具体情况而定。

  1. 最简单的方法是对 Mutex 使用 counter。这允许您安全访问相同的变量。引入 Mutex 有耗尽并行迭代器所有加速的风险,因为一切都将通过 Mutex 访问获得顺序。如果 map 的运行时间很大并且锁定 Mutex 很短,这是可以接受的。
  2. 对于计数器原子类型的特定情况,例如 AtomicI32 工作良好,但它们很难或不可能用于更复杂的类型。
  3. 这项工作可以并行执行多次,然后合并在一起,而不是直接聚合到单个变量上。这就是人造丝的 reduce 函数所做的。每个线程将至少有一个计数器,它们将合并在一起以产生一个最终结果。