rust - 如何让每个 CPU 核心可变访问 Vec 的一部分?

标签 rust

<分区>

我有一段令人尴尬的并行图形渲染代码,我想在我的 CPU 内核上运行。我已经编写了一个测试用例(计算的函数是无意义的)来探索如何并行化它。我想使用 std Rust 对此进行编码,以便了解如何使用 std::thread。但是,我不明白如何为每个线程分配一部分帧缓冲区。我将把完整的测试用例代码放在下面,但我会先尝试分解它。

顺序形式非常简单:

let mut buffer0 = vec![vec![0i32; WIDTH]; HEIGHT];
for j in 0..HEIGHT {
    for i in 0..WIDTH {
        buffer0[j][i] = compute(i as i32,j as i32);
    }
}

我认为制作一个大小相同但重新排列为 3D 并首先由核心索引的缓冲区会有所帮助。这是相同的计算,只是对数据重新排序以显示工作原理。

let mut buffer1 = vec![vec![vec![0i32; WIDTH]; y_per_core]; num_logical_cores];
for c in 0..num_logical_cores {
    for y in 0..y_per_core {
        let j = y*num_logical_cores + c;
        if j >= HEIGHT {
            break;
        }
        for i in 0..WIDTH {
            buffer1[c][y][i] = compute(i as i32,j as i32)
        }
    }
}

但是,当我尝试将代码的内部部分放入闭包中并创建一个线程时,我得到了有关缓冲区和生命周期的错误。我基本上不明白该怎么做并且可以使用一些指导。我希望 per_core_buffer 只是临时引用属于该核心的 buffer2 中的数据并允许写入它,同步所有线程然后读取 buffer2 之后。这可能吗?

let mut buffer2 = vec![vec![vec![0i32; WIDTH]; y_per_core]; num_logical_cores];
let mut handles = Vec::new();
for c in 0..num_logical_cores {
    let per_core_buffer = &mut buffer2[c]; // <<< lifetime error
    let handle = thread::spawn(move || {
        for y in 0..y_per_core {
            let j = y*num_logical_cores + c;
            if j >= HEIGHT {
                break;
            }
            for i in 0..WIDTH {
                per_core_buffer[y][i] = compute(i as i32,j as i32)
            }
        }
    });
    handles.push(handle)
}
for handle in handles {
    handle.join().unwrap();
}

错误是这样的,我不明白:

error[E0597]: `buffer2` does not live long enough
  --> src/main.rs:50:36
   |
50 |         let per_core_buffer = &mut buffer2[c]; // <<< lifetime error
   |                                    ^^^^^^^ borrowed value does not live long enough
...
88 | }
   | - borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...

完整的测试用例是:

extern crate num_cpus;
use std::time::Instant;
use std::thread;

fn compute(x: i32, y: i32) -> i32 {
    (x*y) % (x+y+10000)
}

