python - 防止从空 FIFO 读取数据时发生阻塞

标签 python python-3.x subprocess

在 Python 3 Web 应用程序中,我需要使用一个命令行实用程序来处理图像,将其输出写入命名管道 (fifo),然后将该输出(管道的内容)解析为PIL/枕头图像。这是基本流程(工作代码很长并且没有错误!):

from os import mkfifo
from os import unlink
from PIL import Image
from subprocess import DEVNULL
from subprocess import PIPE
from subprocess import Popen

fifo_path = '/tmp/myfifo.bmp'
cmd = '/usr/bin/convert -resize 100 /path/to/some.tif ' + fifo_path
# make a named pipe
mkfifo(fifo_path)
# execute
proc = Popen(cmd, stdout=DEVNULL, stderr=PIPE, shell=True)
# parse the image
pillow_image = Image.open(fifo_path)
# finish the process:
proc_exit = proc.wait()
# remove the pipe:
unlink(fifo_path)
# just for proof:
pillow_image.show()

(在上面的示例中,我已将实际需要使用的实用程序替换为 ImageMagick,只是因为您不太可能拥有它 - 它根本不会影响问题。)

这在大多数情况下都很有效,而且我可以处理大多数异常(为了清楚起见,在上面省略了),但有一种情况我无法弄清楚如何处理,即如果出现问题该怎么办shellout,导致空管,例如如果图像不存在或由于某种原因损坏,例如:

fifo_path = '/tmp/myfifo.bmp'
cmd = '/usr/bin/convert -resize 100 /path/to/some/bad_or_missing.tif ' + fifo_path
# make a named pipe
mkfifo(fifo_path)
# execute
proc = Popen(cmd, stdout=DEVNULL, stderr=PIPE, shell=True)
# parse the image
pillow_image = Image.open(fifo_path) # STUCK
...

应用程序卡在这里,因为我无法到达 proc_exit = proc.wait() 我无法设置 timeout (例如 proc_exit = proc.wait(timeout=2)),这是我通常会做的。

我尝试将整个业务包装在上下文管理器中,类似于 this answer ,但是这个配方不是线程安全的,这是一个问题,而且我找不到一个线程或多处理解决方案,可以让我在加入线程或进程时访问 PIL/Pillow Image 实例(不是我的强项,但是像这样):

from multiprocessing import Process
from os import mkfifo
from os import unlink
from PIL import Image
from subprocess import DEVNULL
from subprocess import PIPE
from subprocess import Popen

def do_it(cmd, fifo_path):
    mkfifo(fifo_path)
    # I hear you like subprocesses with your subprocesses...
    sub_proc = Popen(cmd, stdout=DEVNULL, stderr=PIPE, shell=True)
    pillow_image = Image.open(fifo_path)
    proc_exit = sub_proc.wait()
    unlink(fifo_path)

fifo_path = '/tmp/myfifo.bmp'
cmd = '/usr/bin/convert -resize 100 /path/to/some/bad_or_missing.tif ' + fifo_path
proc = Process(target=do_it, args=(cmd, fifo_path))
proc.daemon = True
proc.start()
proc.join(timeout=3) # I can set a timeout here
# Seems heavy anyway, and how do I get pillow_image back for further work?
pillow_image.show()

希望这些能够说明我的问题以及我所尝试的方法。提前致谢。

最佳答案

POSIX read(2) :

When attempting to read from an empty pipe or FIFO:

If no process has the pipe open for writing, read() shall return 0 to indicate end-of-file.

当且仅当命令终止而没有打开 Image.open(fifo_path) 进行写入且被阻止时,fifo_path 可能会卡住。

Normally, opening the FIFO blocks until the other end is opened also.

