.net - 在 .Net 应用程序中管理来自单个 Socket 的并发读取

标签 .net multithreading sockets concurrency

我正在编写一个多线程的.Net 应用程序,其中许多调用者需要同时从单个套接字读取。

从套接字读取的数据被分成记录/数据 block - 每条记录都针对由 header 中的 CallerID 标识的特定调用者,因此例如从套接字读取的字节流可能类似于以下内容:

Header: CallerId = 1, Length = 5
Data:   "Hello"
Header: CallerId = 2, Length = 6
Data:   "Stack "
Header: CallerId = 1, Length = 7
Data:   " World!"
Header: CallerId = 2, Length = 8
Data:   "Overflow"

在这种情况下,调用者 1 读取的数据流应为“Hello World!”,而调用者 2 读取的数据流应为“Stack Overflow”。
  • 我的想法是,每个调用者数据流都应该通过继承自 Stream 的某个类公开,以便调用者能够使用同步(Read)和异步(BeginRead)编程模型。
  • 我想避免每个套接字有一个专用的读取线程,因为我的应用程序将在任何时候处理许多这样的套接字。
  • 调用者不应该互相阻塞,例如在上面的例子中,下面的例子不应该死锁:
  • caller1Stream.Read(12) // Hello world!
    caller2Stream.Read(14) // Stack Overflow
    

    我已经想办法做到这一点,但是我发现事情很快就变得复杂了,尤其是;由于需要按顺序读取数据,因此在为调用者读取数据时需要缓冲 - 这使得为给定调用者读取流的行为更加复杂:
  • 读者必须首先检查他们的缓冲区,看看其他人是否已经读取了足够的数据
  • 我正在努力寻找合适的 FIFO 内存缓冲区/流(是否已经存在或者我需要编写一个?)
  • 如果没有读取到足够的数据,那么读取器需要从套接字读取数据,直到已经读取了足够用于给定调用者的数据,同时缓冲用于其他调用者的数据。
  • 数据缓冲很好(我可以维护一个阅读器列表并通过调用者 ID 查找正确的缓冲区)但是,
  • 如何防止多个线程同时尝试从套接字读取但读取彼此数据的死锁?
  • 如何通知正在等待数据的调用者他们的数据已被另一个调用者读取?
  • 如何确保消息的 header 和数据 block 一起读取? (数据 block 是变长的,所以需要先读取定长的头 block 来确定要读取多少数据)

  • 当然,这一定是一个相当普遍的问题——难道没有一个 .Net 类可以让整个事情变得更简单吗? (请注意,我无法更改传输机制/协议(protocol))。有什么方法可以使用其中一种监听器样式的类来执行上述操作?

    如果没有,我该如何解决这些问题?

    最佳答案

    根据您的帖子,我假设您使用的是 TCP/IP,因为您指的是流式传输,但套接字也可用于 UDP 无连接协议(protocol)(和其他协议(protocol))。 UDP 非常简单,因为您只发送固定消息(无流式传输)。

    无论如何,由于您指的是接收消息流,因此我必须假设您使用的是 TCP/IP,并且如果您直接使用套接字,那么您要查找的内容存在一些重大问题。

    TCP/IP 是一种流解决方案,所以我放在一端的东西会从另一端出来,它总是有序的,但不一定在一起。因此,如果在一端写“Hello World”,则在阅读时可能会得到“Hello World”,或者“Hello”和“World”,甚至是“H”“ello World”。关键是,没有办法调用 read 并期望接收整个消息。

    我过去这样做的方式是只使用一个中间线程(或者在我的情况下,我只是使用 .net 线程池),它快速读取套接字,重新组装消息,然后将它们传递给要处理的队列。诀窍是重新组装,因为你不知道你收到了什么。在你的标题中说,消息长度是 int32,即 4 个字节。当你调用 read 时,你甚至可能需要重新组装这个长度,因为你只收到了你需要告诉长度的 4 个字节中的第一个字节。

    听起来你的标题是固定长度的,所以基本上你需要做的,假设 callerid 和长度是 uint32 的。尝试读取 8 个字节,如果小于 8 个,则放入缓冲区,直到我们读取 8 个字节。一旦我们读取了 8 个字节,取出 4 个字节并获得长度,为消息的长度分配一个缓冲区。尝试阅读,直到我们填满缓冲区,一旦填满,将其放入特定调用者的队列中,并通知线程有一条新消息以防它正在休眠。然后拿走剩下的一切,重新开始,或者等待更多数据。

    我过去成功地使用了它,它增加了相对较少的开销。

    关于.net - 在 .Net 应用程序中管理来自单个 Socket 的并发读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3368226/

    相关文章:

    c# - 绑定(bind)到数据表中的数字字段时如何允许空值?

    .net - 为什么 F# 不支持使用类型缩写扩展系统类型?

    python-3.x - 我应该如何设置 spaCy 服务器来处理多个并发请求(非阻塞)?

    c# - 分析 .net 多线程应用程序 (Visual Studio 2008)

    java - 当我使用http comet时,如何让tomcat发送tcp keepalive数据包

    .net - 收到错误消息: "The installed product does not match the installation source(s)"

    c# - 如何将两个 DateTime 与秒进行比较?

    iOS GCD 用于 UITableView

    java - 多线程服务器不接受客户端输出流

    Android客户端和服务器端编程