parallel-processing - Julia:分区迭代器上的并行 for 循环

标签 parallel-processing julia

所以我试图遍历某些东西的分区列表,比如 1:n对于一些 n在 13 到 21 之间。理想情况下,我想要运行的代码如下所示:

valid_num = @parallel (+) for p in partitions(1:n)
  int(is_valid(p))
end

println(valid_num)

这将使用 @parallel for映射减少我的问题。例如,将此与 Julia 文档中的示例进行比较:
nheads = @parallel (+) for i=1:200000000
  Int(rand(Bool))
end

但是,如果我尝试调整循环,则会收到以下错误:
ERROR: `getindex` has no method matching getindex(::SetPartitions{UnitRange{Int64}}, ::Int64)
 in anonymous at no file:1433
 in anonymous at multi.jl:1279
 in run_work_thunk at multi.jl:621
 in run_work_thunk at multi.jl:630
 in anonymous at task.jl:6

我认为这是因为我试图迭代不是 1:n 形式的东西(编辑:我认为这是因为你不能调用 p[3] 如果 p=partitions(1:n) )。

我试过使用 pmap要解决这个问题,但是因为分区的数量可以变得非常大,非常快(1:13 有超过 250 万个分区,当我到达 1:21 时,事情会变得很大),构造这么大的数组就变成了一个问题。我让它跑了一夜,它仍然没有完成。

有人对我如何在 Julia 中有效地做到这一点有任何建议吗?我可以使用约 30 核计算机,而且我的任务似乎很容易并行化,所以如果有人知道在 Julia 中执行此操作的好方法,我将不胜感激。

非常感谢!

最佳答案

下面的代码给出了 511,即一组 10 个中大小为 2 的分区数。

using Iterators
s = [1,2,3,4,5,6,7,8,9,10]
is_valid(p) = length(p)==2
valid_num = @parallel (+) for i = 1:30
  sum(map(is_valid, takenth(chain(1:29,drop(partitions(s), i-1)), 30)))
end

此解决方案结合了taketh、drop 和chain 迭代器,以获得与下面PREVIOUS ANSWER 下的take_every 迭代器相同的效果。请注意,在此解决方案中,每个进程都必须计算每个分区。但是,因为每个进程对 drop 使用不同的参数。 ,没有两个进程会在同一个分区上调用 is_valid。

除非您想进行大量数学运算来弄清楚如何实际跳过分区,否则无法避免在至少一个进程上按顺序计算分区。我认为西蒙的回答是在一个进程上执行此操作并分配分区。我的要求每个工作进程自己计算分区,这意味着计算是重复的。但是,它是并行复制的,这(如果您实际上有 30 个处理器)不会花费您的时间。

以下是有关如何实际计算分区上的迭代器的资源:http://www.informatik.uni-ulm.de/ni/Lehre/WS03/DMM/Software/partitions.pdf .

以前的答案(比必要的更复杂)

我在写我的时候注意到了西蒙的回答。我们的解决方案似乎与我相似,除了我使用迭代器来避免将分区存储在内存中。我不确定哪种尺寸设置实际上会更快,但我认为两种选择都很好。假设计算 is_valid 比计算分区本身花费的时间要长得多,您可以执行以下操作:
s = [1,2,3,4]
is_valid(p) = length(p)==2
valid_num = @parallel (+) for i = 1:30
  foldl((x,y)->(x + int(is_valid(y))), 0, take_every(partitions(s), i-1, 30))
end

这给了我 7,即一组 4 的大小为 2 的分区数。take_every 函数返回一个迭代器,该迭代器返回从第 i 个开始的每 30 个分区。这是代码:
import Base: start, done, next
immutable TakeEvery{Itr}
  itr::Itr
  start::Any
  value::Any
  flag::Bool
  skip::Int64
end
function take_every(itr, offset, skip)
  value, state = Nothing, start(itr)
  for i = 1:(offset+1)
    if done(itr, state)
      return TakeEvery(itr, state, value, false, skip)
    end
    value, state = next(itr, state)
  end
  if done(itr, state)
    TakeEvery(itr, state, value, true, skip)
  else
    TakeEvery(itr, state, value, false, skip)
  end
end
function start{Itr}(itr::TakeEvery{Itr})
  itr.value, itr.start, itr.flag
end
function next{Itr}(itr::TakeEvery{Itr}, state)
  value, state_, flag = state
  for i=1:itr.skip
    if done(itr.itr, state_)
      return state[1], (value, state_, false)
    end
    value, state_ = next(itr.itr, state_)
  end
  if done(itr.itr, state_)
    state[1], (value, state_, !flag)
  else
    state[1], (value, state_, false)
  end
end
function done{Itr}(itr::TakeEvery{Itr}, state)
  done(itr.itr, state[2]) && !state[3]
end

关于parallel-processing - Julia:分区迭代器上的并行 for 循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31442027/

相关文章:

dataframe - Julia:将具有多个字符串列的 Dataframe 转换为 float 组

julia - 如何在没有溢出的情况下在 Julia 中保存 10^19

签名中带有数组类型的 ccall 从 Julia 调用 C 中的结构

python - 使用 MPI 提高性能

c++ - 在 C++ 中写入相同值的竞争条件?

string - 如何将 unicode 字符串附加到 Julia 中的字符串列表?

julia - 可用作宏中运算符的 ASCII 字符序列

c - 创建鬼区 MPI 的正确方法 [光晕]

java - Java 8 中意外的并行流性能

.net - 线程与并行处理