我正在使用 gstreamer 在 python 中构建流应用程序。
应用程序使用 tee
元素将数据写入 rtmpsink 和 filesink。在理想环境(本地网络)中启动和流式传输工作正常,但是如果与流式传输服务器断开连接该怎么办?我正在尝试找出如何保持管道运行,从而在发生错误后继续写入文件接收器...
我要存档的内容:
- 至少我想在流部分(rtmpsink)发生错误后保留我的存档文件(filesink)。在此,如果发生错误,我们有一些备份。
- 手动重新连接到流媒体服务器。
- 建立某种机制来检查连接并在可能的情况下重新连接流部分 (rtmpsink)。
问题:
是否可以存档我正在尝试做的事情?
如何存档(动态管道/探针/额外元素)?
任何解释、示例或指向正确方向的信息都将非常感激。
注意:
Gst 版本: gstreamer 1.3.90(rtmpsink、faac、x264enc)
操作系统: ubuntu 14.04 LTS
流媒体服务器: wowza 4.x
最佳答案
我还一直致力于尝试在出现错误后重新连接到 RTMP 服务器的管道。原则上我同意@mpr's answer (使用与 shmsink/shmsrc 对连接的两个管道)但我无法使其可靠地工作,因此我最终使用了不同的策略。
我正在使用rtmp2sink
,当遇到错误时,它将在管道总线上发布一条消息,然后返回 GST_FLOW_FLUSHING,这会导致管道刷新所有内容。这不是我感兴趣的,所以我在 rtmp2sink 前面添加了一个 GhostPad,它捕获返回值并将其转回 GST_FLOW_OK。此时,我还重置了 rtmp2sink
元素以使其重新连接。
这看起来相当可靠,至少对于我使用的 RTMP 服务器,我不需要做任何特殊的事情来处理来自编码器的关键帧。
所有这些都已使用 Gstreamer 版本 1.18.5 进行了测试。下面是一个非常基本的 Python 示例,展示了这种方法:
#!/usr/bin/env python3
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
def _handle_message(_, message, loop):
"""handle messages posted to pipeline bus"""
if message.type == Gst.MessageType.EOS:
print("End-of-stream")
loop.quit()
elif message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
if message.src.__class__.__name__ == "GstRtmp2Sink" and err.matches(
Gst.ResourceError.quark(), Gst.ResourceError(err.code)
):
resource_error = Gst.ResourceError(err.code)
print(f"caught {resource_error} from rtmp2sink, ignoring")
else:
print(f"caught error {err} ({debug}), exiting")
loop.quit()
return True
def _wrap_rtmp_sink(rtmpsink: Gst.Element):
"""wrap RTMP sink to make it handle reconnections"""
def _chain(pad: Gst.Pad, _, buffer: Gst.Buffer):
internal_pad = pad.get_internal()
result = internal_pad.push(buffer)
if result == Gst.FlowReturn.FLUSHING or result == Gst.FlowReturn.ERROR:
print(f"Restarting RTMP sink after {result}")
rtmpsink.set_state(Gst.State.NULL)
rtmpsink.set_state(Gst.State.PLAYING)
return Gst.FlowReturn.OK
return result
sinkpad = rtmpsink.get_static_pad("sink")
peer = sinkpad.get_peer()
peer.unlink(sinkpad)
ghost_pad = Gst.GhostPad.new("proxypad", sinkpad)
ghost_pad.set_chain_function_full(_chain)
peer.link(ghost_pad)
ghost_pad.activate_mode(Gst.PadMode.PUSH, True)
# hang on to GhostPad to avoid Python garbage collecting it
rtmpsink._ghost_pad = ghost_pad
def main():
Gst.init(None)
pipeline = Gst.parse_launch(
f"""
videotestsrc
! video/x-raw,width=1280,height=720,framerate=30/1
! avenc_h264_videotoolbox
! h264parse
! flvmux.video
audiotestsrc
! faac
! flvmux.audio
flvmux name=flvmux streamable=true
! queue
! rtmp2sink name=rtmp location=rtmp://10.1.0.10/test/test
"""
)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", _handle_message, loop)
_wrap_rtmp_sink(pipeline.get_by_name("rtmp"))
pipeline.set_state(Gst.State.PLAYING)
loop.run()
pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__":
main()
关于python - Gstreamer,如何从(rtmpsink)错误中恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27905606/