So I have data in a text file. Each line is a computation to perform. The file has about 100 million lines.
First I load everything into RAM, then I have a method that performs the computation for one line and returns its result:
def process(data_line):
    # do computation
    return result
Then I call it like this, with packets of 2000 lines, and save the results to disk:

POOL_SIZE = 15  # nbcore - 1
PACKET_SIZE = 2000

pool = Pool(processes=POOL_SIZE)
data_lines = util.load_data_lines(to_be_computed_filename)
number_of_packets = int(number_of_lines / PACKET_SIZE)
for i in range(number_of_packets):
    lines_packet = data_lines[:PACKET_SIZE]
    data_lines = data_lines[PACKET_SIZE:]
    results = pool.map(process, lines_packet)
    save_computed_data_to_disk(to_be_computed_filename, results)
# process the last packet, which is smaller
results.extend(pool.map(process, data_lines))
save_computed_data_to_disk(to_be_computed_filename, results)
print("Done")
The problem is that while I am writing to disk, my CPUs are not computing anything, even though I have 8 cores. Looking at Task Manager, a lot of CPU time seems to be wasted. I have to write to disk once the computation is done, because the results are 1000 times larger than the input.
Either way, I will have to write to disk at some point; if the time is not spent here, it will be spent later. What could I do to let one core write to disk while the others keep computing? Switch to C?
At this rate I can process 100 million lines in 75 hours, but I have 12 billion lines to process, so any improvement is welcome.
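To put the scale in perspective, the figures above imply roughly a year of compute at the current rate (simple arithmetic using the numbers quoted in the question):

```python
hours_per_100m = 75           # hours to process 100 million lines, as stated above
total_lines = 12_000_000_000  # lines still to be processed

hours = hours_per_100m * total_lines / 100_000_000
print(hours, "hours, i.e. about", round(hours / 24), "days")
# → 9000.0 hours, i.e. about 375 days
```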
Timing example:
Processing packet 2/15 953 of C:/processing/drop_zone\to_be_processed_txt_files\t_to_compute_303620.txt
Lauching task and waiting for it to finish...
Task completed, Continuing
Packet was processed in 11.534576654434204 seconds
We are currently going at a rate of 0.002306915330886841 sec/words
Wich is 433.47928145051293 words per seconds
Saving in temporary file
Printing writing 5000 computed line to disk took 0.04400920867919922 seconds
saving word to resume from : 06 20 25 00 00
Estimated time for processing the remaining packets is : 51:19:25
Best answer
You say you have 8 cores, yet you have:

POOL_SIZE = 15  # nbcore - 1

Assuming you want to leave one processor free (presumably for the main process?), why isn't this number 7? But why do you want to free up a processor at all? You are making successive calls to map; while the main process is waiting for those calls to return, it requires practically no CPU. That is why, if you do not specify a pool size when you instantiate the pool, it defaults to the number of CPUs you have, not that number minus one. I will have more to say about this below. Also, since you have one big in-memory list, you may be spending a lot of time in the loop rewriting that list on every iteration. Instead, you could just take a slice of the list and pass it as the iterable argument to map:

POOL_SIZE = 15  # ????
PACKET_SIZE = 2000

data_lines = util.load_data_lines(to_be_computed_filename)
number_of_packets, remainder = divmod(number_of_lines, PACKET_SIZE)
with Pool(processes=POOL_SIZE) as pool:
    offset = 0
    for i in range(number_of_packets):
        results = pool.map(process, data_lines[offset:offset+PACKET_SIZE])
        offset += PACKET_SIZE
        save_computed_data_to_disk(to_be_computed_filename, results)
    if remainder:
        results = pool.map(process, data_lines[offset:offset+remainder])
        save_computed_data_to_disk(to_be_computed_filename, results)
print("Done")
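The difference between the two slicing strategies can be seen in isolation. The sketch below (hypothetical helper names, not code from the question) builds packets both ways: the first rewrites the remaining list on every iteration, copying nearly the whole list each time (quadratic in the number of lines); the second only ever copies out one packet:

```python
def packets_by_rewrite(data, size):
    # The question's approach: rewrite the remaining list on each iteration.
    data = list(data)  # work on a copy
    out = []
    while data:
        out.append(data[:size])  # copies the packet
        data = data[size:]       # copies everything that is left, every time
    return out

def packets_by_offset(data, size):
    # The answer's approach: slice at a moving offset; only packets are copied.
    return [data[i:i + size] for i in range(0, len(data), size)]

data = list(range(10))
print(packets_by_rewrite(data, 3))  # → [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(packets_by_offset(data, 3))   # → same packets, far less copying
```

Both produce identical packets; for a 100-million-line list only the second scales.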
Between each call to map, the main process is writing the results out to to_be_computed_filename. Meanwhile, every process in the pool sits idle. The writing could be handed off to another process (here, actually a thread running under the main process):

import multiprocessing
import queue
import threading
POOL_SIZE = 15  # ????
PACKET_SIZE = 2000

data_lines = util.load_data_lines(to_be_computed_filename)
number_of_packets, remainder = divmod(number_of_lines, PACKET_SIZE)

def save_data(q):
    while True:
        results = q.get()
        if results is None:
            return  # signal to terminate
        save_computed_data_to_disk(to_be_computed_filename, results)

q = queue.Queue()
t = threading.Thread(target=save_data, args=(q,))
t.start()
with Pool(processes=POOL_SIZE) as pool:
    offset = 0
    for i in range(number_of_packets):
        results = pool.map(process, data_lines[offset:offset+PACKET_SIZE])
        offset += PACKET_SIZE
        q.put(results)
    if remainder:
        results = pool.map(process, data_lines[offset:offset+remainder])
        q.put(results)
q.put(None)
t.join()  # wait for thread to terminate
print("Done")
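A minimal, runnable toy version of this writer-thread pattern (with simulated pool results and an in-memory list standing in for the disk file, both hypothetical) shows the lifecycle: start the writer, feed it result packets through the queue, then send the None sentinel and join:

```python
import queue
import threading

saved = []  # stands in for the on-disk output file

def save_data(q):
    while True:
        results = q.get()
        if results is None:
            return  # sentinel: no more packets
        saved.extend(results)  # pretend this is the slow disk write

q = queue.Queue()
t = threading.Thread(target=save_data, args=(q,))
t.start()

# Simulated pool.map results; in the real code each packet comes back
# from the process pool while the writer drains the earlier ones.
for packet in ([1, 2], [3, 4], [5]):
    q.put([x * 10 for x in packet])

q.put(None)  # tell the writer to terminate
t.join()
print(saved)  # → [10, 20, 30, 40, 50]
```

The main loop never blocks on the save itself, only on queue.put, which is effectively instantaneous for an unbounded Queue.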
I chose to run save_data in a thread of the main process. It could also be another process, in which case you would need a multiprocessing.Queue instance. But I figured the main-process thread spends most of its time waiting for map to complete, so there would be little contention for the GIL. Now, if you do not free up a processor for the save_data thread, it may end up doing most of its saving only after all the results have been created; you will need to experiment with this a bit. Ideally, I would also modify the reading of the input file so that it does not have to be read entirely into memory first, but is instead read line by line, yielding 2000-line chunks that are submitted to map as jobs:

import multiprocessing
import queue
import threading
POOL_SIZE = 15  # ????
PACKET_SIZE = 2000

def save_data(q):
    while True:
        results = q.get()
        if results is None:
            return  # signal to terminate
        save_computed_data_to_disk(to_be_computed_filename, results)

def read_data():
    """
    Yield lists of PACKET_SIZE lines.
    """
    lines = []
    with open(some_file, 'r') as f:
        for line in iter(f.readline, ''):  # note: pass the method, don't call it
            lines.append(line)
            if len(lines) == PACKET_SIZE:
                yield lines
                lines = []
    if lines:
        yield lines

q = queue.Queue()
t = threading.Thread(target=save_data, args=(q,))
t.start()
with Pool(processes=POOL_SIZE) as pool:
    for l in read_data():
        results = pool.map(process, l)
        q.put(results)
q.put(None)
t.join()  # wait for thread to terminate
print("Done")
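As a side note, the chunking done by read_data can also be expressed with itertools.islice, which drops the manual buffer bookkeeping. This is an alternative sketch (hypothetical function name), not the answer's code; it works on any iterable, including an open file object:

```python
from itertools import islice

def read_packets(lines, size):
    # Pull successive size-length blocks from any iterable.
    it = iter(lines)
    while True:
        packet = list(islice(it, size))
        if not packet:
            return  # iterable exhausted
        yield packet

print(list(read_packets(range(7), 3)))  # → [[0, 1, 2], [3, 4, 5], [6]]
```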
About python - Is this the most I can get out of Python multiprocessing? We found a similar question on Stack Overflow: https://stackoverflow.com/questions/65447699/