这是一个正常的序列:

  1. cmd 尝试打开 fifo_open 进行写入时发生阻塞
  2. 您的 Python 代码在尝试打开进行读取时发生阻塞
  3. 一旦 FIFO 被两个进程打开,数据流就开始。除了名称之外,FIFO 类似于管道——只有一个管道对象——内核在内部传递所有数据而不将其写入文件系统。 The pipe is not a seekable file and therefore Image.open() may read until EOF
  4. cmd 关闭管道的末端。您的代码收到 EOF,因为没有其他进程打开 FIFO 进行写入,并且返回 Image.open(fifo_path)

    不管 cmd 的管道末端是成功完成还是由于错误而关闭,无论 cmd 是否被突然终止,都不重要:只要它的末端是关闭的即可。

    你的进程是否调用proc.wait()并不重要。 proc.wait() 不会杀死 cmdproc.wait() 不会阻止管道另一端的打开或关闭。 proc.wait() 唯一要做的就是等待子进程死亡和/或返回已经死亡的子进程的退出状态。

这是死锁情况:

  1. 在调用 Image.open() 时,cmd 甚至不会尝试打开 fifo_open 进行写入,无论出于何种原因,例如,没有 /usr/bin/convert 、错误的命令行参数、错误/无输入等
  2. 您的 Python 代码在尝试打开进行读取时发生阻塞

fifo_open 未打开用于写入,因此 Image.open(fifo_open) 在尝试打开它进行读取时永远卡住。


您可以在后台线程中打开 FIFO 进行写入,并在父级打开 FIFO 进行读取时关闭它:

#!/usr/bin/env python3
import contextlib
import os
import subprocess
import sys
import textwrap
import threading

fifo_path = "fifo"
with contextlib.ExitStack() as stack:
    os.mkfifo(fifo_path)
    stack.callback(os.remove, fifo_path)
    child = stack.enter_context(
        subprocess.Popen([
            sys.executable, '-c', textwrap.dedent('''
            import random
            import sys
            import time
            if random.random() < 0.5: # 50%
                open(sys.argv[1], 'w').write("ok")
            else:
                sys.exit("fifo is not opened for writing in the child")
            '''), fifo_path
        ]))
    stack.callback(child.kill)
    opened = threading.Event()  # set when the FIFO is opened for reading
    threading.Thread(target=open_for_writing, args=[fifo_path, opened, child],
                     daemon=True).start()
    pipe = stack.enter_context(open(fifo_path))  # open for reading
    opened.set()  # the background thread may close its end of the pipe now
    print(pipe.read()) # read data from the child or return in 3 seconds
sys.exit(child.returncode)

在 EOF 时, child 被杀死。

其中 open_for_writing() 打开 FIFO,以解锁 open(fifo_path),进而启用关闭它。为了避免 pipe.read() 返回太快,它给子进程 3 秒的时间来打开 FIFO 进行写入:

def open_for_writing(path, opened, child):
    with open(path, 'w'):
        opened.wait()  # don't close until opened for reading in the main thread
        try:
            child.wait(timeout=3)  # the child has 3 seconds to open for writing
        except subprocess.TimeoutExpired:
            pass

如果您确定子进程要么尝试打开 FIFO 要么最终退出(或者您同意子进程运行时 Python 进程挂起,那么您可以放弃超时并使用 child.wait() 而不是 child.wait(timeout=3) 。这样更改后,不再有任意超时,并且代码可以在任意慢的系统上运行(无论出于何种原因)。

代码演示了为什么应该尽可能避免线程,或者为什么应该更喜欢已建立的模式(不太通用但保证正常工作),例如通过通信进行同步。

答案中的代码应该适用于各种情况,但各部分错综复杂。即使是很小的变化,其影响也可能不会明显,直到一个非常具体的案例成为现实。

关于python - 防止从空 FIFO 读取数据时发生阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40352825/

相关文章:

python - 当我在 Python 中刷新串行缓冲区时,它停止读取

python-3.x - 如何从远程python服务连接到CDH集群

python - 将 subprocess.call() 返回值存储在变量中

python - 在子进程之前写出

Python 正在处理数据?

python - Django:WebSocket 握手期间出错:意外的响应代码:500

python - python中的二维数组

python-3.x - 从字典列表中返回特定键中具有最高值的字典

python - 使用 pyppeteer 创建 for 循环的理想方法

用于运行 shell 命令的 Python 脚本