python - 使用 asyncio.Queue 将数据从子流程安全地传递到异步任务

标签 python subprocess python-asyncio python-multithreading inotifywait

我的 Amazon Linux EC2 实例中有一个目录结构。我希望有一个 Python 脚本异步监视此目录(以及所有子目录)的文件创建情况。

我决定在子进程中运行 inotifywait 并将输出传递到异步任务中进行处理。我运行子进程并监视其自己线程中的输出,并使用 put_nowait() 将标准输出传递到 asyncio.Queue() 中,该输出由正在运行的 asyncio 任务监视在主线程上。

import asyncio
import subprocess
import threading

def watch_dir(dir_to_watch: str, output_queue: asyncio.Queue):
    inotify_cmd = f'sudo inotifywait -e create -m -r {dir_to_watch}'
    proc = subprocess.Popen(inotify_cmd,
                            stdout=subprocess.PIPE,
                            shell=True)

    while True:
        line = proc.stdout.readline().rstrip()
        if not line:
            break
        output_queue.put_nowait(line)


async def process_lines(input_queue: asyncio.Queue):
    while True:
        line = await input_queue.get()
        # do stuff with line

if __name__ == '__main__':
    q = asyncio.Queue()
    dir_watch_thread = threading.Thread(target=watch_dir, args=(_dir_to_watch, q))
    dir_watch_thread.start()
    asyncio.run(process_lines(q))

有没有更好、性能更高、资源效率更高的方法来做到这一点?这是否是 asyncio.Queue() 的安全用法?我读过有关 janus 的内容,它将自己描述为通过同步和异步上下文之间的队列传递数据的安全方法。我是否需要使用这样的数据结构(以及为什么)?如果不需要,我不想包含额外的依赖项。

最佳答案

Is this even a safe usage of asyncio.Queue?

不,因为asyncio.Queuenot thread-safe 。您甚至可能会观察到这一点,其症状是从队列中读取的协程不会立即注意到有项目进入,而是仅在事件循环上发生不相关的 IO 或超时事件时才会唤醒。

解决此问题的一种方法是使用 call_soon_threadsafe :

# this requires you to pass "loop" as well
loop.call_soon_threadsafe(output_queue.put_nowait, line)

更好的方法是使用 asyncio 自己的子进程处理,它允许您完全避免线程。例如(未经测试):

async def watch_dir(dir_to_watch, output_queue):
    proc = await asyncio.create_subprocess_exec(
        'sudo', 'inotifywait', '-e', 'create', '-m',
        '-r', dir_to_watch, stdout=subprocess.PIPE)
    while True:
        line = await proc.stdout.readline()
        if not line:
            break
        await output_queue.put(line.rstrip())

async def process_lines(dir_to_watch):
    queue = asyncio.Queue()
    # run watch_dir() in the "background"
    asyncio.create_task(watch_dir(dir_to_watch), queue)
    while True:
        line = await queue.get()
        print(line)  # ...

if __name__ == '__main__':
    asyncio.run(process_lines(_watch_dir))

在上面的代码中,我用显式参数替换了 shell=True 的使用,以避免 shell 注入(inject)的可能性,尤其是与 sudo 相关的情况。

Is there a better, more performant/resource efficient way to do this?

在简单的单生产者单消费者设置中,您可以取消队列并仅使用生成器:

async def watch_dir(dir_to_watch):
    proc = await asyncio.create_subprocess_exec(
        'sudo', 'inotifywait', '-e', 'create', '-m',
        '-r', dir_to_watch, stdout=subprocess.PIPE)
    while True:
        line = await proc.stdout.readline()
        if not line:
            break
        yield line.rstrip()

async def process_lines(dir_to_watch):
    async for line in watch_dir(dir_to_watch):
        print(line)  # ...

关于python - 使用 asyncio.Queue 将数据从子流程安全地传递到异步任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57962911/

相关文章:

python - 如何将数组中 x 处的值与 x +1 处的值进行比较?

linux - 套接字读取与文件读取有何不同?

python-3.x - 在 Python 中关闭异步事件循环会导致异常结束

python - 如何替换关闭的事件循环?

python - 如何使用自定义 udf 实现对列进行舍入

python - python中的堆叠圆形条形图

python - DecoderRNN 的输出包含额外维度 (Pytorch)

python - 在不打开系统控制台的情况下调用子进程

python - 子进程错误文件

Python 子进程 AttributeError