用于大数据处理的python

标签 python multithreading algorithm queue multiprocessing

我对 python 比较陌生,并且能够根据在表单上回答的类似问题来回答我的大部分问题,但我遇到了困难,需要一些帮助。

我有一个生成字符串输出的简单嵌套 for 循环脚本。接下来我需要做的是根据字符串也将匹配的数值对每个分组进行模拟。

我的真正问题是如何以最好的方式解决这个问题?我不确定多线程是否会工作,因为字符串已生成,然后需要进行模拟,一次一组。我正在阅读有关队列的信息,但不确定是否可以将它们传递到队列中进行存储,然后按照它们进入队列的相同顺序进行模拟。

无论我进行了何种研究,我都乐于接受任何人就此事提出的任何建议。

谢谢!

编辑:我不是在寻找关于如何进行模拟的答案,而是在寻找一种在计算模拟时存储组合的方法

例子

X = ["a","b"]
Y = ["c","d","e"]
Z= ["f","g"]

for A in itertools.combinations(X,1):
    for B in itertools.combinations(Y,2):
        for C in itertools.combinations(Z, 2):

        D = A + B + C
        print(D)

最佳答案

正如评论中所暗示的那样,multiprocessing模块是你要找的。由于全局解释器锁 (GIL),线程将无法帮助您,它一次只能执行一个 Python 线程。特别是,我会看看 multiprocessing pools .这些对象为您提供了一个接口(interface),让子流程池与主流程并行为您工作,您可以稍后返回并检查结果。

您的示例片段可能如下所示:

import multiprocessing

X = ["a","b"]
Y = ["c","d","e"]
Z= ["f","g"]

pool = multiprocessing.pool() # by default, this will create a number of workers equal to
                 # the number of CPU cores you have available
combination_list = [] # create a list to store the combinations

for A in itertools.combinations(X,1):
    for B in itertools.combinations(Y,2):
        for C in itertools.combinations(Z, 2):

        D = A + B + C
        combination_list.append(D) # append this combination to the list

results = pool.map(simulation_function, combination_list)
# simulation_function is the function you're using to actually run your
# simulation - assuming it only takes one parameter: the combination

pool.map 的调用是阻塞的——这意味着一旦调用它,主进程中的执行将停止,直到所有模拟完成,但它是并行运行它们的。当它们完成时,无论您的模拟函数返回什么,都将在 results 中可用,其顺序与输入参数在 combination_list 中的顺序相同。

如果你不想等他们,你也可以使用apply_async在你的池中并存储结果以供以后查看:

import multiprocessing

X = ["a","b"]
Y = ["c","d","e"]
Z= ["f","g"]

pool = multiprocessing.pool()
result_list = [] # create a list to store the simulation results

for A in itertools.combinations(X,1):
    for B in itertools.combinations(Y,2):
        for C in itertools.combinations(Z, 2):

        D = A + B + C
        result_list.append(pool.apply_async(
                simulation_function,
                args=(D,))) # note the extra comma - args must be a tuple

# do other stuff
# now iterate over result_list to check the results when they're ready

如果使用这种结构,result_list 将充满multiprocessing.AsyncResult objects ,它允许您使用 result.ready() 检查它们是否准备就绪,如果准备就绪,则使用 result.get() 检索结果。这种方法将导致在计算组合时立即开始模拟,而不是等到所有组合都计算完毕才开始处理它们。缺点是管理和检索结果有点复杂。例如,您必须确保结果已准备好或准备好捕获异常,您需要准备好捕获辅助函数中可能引发的异常,等等。这些注意事项在文档中有很好的解释。

如果计算组合实际上并不需要很长时间,并且您不介意在它们全部准备就绪之前停止主进程,我建议使用 pool.map 方法。

关于用于大数据处理的python,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33471193/

相关文章:

python - 是否可以执行 bash 脚本来下载 `pip install my-package` 上的非 python 依赖项

python - Tensorflow Eager 和 Tensorboard 图?

Python Selenium Firefox - 不安全连接错误警报

java - java.util.concurrent.CountDownLatch 的实现是否可以处理向上递增的初始计数?

python - Django 中 clean 和 full_clean 函数的主要区别是什么?

c# - 通用 BeginInvoke 方案,确保在同一线程上下文中进行函数调用

multithreading - Libevent 多线程支持

algorithm - 是否可以使用 Grover 算法进行真正的数据搜索?

c# - 帕斯卡三角函数

algorithm - 根据 map 中的第二个值进行搜索