python - 从多个 shell 子进程中非阻塞实时读取 (Python)

标签 python multithreading asynchronous ffmpeg subprocess

我正在使用 ffmpeg 构建实时多视频流监控和 subrocess 。 我目前有以下代码,灵感来自 "Async and await with subprocesses" post .

问题是在一段时间后输出停止打印并且进程进入僵尸模式。我猜测这个问题与PIPE的过载或者死锁有关。需要帮助。

"""Async and await example using subprocesses

Note:
    Requires Python 3.6.
"""

import os
import sys
import time
import platform
import asyncio

async def run_command_shell(command):
    """Run command in subprocess (shell)

    Note:
        This can be used if you wish to execute e.g. "copy"
        on Windows, which can only be executed in the shell.
    """
    # Create subprocess
    process = await asyncio.create_subprocess_shell(
        command,
        stderr=asyncio.subprocess.PIPE)

    # Status
    print('Started:', command, '(pid = ' + str(process.pid) + ')')

    # Wait for the subprocess to finish
    stdout, stderr = await process.communicate()

    # Progress
    if process.returncode == 0:
        print('Done:', command, '(pid = ' + str(process.pid) + ')')
    else:
        print('Failed:', command, '(pid = ' + str(process.pid) + ')')

    # Result
    result = stderr.decode().strip()

    # Real time print
    print(result)

    # Return stdout
    return result


def make_chunks(l, n):
    """Yield successive n-sized chunks from l.

    Note:
        Taken from https://stackoverflow.com/a/312464
    """
    if sys.version_info.major == 2:
        for i in xrange(0, len(l), n):
            yield l[i:i + n]
    else:
        # Assume Python 3
        for i in range(0, len(l), n):
            yield l[i:i + n]


def run_asyncio_commands(tasks, max_concurrent_tasks=0):
    """Run tasks asynchronously using asyncio and return results

    If max_concurrent_tasks are set to 0, no limit is applied.

    Note:
        By default, Windows uses SelectorEventLoop, which does not support
        subprocesses. Therefore ProactorEventLoop is used on Windows.
        https://docs.python.org/3/library/asyncio-eventloops.html#windows
    """

    all_results = []

    if max_concurrent_tasks == 0:
        chunks = [tasks]
    else:
        chunks = make_chunks(l=tasks, n=max_concurrent_tasks)

    for tasks_in_chunk in chunks:
        if platform.system() == 'Windows':
            loop = asyncio.ProactorEventLoop()
            asyncio.set_event_loop(loop)
        else:
            loop = asyncio.get_event_loop()

        commands = asyncio.gather(*tasks_in_chunk)  # Unpack list using *
        results = loop.run_until_complete(commands)
        all_results += results
        loop.close()
    return all_results


if __name__ == '__main__':

    start = time.time()

    if platform.system() == 'Windows':
        # Commands to be executed on Windows
        commands = [
            ['hostname']
        ]
    else:
        # Commands to be executed on Unix
        commands = [
            ['du', '-sh', '/var/tmp'],
            ['hostname'],
        ]
    cmds = [["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"]]

    tasks = []
    for command in cmds:
        tasks.append(run_command_shell(*command))


    # # Shell execution example
    # tasks = [run_command_shell('copy c:/somefile d:/new_file')]

    # # List comprehension example
    # tasks = [
    #     run_command(*command, get_project_path(project))
    #     for project in accessible_projects(all_projects)
    # ]

    results = run_asyncio_commands(tasks, max_concurrent_tasks=20)  # At most 20 parallel tasks
    print('Results:', results)

    end = time.time()
    rounded_end = ('{0:.4f}'.format(round(end-start,4)))
    print('Script ran in about', str(rounded_end), 'seconds')

相关:Non-blocking read from multiple subprocesses (Python)

最佳答案

事实证明,该问题可能与通过多线程异步等进行的代码优化无关。

原因可能是服务器限制,例如打开文件/文件描述符 (FD) 的最大数量、防火墙、其他配置文件。

如果您偶然发现类似的问题:

<小时/>

安装htop

Htop是一款适用于 Linux/Unix 类系统的交互式实时进程监控应用程序,也是 top 的便捷替代品。命令,这是所有 Linux 操作系统上预装的默认进程监控工具。

这可能有助于澄清原因。

<小时/>

测试单个 ffmpeg 命令

正如jfs所说,我需要Minimal, Complete, and Verifiable example 。因此,我们从一个非常小的开始:测试一个流程。

ffmpeg -y -i udp://224.10.0.123:1234  -f null -

In my case it turned out that any multicast will hang in 2:10 - 2:20 minutes. Process alive but in zombie mode. This is very strange, because a couple of days ago everything worked perfectly.

<小时/>

测试另一个软件 ( VLC's multicat )

Multicat最新官方版本编号为2.2,已发布here .

拿到它,不要忘记,biTStream需要在构建时安装。

使用命令检查流以从流中录制视频:

timeout 10 multicat -uU @224.10.0.123:1234 test.ts

In my case, the same thing happened on the 2nd minute. The command does not stop executing, but the file ceased to be recorded.

<小时/>

检查打开文件/文件描述符的最大数量 more info

使用以下命令显示打开文件描述符的最大数量:

cat /proc/sys/fs/file-max

要查看硬值和软值,请发出以下命令:

ulimit -Hn
ulimit -Sn

At some point in the execution of one of my python scripts, I saw a similar error, but the increase in this parameter did not help me.

<小时/>

摘要

所以问题与我的脚本的并行执行无关。在另一台虚拟机上验证成功。我联系了设置此虚拟机的人并向他解释说最近几天出现了问题,表明问题出在防火墙中。他说他没有碰任何东西。但在这次通话之后,一切都开始完美。 (我几乎确定他弄坏了它):D

GL大家!

关于python - 从多个 shell 子进程中非阻塞实时读取 (Python),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48618709/

相关文章:

android - 如果我的应用程序在后台运行,doInBackground 是否仍然运行?

python - 按轴 0 中的单列组合 pandas DataFrame

python - 为什么我不理解枚举函数?

python - 在 Python 中,函数是执行所需操作的代码块,而方法是特定于某些对象的函数。这个说法是真的吗?

c - 具有多个 Pthread 的 MPI

c# - 为什么文件异步API会阻塞

python - django-admin:覆盖保存方法时为 "super() argument 1 must be type, not None"

java - 同步问题: I want the main thread to be run before another thread but it sometimes doesn´t

c - TCP套接字到多个IP/端口

javascript - 如何在不在外部缓存的情况下将 'this' 传递给 Promise?