python - 这是我从python multiprocess可以得到的最多的东西吗?

标签 python python-3.x multithreading

所以我有数据,这些数据在文本文件中。每行都是要执行的计算。该文件大约有1亿行。
首先,我将所有内容加载到ram中,然后有一个方法可以执行计算并给出以下结果:

def process(data_line):
    #do computation
    return result
然后我用2000行数据包这样称呼它,然后将结果保存到disk:
POOL_SIZE = 15 #nbcore - 1
PACKET_SIZE = 2000
pool = Pool(processes=POOL_SIZE)

data_lines = util.load_data_lines(to_be_computed_filename)
number_of_packets = int(number_of_lines/ PACKET_SIZE)
for i in range(number_of_packets):
    lines_packet = data_lines[:PACKET_SIZE]
    data_lines = data_lines[PACKET_SIZE:]
    results = pool.map(process, lines_packet)
    save_computed_data_to_disk(to_be_computed_filename, results)

# process the last packet, which is smaller
results.extend(pool.map(process, data_lines))
save_computed_data_to_disk(to_be_computed_filename, results)
print("Done")
问题是,当我向磁盘写入数据时,我的CPU没有任何计算能力,并且具有8个内核。它正在查看任务管理器,似乎浪费了很多CPU时间。
完成计算后,我必须写入磁盘,因为结果比输入大1000倍。
无论如何,我将不得不在某个时候写入磁盘。如果在这里没有浪费时间,那么以后会浪费掉。
CPUTime
在允许一个内核写入磁盘的同时仍与其他内核进行计算时,我该怎么做?切换到C吗?
以这种速度,我可以在75小时内处理1亿条线,但是我有120亿条线要处理,因此任何改进都是值得的。
时间示例:
Processing packet 2/15 953 of C:/processing/drop_zone\to_be_processed_txt_files\t_to_compute_303620.txt
Lauching task and waiting for it to finish...
Task completed, Continuing
Packet was processed in 11.534576654434204 seconds
We are currently going at a rate of 0.002306915330886841 sec/words
Wich is 433.47928145051293 words per seconds
Saving in temporary file
Printing writing 5000 computed line to disk took 0.04400920867919922 seconds
saving word to resume from : 06 20 25 00 00
Estimated time for processing the remaining packets is : 51:19:25

最佳答案

您说您有8个核心,但您有:

POOL_SIZE = 15 #nbcore - 1
假设您想让一个处理器空闲(大概是用于主进程?),为什么这个数字不为7?但是,为什么还要免费阅读处理器呢?您正在连续调用map。当主进程正在等待这些调用返回时,它需要知道CPU。这就是为什么如果在实例化池时未指定池大小的原因,则默认为您拥有的CPU数量,而不是该数量减一。我将在下面对此进行更多说明。
由于您有一个很大的内存列表,因此您有可能在循环中花费了很多时间,在每次循环迭代时都重写了该列表。相反,您可以只获取列表的一部分并将其作为可迭代参数传递给map:
POOL_SIZE = 15 # ????
PACKET_SIZE = 2000
data_lines = util.load_data_lines(to_be_computed_filename)
number_of_packets, remainder = divmod(number_of_lines, PACKET_SIZE)
with Pool(processes=POOL_SIZE) as pool:
    offset = 0
    for i in range(number_of_packets):
        results = pool.map(process, data_lines[offset:offset+PACKET_SIZE])
        offset += PACKET_SIZE
        save_computed_data_to_disk(to_be_computed_filename, results)
    if remainder:
        results = pool.map(process, data_lines[offset:offset+remainder])
        save_computed_data_to_disk(to_be_computed_filename, results)
print("Done")
在每次调用map之间,主要过程是将结果写到to_be_computed_filename中。同时,池中的每个进程都处于空闲状态。 这应该提供给另一个进程(实际上是在主进程下运行的线程):
import multiprocessing
import queue
import threading

POOL_SIZE = 15 # ????
PACKET_SIZE = 2000
data_lines = util.load_data_lines(to_be_computed_filename)
number_of_packets, remainder = divmod(number_of_lines, PACKET_SIZE)

def save_data(q):
    while True:
        results = q.get()
        if results is None:
            return # signal to terminate
        save_computed_data_to_disk(to_be_computed_filename, results)

q = queue.Queue()
t = threading.Thread(target=save_data, args=(q,))
t.start()

with Pool(processes=POOL_SIZE) as pool:
    offset = 0
    for i in range(number_of_packets):
        results = pool.map(process, data_lines[offset:offset+PACKET_SIZE])
        offset += PACKET_SIZE
        q.put(results)
    if remainder:
        results = pool.map(process, data_lines[offset:offset+remainder])
        q.put(results)
q.put(None)
t.join() # wait for thread to terminate
print("Done")
我选择在主进程的线程中运行save_data。这也可能是另一个过程,在这种情况下,您将需要使用multiprocessing.Queue实例。但是我认为主进程线程主要是在等待map完成,因此不会争夺GIL。现在,如果您不让处理器腾出时间来处理线程作业save_data,那么只有在创建所有结果之后,它才能最终完成大部分的保存工作。您将需要对此进行一些试验。
理想情况下,我还将修改输入文件的读取,以便不必先将其全部读取到内存中,而是一行一行地读取它,从而产生2000行块并将其作为作业提交给map处理:
import multiprocessing
import queue
import threading

POOL_SIZE = 15 # ????
PACKET_SIZE = 2000


def save_data(q):
    while True:
        results = q.get()
        if results is None:
            return # signal to terminate
        save_computed_data_to_disk(to_be_computed_filename, results)


def read_data():
    """
    yield lists of PACKET_SIZE
    """
    lines = []
    with open(some_file, 'r') as f:
        for line in iter(f.readline(), ''):
            lines.append(line)
            if len(lines) == PACKET_SIZE:
                yield lines
                lines = []
        if lines:
            yield lines

q = queue.Queue()
t = threading.Thread(target=save_data, args=(q,))
t.start()

with Pool(processes=POOL_SIZE) as pool:
    for l in read_data():
        results = pool.map(process, l)
        q.put(results)
q.put(None)
t.join() # wait for thread to terminate
print("Done")

关于python - 这是我从python multiprocess可以得到的最多的东西吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65447699/

相关文章:

Python 任务队列替代方案和框架

python - 分隔行 python pandas

python - 我的 pip 在 Windows 上坏了,我该如何修复?

python - 用 Armadillo 函数替换 `sparse.eye`

python-3.x - 使用 pyinstaller 将 Exe 转换为 python?

java - 为什么 Object 成员变量在 Java 中不能既是 final 又是 volatile?

c++ - list::empty() 多线程行为?

android - 优先考虑新 Activity

python - 在 Python 中对句子进行分词并重新连接结果

python - 如何使用非包含范围生成类似示例中的列表? (Python)