python - mpi4py:动态数据处理

标签 python dynamic mpi mpi4py

我有一个包含股票代码的向量,例如 tickers = ['AAPL','XOM','GOOG'],在我的“传统”python 程序中,我将遍历这个 tickers 向量,选择一个股票代码字符串,如 AAPL,导入一个包含 AAPL 股票返回的 csv 文件,将返回用作通用函数的输入,然后最后生成一个 csv 文件作为输出。我有超过 4000 个代码,应用到每个代码的函数需要时间来处理。我可以使用 mpi4py 包访问计算机集群,每个作业可以访问大约 100 个处理器。我很理解(并且能够实现)这个 mpi example在 python 中:

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
    data = [i for i in range(8)]
# dividing data into chunks
    chunks = [[] for _ in range(size)]
    for i, chunk in enumerate(data):
        chunks[i % size].append(chunk)
else:
    data = None
    chunks = None
data = comm.scatter(chunks, root=0)
print str(rank) + ': ' + str(data)

[cha@cluster] ~/utils> mpirun -np 3 ./mpi.py 
2: [2, 5]
0: [0, 3, 6]
1: [1, 4, 7]

所以在这个例子中,我们有一个大小为 8 的数据向量,并为每个处理器(总共 3 个)分配了相同数量的数据元素。我怎样才能使用上面类似的例子并为每个处理器分配一个股票代码并应用需要为每个代码运行的功能?我如何告诉 python 一旦处理器空闲,返回 tickers 向量并处理尚未处理的 ticker

最佳答案

还有另一种思考方式。您有 100 个处理器处理 4000 个数据 block 。一种看待这个问题的方法是每个处理器都获得一个数据 block 来进行操作。平均分配,每个处理器将获得 40 个代码来处理。处理器 1 将获得 0-39,处理器 2 将获得 40-79,等等。

这样一想,您就不必担心处理器完成任务后会发生什么。只是有一个循环:

block_size = len(tickers) / size # this will be 40 in your example
for i in range(block_size):
    ticker = tickers[rank * block_size + i]
    process(ticker)

def process(ticker):
    # load data
    # process data
    # output data

这有意义吗?

[编辑]
如果您想阅读更多内容,这实际上只是 row-major order 的变体。索引,一种访问存储在单维内存中的多维数据的常用方法。

关于python - mpi4py:动态数据处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24224414/

相关文章:

python - 具有类属性的列表理解

python - 大型 django rest 框架应用程序的目录结构

C++ Local var in dynamic, on stack?

javascript - 动态创建和删除一个 div

c - OpenMPI + Fortran + C 的基本测试根据奇怪的事情抛出不同的错误

python - 如何根据模型要求 reshape 我的数据?

python - 根据第一列中的日期拆分大型 csv 文件 Python 3.4.3

html - 在输入中添加新的输入字段

c - 使用带有 MPI 的 master-worker 时,哪个大小的 block 将产生最佳性能?

c++ - MPI 中发送消息数量未知时,如何接收?