python - 在Python中使用多处理读取文件时的同步

标签 python file-io synchronization multiprocessing python-multiprocessing

我有一个 python 函数,可以从大文件中读取随机片段并对其进行一些处理。我希望处理发生在多个进程中,因此要利用多重处理。我在父进程中打开文件(以二进制模式)并将文件描述符传递给每个子进程,然后使用 multiprocessing.Lock() 同步对文件的访问。对于单个工作线程,事情会按预期工作,但是对于更多工作线程,即使有锁,文件读取也会随机返回错误数据(通常来自文件的一个部分和文件的另一部分)。此外,文件中的位置(由 file.tell() 返回)通常会变得困惑。这一切都表明访问描述符的基本竞争条件,但我的理解是 multiprocessing.Lock() 应该防止对其进行并发访问。 file.seek() 和/或 file.read() 是否具有某种不包含在锁定/解锁屏障内的异步操作?这是怎么回事?

一个简单的解决方法是让每个进程单独打开文件并获取自己的文件描述符(我已经确认这确实有效),但我想了解我缺少什么。以文本模式打开文件也可以防止问题发生,但不适用于我的用例,也不能解释二进制情况下发生的情况。

我已经在许多 Linux 系统和 OS X 以及各种本地和远程文件系统上运行了以下重现器。我总是遇到一些错误的文件位置和至少一些校验和错误。我知道读取不能保证读取请求的全部数据量,但我已经确认这不是这里发生的情况,并省略了该代码以保持简洁。

import argparse
import multiprocessing
import random
import string

def worker(worker, args):
    rng = random.Random(1234 + worker)
    for i in range(args.count):
        block = rng.randrange(args.blockcount)
        start = block * args.blocksize
        with args.lock:
            args.fd.seek(start)
            data = args.fd.read(args.blocksize)
            pos = args.fd.tell()
            if pos != start + args.blocksize:
                print(i, "bad file position", start, start + args.blocksize, pos)
            cksm = sum(data)
            if cksm != args.cksms[block]:
                print(i, "bad checksum", cksm, args.cksms[block])

args = argparse.Namespace()
args.file = '/tmp/text'
args.count = 1000
args.blocksize = 1000
args.blockcount = args.count
args.filesize = args.blocksize * args.blockcount
args.num_workers = 4

args.cksms = multiprocessing.Array('i', [0] * args.blockcount)
with open(args.file, 'w') as f:
    for i in range(args.blockcount):
        data = ''.join(random.choice(string.ascii_lowercase) for x in range(args.blocksize))
        args.cksms[i] = sum(data.encode())
        f.write(data)
args.fd = open(args.file, 'rb')  
args.lock = multiprocessing.Lock()

procs = []
for i in range(args.num_workers):
    p = multiprocessing.Process(target=worker, args=(i, args))
    procs.append(p)
    p.start()

示例输出:

$ python test.py
158 bad file position 969000 970000 741000
223 bad file position 908000 909000 13000
232 bad file position 679000 680000 960000
263 bad file position 959000 960000 205000
390 bad file position 771000 772000 36000
410 bad file position 148000 149000 42000
441 bad file position 677000 678000 21000
459 bad file position 143000 144000 636000
505 bad file position 579000 580000 731000
505 bad checksum 109372 109889
532 bad file position 962000 963000 243000
494 bad file position 418000 419000 2000
569 bad file position 266000 267000 991000
752 bad file position 732000 733000 264000
840 bad file position 801000 802000 933000
799 bad file position 332000 333000 989000
866 bad file position 150000 151000 248000
866 bad checksum 109116 109375
887 bad file position 39000 40000 974000
937 bad file position 18000 19000 938000
969 bad file position 20000 21000 24000
953 bad file position 542000 543000 767000
977 bad file position 694000 695000 782000

最佳答案

这似乎是由缓冲引起的:使用 open(args.file, 'rb', buffering=0) 我无法再重现。

https://docs.python.org/3/library/functions.html#open

buffering is an optional integer used to set the buffering policy. Pass 0 to switch buffering off [...] When no buffering argument is given, the default buffering policy works as follows: [...] Binary files are buffered in fixed-size chunks; the size of the buffer [...] will typically be 4096 or 8192 bytes long. [...]

关于python - 在Python中使用多处理读取文件时的同步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57438262/

相关文章:

c - C 中的文件 I/O 管理

c# - SynchronizationContext.CreateCopy 的目的

python - 如何在 sqlalchemy 中 "inspect"自定义类型

c++ - 读入一个文件并将其分成 100 个较小的文件非常慢

python - 向 Spark DataFrame 添加一个空列

c++ - 复制另一个进程正在使用的文件

c# - Monitor.Wait 和 "exitContext"参数

database - 使用 DTO 模式同步两个模式?

python - 以 "w"模式打开文件 : IOError: [Errno 2] No such file or directory

python - 在 python 中为 float + obj 覆盖 + 运算符