python - 在不停止管道的情况下在失败时重新启动接收器

标签 python gstreamer python-gstreamer

今天我决定改造我的小脚本,基于gst-launch , 进入一个真正的 Python/GStreamer 应用程序,以添加一些功能。

感谢 shout2send,我开发了一个小程序,将麦克风中的音频发送到 Icecast(filesink)和本地存储(tee) .

有时 shout2send由于网络问题,可以停止。我想每 N 秒重新启动这个元素,直到连接恢复,而不停止管道,因为本地音频文件不应受到网络条件的影响。

这是我尝试过的:

  • 网络错误后一秒停止/启动管道(结果:流媒体工作,本地文件被截断)
  • 取消链接 tee , 设置 shout2send状态到 NULL并将其从管道中删除(结果:GStreamer 严重错误,如 Trying to dispose element ... but it is in PLAYING instead of the NULL state )
  • 试图了解在这种情况下如何使用 pads(结果:同上,但涉及更多代码)

  • 我该怎么办?

    这是我的代码的样子:
    import gi
    gi.require_version("Gst", "1.0")
    from gi.repository import GLib
    from gi.repository import Gst
    # [...]
    
    def message_handler(bus, message):
        if message.type == Gst.MessageType.ERROR:
            if message.src == shout2send:
                pass # TODO: restart the element
            else:
                print(message.parse_error())
                pipeline.set_state(Gst.State.NULL)
                exit(1)
        else:
            print(message.type)
    
    pipeline = Gst.Pipeline()
    message_bus = pipeline.get_bus()
    message_bus.add_signal_watch()
    message_bus.connect('message', message_handler)
    
    # [...]
    tee.link(queue0)
    queue0.link(filesink)
    tee.link(queue1)
    queue1.link(shout2send)
    

    更新 (9/12/15) : 添加非工作代码 + 日志

    我试着关注 "Dynamically changing the pipeline" fro GStreamer doc ,但我的代码不起作用。
    def event_probe(pad, info, *args):
        Gst.Pad.remove_probe(pad, info)
        queue1.unlink(shout2send)
        tee.unlink(queue1)
        pipeline.remove(shout2send)
        pipeline.remove(queue1)
        return Gst.PadProbeReturn.OK
    
    def message_handler(bus, message):
        if message.type == Gst.MessageType.ERROR:
            if message.src == shout2send:
                pad = queue1.get_static_pad('src')
                pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
            else:
                print(message.parse_error())
                pipeline.set_state(Gst.State.NULL)
                exit(1)
        else:
            print(message.type)
    

    如果我使用 GST_DEBUG=3 运行我的脚本,我会看到以下结果我在流式传输时重新启动 Icecast:
    [...]
    0:00:02.142033258  5462 0x55e414d900a0 WARN                  shout2 gstshout2.c:674:gst_shout2send_render:<shout2send> error: shout_send() failed: Socket error
    0:00:02.658137998  5462 0x55e414d90140 WARN                 basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: Internal data flow error.
    0:00:02.658169752  5462 0x55e414d90140 WARN                 basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: streaming task paused, reason error (-5)
    (GLib.Error('Internal data flow error.', 'gst-stream-error-quark', 1), 'gstbasesrc.c(2943): gst_base_src_loop (): /GstPipeline:pipeline0/GstPulseSrc:pulsesrc:\nstreaming task paused, reason error (-5)')
    0:00:02.658628129  5462 0x7f6ba8002a30 WARN                audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment
    

    最佳答案

    感谢 otopolsky 的评论,我做到了 :)

    我做错了什么:

  • 元素必须设置为 NULL : 这很重要
  • oggmux以后一定要入住tee , 在两个子管道上:否则 Icecast 将列出流但无法提供服务。对 opusenc 执行相同操作

  • 建议:
  • 没有必要取消所有不需要的元素的链接:只需在需要的地方中断
  • 没有必要从管道中删除您不需要的每个元素:如果您想重用它们,请保留它们

  • 最终代码(重新连接正常工作且独立于本地编码/录制):
    def event_probe2(pad, info, *args):
        Gst.Pad.remove_probe(pad, info.id)
        tee.link(opusenc1)
        opusenc1.set_state(Gst.State.PLAYING)
        oggmux1.set_state(Gst.State.PLAYING)
        queue1.set_state(Gst.State.PLAYING)
        shout2send.set_state(Gst.State.PLAYING)
        return Gst.PadProbeReturn.OK
    
    def reconnect():
        pad = tee.get_static_pad('src_1')
        pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe2, None)
    
    def event_probe(pad, info, *args):
        Gst.Pad.remove_probe(pad, info.id)
        tee.unlink(opusenc1)
        opusenc1.set_state(Gst.State.NULL)
        oggmux1.set_state(Gst.State.NULL)
        queue1.set_state(Gst.State.NULL)
        shout2send.set_state(Gst.State.NULL)
        GLib.timeout_add_seconds(interval, reconnect)
        return Gst.PadProbeReturn.OK
    
    def message_handler(bus, message):
        if message.type == Gst.MessageType.ERROR:
            if message.src == shout2send:
                pad = tee.get_static_pad('src_1')
                pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
            else:
                print(message.parse_error())
                pipeline.set_state(Gst.State.NULL)
                exit(1)
        else:
            print(message.type)
    

    小问题:
  • 我用 tee.get_static_pad('src_1') ,但我想我可以在某处获取 src id,而不是使用固定值
  • 可能整件事都可以用更好的形式编写(但这是我第一个使用 Python+Gstreamer 编写的程序,它可以工作,所以我很满意)
  • 为了避免数据丢失,我调用 pipeline.set_state(Gst.State.NULL)一秒后 pipeline.send_event(Gst.Event.new_eos()) ,但我仍然收到类似 WARN audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment 的消息

  • 代码:https://github.com/ViGLug/libre-streaming

    关于python - 在不停止管道的情况下在失败时重新启动接收器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34165272/

    相关文章:

    c++ - 部署 GStreamer SDK 应用程序

    linux - 从 remotePC 通过网络流式传输网络摄像头并在 opencv 中访问(在主机上)- {Linux env.}

    linux - 在python gstreamer中寻找大的原始视频文件

    python - 使用gstreamer,在不停止接收器的情况下播放播放列表

    python - 在 Python 中嵌套相同生成器的实例

    python - Matplotlib - 看不见的边距切断了 3D 绘图数据?

    python - 缺少 PyGST 插件?

    python - Pocketsphinx + Gstreamer 竞赛条件? Pocketsphinx无法在Python脚本中同时收听音频+录音?

    python - pip 安装与本地包具有相同命名空间的包

    python - 如何正确报告 Docker 容器内的可用 RAM?