fn main() {
    let num_logical_cores = num_cpus::get();
    const WIDTH: usize = 40000;
    const HEIGHT: usize = 10000;
    let y_per_core = HEIGHT/num_logical_cores + 1;

    // ------------------------------------------------------------
    // Serial Calculation...
    let mut buffer0 = vec![vec![0i32; WIDTH]; HEIGHT];
    let start0 = Instant::now();
    for j in 0..HEIGHT {
        for i in 0..WIDTH {
            buffer0[j][i] = compute(i as i32,j as i32);
        }
    }
    let dur0 = start0.elapsed();

    // ------------------------------------------------------------
    // On the way to Parallel Calculation...
    // Reorder the data buffer to be 3D with one 2D region per core.
    let mut buffer1 = vec![vec![vec![0i32; WIDTH]; y_per_core]; num_logical_cores];
    let start1 = Instant::now();
    for c in 0..num_logical_cores {
        for y in 0..y_per_core {
            let j = y*num_logical_cores + c;
            if j >= HEIGHT {
                break;
            }
            for i in 0..WIDTH {
                buffer1[c][y][i] = compute(i as i32,j as i32)
            }
        }
    }
    let dur1 = start1.elapsed();

    // ------------------------------------------------------------
    // Actual Parallel Calculation...
    let mut buffer2 = vec![vec![vec![0i32; WIDTH]; y_per_core]; num_logical_cores];
    let mut handles = Vec::new();
    let start2 = Instant::now();
    for c in 0..num_logical_cores {
        let per_core_buffer = &mut buffer2[c]; // <<< lifetime error
        let handle = thread::spawn(move || {
            for y in 0..y_per_core {
                let j = y*num_logical_cores + c;
                if j >= HEIGHT {
                    break;
                }
                for i in 0..WIDTH {
                    per_core_buffer[y][i] = compute(i as i32,j as i32)
                }
            }
        });
        handles.push(handle)
    }
    for handle in handles {
        handle.join().unwrap();
    }
    let dur2 = start2.elapsed();

    println!("Runtime: Serial={0:.3}ms, AlmostParallel={1:.3}ms, Parallel={2:.3}ms",
             1000.*dur0.as_secs() as f64 + 1e-6*(dur0.subsec_nanos() as f64),
             1000.*dur1.as_secs() as f64 + 1e-6*(dur1.subsec_nanos() as f64),
             1000.*dur2.as_secs() as f64 + 1e-6*(dur2.subsec_nanos() as f64));

    // Sanity check
    for j in 0..HEIGHT {
        let c = j % num_logical_cores;
        let y = j / num_logical_cores;
        for i in 0..WIDTH {
            if buffer0[j][i] != buffer1[c][y][i] {
                println!("wtf1? {0} {1} {2} {3}",i,j,buffer0[j][i],buffer1[c][y][i])
            }
            if buffer0[j][i] != buffer2[c][y][i] {
                println!("wtf2? {0} {1} {2} {3}",i,j,buffer0[j][i],buffer2[c][y][i])
            }
        }
    }

}

最佳答案

感谢@Shepmaster 的指示和说明,这对 Rust 来说不是一个简单的问题,我需要考虑 crate 以找到合理的解决方案。我才刚刚开始使用 Rust,所以我真的不清楚。

我喜欢控制线程数量的能力 scoped_threadpool给,所以我同意了。直接从上面翻译我的代码,我尝试使用带有核心的 4D 缓冲区作为最重要的索引,但遇到了麻烦,因为该 3D 向量没有实现 Copy 特性。它实现了 Copy 的事实让我担心性能,但我回到最初的问题并更直接地实现它并通过使每一行成为一个线程找到了合理的加速。复制每一行不会有很大的内存开销。

对我有用的代码是:

let mut buffer2 = vec![vec![0i32; WIDTH]; HEIGHT];
let mut pool = Pool::new(num_logical_cores as u32);
pool.scoped(|scope| {
    let mut y = 0;
    for e in &mut buffer2 {
        scope.execute(move || {
            for x in 0..WIDTH {
                (*e)[x] = compute(x as i32,y as i32);
            }
        });
        y += 1;
    }
});

在用于 400000x4000 测试用例的 6 核 12 线程 i7-8700K 上,串行运行时间为 3.2 秒,并行运行时间为 481 毫秒——合理的加速。

编辑:我继续思考这个问题,并在推特上得到了 RuSTLang 的建议,我应该考虑 rayon .我将代码转换为 rayon并通过以下代码获得了类似的加速。

let mut buffer2 = vec![vec![0i32; WIDTH]; HEIGHT];
buffer2
    .par_iter_mut()
    .enumerate()
    .map(|(y,e): (usize, &mut Vec<i32>)| {
        for x in 0..WIDTH {
            (*e)[x] = compute(x as i32,y as i32);
        }
    })
    .collect::<Vec<_>>();

关于rust - 如何让每个 CPU 核心可变访问 Vec 的一部分?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49354237/

相关文章:

rust - 实现 actix-web 的处理程序时未找到关联类型 `Context`

sorting - 如何在 Rust 中对向量进行排序?

generics - 重写一个函数以接受AsRef <Path>而不是&Path

rust - 为什么通过提取方法进行重构会触发借用检查器错误?

rust - 为什么将函数移至默认特征方法会导致借入错误?

rust - 使 String 类型与 ARM 兼容?

rust - 如何安全地创建 UnsafeCell<c_void>?

rust - Rust的确切自动引用规则是什么?

generics - 是否可以编写一个通用的rust函数来告诉您容器中是否有给定的枚举变量?

amazon-web-services - 如何使用适用于 DynamoDb 的 AWS Rust SDK 编写惯用的 Rust 错误处理?