python - Gstreamer,如何从(rtmpsink)错误中恢复

标签 python error-handling rtmp gstreamer

我正在使用 gstreamer 在 python 中构建流应用程序。

应用程序使用 tee 元素将数据写入 rtmpsink 和 filesink。在理想环境(本地网络)中启动和流式传输工作正常,但是如果与流式传输服务器断开连接该怎么办?我正在尝试找出如何保持管道运行,从而在发生错误后继续写入文件接收器...

我要存档的内容:

  1. 至少我想在流部分(rtmpsink)发生错误后保留我的存档文件(filesink)。在此,如果发生错误,我们有一些备份。
  2. 手动重新连接到流媒体服务器。
  3. 建立某种机制来检查连接并在可能的情况下重新连接流部分 (rtmpsink)。

问题:

是否可以存档我正在尝试做的事情?

如何存档(动态管道/探针/额外元素)?

任何解释、示例或指向正确方向的信息都将非常感激。

注意:

Gst 版本: gstreamer 1.3.90(rtmpsink、faac、x264enc)
操作系统: ubuntu 14.04 LTS
流媒体服务器: wowza 4.x

测试应用程序(代码): link
启动后的管道(正常): link

rtmpsink错误后的管道(写入数据失败): link
rtmpsink错误后的日志片段(写入数据失败): link

最佳答案

我还一直致力于尝试在出现错误后重新连接到 RTMP 服务器的管道。原则上我同意@mpr's answer (使用与 shmsink/shmsrc 对连接的两个管道)但我无法使其可靠地工作,因此我最终使用了不同的策略。

我正在使用rtmp2sink,当遇到错误时,它将在管道总线上发布一条消息,然后返回 GST_FLOW_FLUSHING,这会导致管道刷新所有内容。这不是我感兴趣的,所以我在 rtmp2sink 前面添加了一个 GhostPad,它捕获返回值并将其转回 GST_FLOW_OK。此时,我还重置了 rtmp2sink 元素以使其重新连接。

这看起来相当可靠,至少对于我使用的 RTMP 服务器,我不需要做任何特殊的事情来处理来自编码器的关键帧。

所有这些都已使用 Gstreamer 版本 1.18.5 进行了测试。下面是一个非常基本的 Python 示例,展示了这种方法:

#!/usr/bin/env python3
import gi

gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib

def _handle_message(_, message, loop):
    """handle messages posted to pipeline bus"""
    if message.type == Gst.MessageType.EOS:
        print("End-of-stream")
        loop.quit()
    elif message.type == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        if message.src.__class__.__name__ == "GstRtmp2Sink" and err.matches(
            Gst.ResourceError.quark(), Gst.ResourceError(err.code)
        ):
            resource_error = Gst.ResourceError(err.code)
            print(f"caught {resource_error} from rtmp2sink, ignoring")
        else:
            print(f"caught error {err} ({debug}), exiting")
            loop.quit()
    return True

def _wrap_rtmp_sink(rtmpsink: Gst.Element):
    """wrap RTMP sink to make it handle reconnections"""

    def _chain(pad: Gst.Pad, _, buffer: Gst.Buffer):
        internal_pad = pad.get_internal()
        result = internal_pad.push(buffer)
        if result == Gst.FlowReturn.FLUSHING or result == Gst.FlowReturn.ERROR:
            print(f"Restarting RTMP sink after {result}")
            rtmpsink.set_state(Gst.State.NULL)
            rtmpsink.set_state(Gst.State.PLAYING)
            return Gst.FlowReturn.OK

        return result

    sinkpad = rtmpsink.get_static_pad("sink")
    peer = sinkpad.get_peer()
    peer.unlink(sinkpad)

    ghost_pad = Gst.GhostPad.new("proxypad", sinkpad)
    ghost_pad.set_chain_function_full(_chain)
    peer.link(ghost_pad)
    ghost_pad.activate_mode(Gst.PadMode.PUSH, True)

    # hang on to GhostPad to avoid Python garbage collecting it
    rtmpsink._ghost_pad = ghost_pad

def main():
    Gst.init(None)
    pipeline = Gst.parse_launch(
        f"""
        videotestsrc
            ! video/x-raw,width=1280,height=720,framerate=30/1
            ! avenc_h264_videotoolbox
            ! h264parse
            ! flvmux.video

        audiotestsrc
            ! faac
            ! flvmux.audio

        flvmux name=flvmux streamable=true
            ! queue
            ! rtmp2sink name=rtmp location=rtmp://10.1.0.10/test/test
    """
    )

    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", _handle_message, loop)

    _wrap_rtmp_sink(pipeline.get_by_name("rtmp"))

    pipeline.set_state(Gst.State.PLAYING)
    loop.run()
    pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    main()

关于python - Gstreamer,如何从(rtmpsink)错误中恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27905606/

相关文章:

python - Django - URL 模式不起作用

python - Chaco Legend 中的自定义标签

ruby-on-rails - 使用面向方面的编程进行Rails异常处理

ios - 用于在 iPhone 上流式传输的 Youtube 协议(protocol)

python - 在树莓派上设置 Google Assistant SDK 时遇到问题

python - 为什么在将选项放在其余位置参数之前时 argparse 会失败?

api - 401未经授权响应时Xamarin注销用户

asp.net - 从Application_Error中执行 Controller Action 将引发InvalidOperationException(未找到 View )

Android 流到 rstp 服务器并使用 nginx 转换为 rtmp

docker - Nginx 到 Docker 容器的动态代理