linux - 将 "broadcast"数据传送到 linux 中的多个进程的规范方法?

标签 linux sockets

我有一个应用程序需要将数据流从一个进程发送到多个读取器,每个读取器都需要查看自己的数据流副本。这是相当高的速率(100MB/s 并不少见),所以我想尽可能避免重复。在我的理想世界中,Linux 会命名管道支持多个读取器,并为常见的单读取器情况提供快速路径。

我想要一些提供某种命名空间隔离措施的东西(例如:127.0.0.1 上的广播对我相信的任何进程开放......)。 Unix 域套接字不支持广播,而且 UDP 无论如何都是“不可靠的”(在我的情况下,服务器将丢弃数据包而不是阻塞)。我以为我可以创建一个共享内存段并将公共(public)缓冲区存储在那里,但这感觉就像重新发明轮子。在 linux 中是否有规范的方法来做到这一点?

最佳答案

I supposed I could create a shared-memory segment and store the common buffers there, but that feels like reinventing the wheel. Is there a canonical way to do this in linux?



简答:

长答案: [你在正确的轨道上]

我以前不得不这样做[为了更高的速度],所以我不得不研究这个。以下是我想出的。

在主进程中,创建一个共享缓冲区池[根据您的选择使用 SysV shm 或私有(private) mmap]。为他们分配 ID 号(例如 1,2,3,...)。现在有一个来自 bufid 的映射来缓冲内存地址。要使子进程可以访问它,请在 fork 之前执行此操作。 children 也继承了共享内存映射,所以没有太多工作

现在 fork child 。给他们每个人一个唯一的进程 ID。您可以从一个数字开始逐渐增加:2,3,4,... [main is 1] 或仅使用常规 pid。

打开 SysV 消息 channel (msgget 等)。同样,如果您在 fork 之前在主进程中执行此操作,则它们可供子进程使用 [IIRC]。

现在这是它的工作原理:

main 找到一个未使用的缓冲区并填充它。对于每个 child ,main 通过 msgsnd 发送 IPC 消息(在单个公共(public) IPC channel 上)其中消息有效负载 [ mtext ] 是 bufid 编号。每条消息都有标准标题的 mtype字段设置为目标 child 的 pid。

执行此操作后,main 将缓冲区记住为“正在运行”且不可重用。

每个 child 做一个 msgrcvmtype设置为它的pid。然后它从 mtext 中提取 bufid并处理缓冲区。当它完成时,它发送一个带有 mtype 的 IPC 消息 [再次在同一 channel 上]使用 mtext 设置为 main 的 pid它刚刚处理的 bufid。

main 的循环执行非阻塞 msgrcv ,注意给定 bufid 的所有“释放”消息。当所有 child 都释放了缓冲区时,它会被放回缓冲区“空闲队列”。在 main 的服务循环中,它可能会填充新的缓冲区并根据需要发送更多消息 [穿插等待]。

然后 child 执行 msgrcv 并重复循环。

因此,我们使用 [大] 共享内存缓冲区和短 [几个字节] bufid 描述符 IPC 消息。

好的,那么您可能会问这个问题:“为什么要为通信 channel 使用 SysV IPC?” [对多个管道或套接字]。

您已经知道共享缓冲区可以避免发送多个数据副本。

所以,这就是要走的路。但是,为什么不通过套接字或管道 [或共享队列、条件变量、互斥锁等] 发送上述 bufid 消息呢?

答案是速度和目标进程的唤醒特性。

对于高度实时的响应,当 main 发出 bufid 消息时,您希望目标进程 [如果它一直在休眠] 立即唤醒并开始处理缓冲区。

我检查了 linux 内核源代码,唯一具有该特性的机制是 SysV IPC。所有其他人都有[调度]滞后。

