python - 多处理池返回错误结果

标签 python multiprocessing

这里又是一个困惑的并行编码器!

我们的内部 Hive 数据库有一个 API 层,我们需要使用它来访问数据。有 300 秒的查询超时限制,因此我想使用多处理并行执行多个查询:

from multiprocessing import Pool
import pandas as pd
import time
from hive2pandas_anxpy import Hive2Pandas   # custom module for querying our Hive db and converting the results to a Pandas dataframe
import datetime

def run_query(hour):
    start_time = time.time()
    start_datetime = datetime.datetime.now()

    query = """SELECT id, revenue from table where date='2014-05-20 %s' limit 50""" % hour
    h2p = Hive2Pandas(query, 'username')
    h2p.run()

    elapsed_time = int(time.time() - start_time)
    end_datetime = datetime.datetime.now()

    return {'query':query, 'start_time':start_datetime, 'end_time':end_datetime, 'elapsed_time':elapsed_time, 'data':h2p.data_df}

if __name__ == '__main__':

    start_time = time.time()
    pool = Pool(4)
    hours = ['17','18','19']
    results = pool.map_async(run_query, hours)
    pool.close()
    pool.join()
    print int(time.time() - start_time)

我遇到的问题是其中一个查询始终不返回数据,但是当我以通常的方式运行相同的查询时,它会返回数据。由于我是多重处理的新手,我想知道我上面的使用方式是否存在任何明显的问题?

最佳答案

我认为您遇到的问题是结果对象在您想要使用它时尚未准备好。另外,如果您有已知的超时时间,我建议您在代码中使用它来发挥您的优势。

此代码显示了如何在 300 秒后强制超时(如果届时尚未收集到所有结果)的示例。

if __name__ == '__main__':
    start_time = time.time()
    hours = ['17','18','19']

    with Pool(processes=4) as pool: 
        results = pool.map_async(run_query, hours)
        print(results.get(timeout=300))

    print int(time.time() - start_time)

否则,您仍应使用 results.get() 返回数据,或为 map_async 指定回调函数。

关于python - 多处理池返回错误结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23785145/

相关文章:

python - 寻找一个工作的 linux 命令行工具从 rapidshare 下载

python - 如何使用 StdLib 和 Python 3 在一个范围内并行化迭代?

python - 在多处理中使用共享列表的正确方法是什么

python - 从类定义中的列表理解访问类变量

python - 顺序排列

python 多处理: setting class attribute value

python - Flask Web 服务器的多处理与多线程

java - 单例 Bean 如何服务并发请求?

python - 有没有办法检查两个列表是否包含 Python 中的任何相同值?

python - 具有 unicode 字符的 SPARQL 正则表达式过滤器