这里又是一个困惑的并行编码器!
我们的内部 Hive 数据库有一个 API 层,我们需要使用它来访问数据。有 300 秒的查询超时限制,因此我想使用多处理并行执行多个查询:
from multiprocessing import Pool
import pandas as pd
import time
from hive2pandas_anxpy import Hive2Pandas # custom module for querying our Hive db and converting the results to a Pandas dataframe
import datetime
def run_query(hour):
start_time = time.time()
start_datetime = datetime.datetime.now()
query = """SELECT id, revenue from table where date='2014-05-20 %s' limit 50""" % hour
h2p = Hive2Pandas(query, 'username')
h2p.run()
elapsed_time = int(time.time() - start_time)
end_datetime = datetime.datetime.now()
return {'query':query, 'start_time':start_datetime, 'end_time':end_datetime, 'elapsed_time':elapsed_time, 'data':h2p.data_df}
if __name__ == '__main__':
start_time = time.time()
pool = Pool(4)
hours = ['17','18','19']
results = pool.map_async(run_query, hours)
pool.close()
pool.join()
print int(time.time() - start_time)
我遇到的问题是其中一个查询始终不返回数据,但是当我以通常的方式运行相同的查询时,它会返回数据。由于我是多重处理的新手,我想知道我上面的使用方式是否存在任何明显的问题?
最佳答案
我认为您遇到的问题是结果对象在您想要使用它时尚未准备好。另外,如果您有已知的超时时间,我建议您在代码中使用它来发挥您的优势。
此代码显示了如何在 300 秒后强制超时(如果届时尚未收集到所有结果)的示例。
if __name__ == '__main__':
start_time = time.time()
hours = ['17','18','19']
with Pool(processes=4) as pool:
results = pool.map_async(run_query, hours)
print(results.get(timeout=300))
print int(time.time() - start_time)
否则,您仍应使用 results.get()
返回数据,或为 map_async
指定回调函数。
关于python - 多处理池返回错误结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23785145/