当进程 A 执行时 msgsnd在进程 B 已经完成的 channel 上 msgrcv上,会发生三件事:
  • 进程 B 将被调度程序标记为可运行。
  • [IIRC] B 将移到其调度队列的前面
  • 此外,更重要的是,这会导致立即重新安排所有进程。

  • B 将立即启动 [而不是下一个计时器中断或其他某个进程刚好进入休眠状态]。在单核机器上,A 将进入休眠状态,而 B 将代替它运行。

    警告:我所有的研究都是在 CFS 调度程序之前几年完成的,但是,我相信上述内容仍然适用。此外,我使用了 RT 调度程序,如果 CFS 无法按预期工作,这可能是一个可能的选择。

    更新:

    Looking at the POSIX message queue source, I think that the same immediate-wakeup behavior you discussed with the System V queues is going on, which gives the added benefit of POSIX compatibility.



    时间语义是可能的 [并且是可取的] 所以我不会感到惊讶。但是,SysV 实际上比 POSIX mqueue 更加标准和普遍。而且,存在一些语义差异 [见下文]。

    对于计时,您可以使用 nsec 时间戳构建一个单元测试程序 [仅使用 msgs]。我使用了 TSC 邮票,但 clock_gettime(CLOCK_REALTIME,...)也可能工作。盖章出发时间和到达/唤醒时间以查看。比较 SysV 和 mq

    使用 SysV 或 mq,您可能需要通过/proc/* 提高最大 msgs 数、最大 msg 大小、最大队列数。默认值相对较小。如果不这样做,您可能会发现任务被阻塞,等待 msg,但由于超出了 msg 队列最大参数,master 无法发送一个 [被阻塞]。我实际上有这样的错误,所以我更改了我的代码以在启动期间提高这些值 [它以 root 身份运行]。因此,您可能需要将此作为 RC 启动脚本(或任何 [atrocious ;-)] systemd 等效项)

    我看着在我自己的代码中使用 mq 替换 SysV。对于多对一返回空闲池消息,它没有相同的语义。在我最初的回答中,我忘记提到需要两个 msg 队列:master-to-children (e.g. work-to-do) 和 children-to-master (e.g. 返回一个现在可用的缓冲区)。

    我有几种不同类型的缓冲区(例如压缩视频、压缩音频、未压缩视频、未压缩音频),它们具有不同的类型和结构描述符。

    此外,多个不同的缓冲区队列,因为这些缓冲区从一个线程传递到另一个线程 [不同的处理阶段]。

    使用 SysV,您可以将单个 msg 队列用于多个缓冲区列表/队列,缓冲区列表 ID 是 msg mtype .一个 child msgrcv等待 mtype设置为 ID 值。主节点等待返回空闲消息队列 mtype的 0。

    mq* 需要单独的 mqd_t对于每个 ID,因为它不允许等待 msg 子类型。
    msgrcv允许 IPC_NOWAIT在每次调用时,但要获得与 mq_receive 相同的效果你必须用 O_NONBLOCK 打开队列或使用定时版本。这在“关闭”或“重新启动”阶段使用(例如,向 child 发送消息,表示不会有更多数据到达,他们应该终止[或重新配置等])。 IPC_NOWAIT在程序启动期间“排空”队列[从先前调用中去除陈旧消息]或在操作期间从先前配置中排出陈旧消息非常方便。

    因此,您需要一个单独的 mqd_t 而不是两个 SysV msg 队列来处理任意数量的缓冲区列表。对于每个缓冲区列表/类型。

    关于linux - 将 "broadcast"数据传送到 linux 中的多个进程的规范方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35996848/

    相关文章:

    linux - bash 中的字符串/数组操作?

    linux - 我无法在终端中删除类似 "#test.cpp#~"的文件

    c++ - 无法从端口 2368 (Linux) C 接收 UDP 流

    node.js - 套接字流对话的 Node.js 范例是什么?

    linux - 如何提取字符串和它之前的字符之间的文本?

    linux - Shell如何以编程方式实现管道?

    Linux 管道行为

    perl - 无法通过 perl 套接字发送流水线 http 请求

    接受后配置 TCP keepalive

    .net - 我如何使用 TCPListener 更好地处理客户端?