c# - 从 C# 中的馈线接收大量 tcp 套接字财务数据的最佳方法?

标签 c# multithreading sockets concurrency tcp

我正在开发一个 C# Windows 服务,它将从使用 TCP 的馈线接收金融报价。我的项目必须接收和处理大量数据,因为我将跟踪 140 种用于每秒更新 SQL 数据库的不同 Assets 。

我在 BackgroundWork 线程中使用循环来汇集来自套接字的数据:

  try
  {
    // Send START command with the assets.
    if (!AeSocket.AeSocket.Send(Encoding.ASCII.GetBytes(String.Format("{0}{1}START{1}BC|ATIVO{1}{2}{3}", GlobalData.GlobalData.Id, GlobalData.GlobalData.Tab, AeSocket.AeSocket.GetAssets(),
                                                                      GlobalData.GlobalData.Ret))).Contains("OK"))
    {
      throw new Exception("Advise was no accepted.");
    }

    // Pool the socket and send all received string to the queue for processing in the background.
    while (true)
    {
      // Make sure the connection to the socket is still active.
      if (!AeSocket.AeSocket.Client.Connected)
      {
        throw new Exception("The connection was closed.");
      }

      // If no data is available in the socket, loop and keep waiting.
      if (!AeSocket.AeSocket.Client.Poll(-1, SelectMode.SelectRead))
      {
        continue;
      }

      // There are data waiting to be read.
      var data = new Byte[AeSocket.AeSocket.ReadBufferSize];
      var bytes = AeSocket.AeSocket.Client.Receive(data, 0);
      AeSocket.AeSocket.Response = Encoding.Default.GetString(data, 0, bytes);

      // Push into the queue for further processing in a different thread.
      GlobalData.GlobalData.RxQueue.Add(AeSocket.AeSocket.Response);
    }
  }
  catch
  {
    backgroundWorkerMain.CancelAsync();
  }
  finally
  {
    AeSocket.AeSocket.Client.Close();
    AeSocket.AeSocket.Client.Dispose();
  }

接收到的数据正在一个单独的线程中处理,以避免由于接收到的数据量很大而阻塞套接字接收事件。我正在使用 BlockingCollection (RxQueue)。

此集合正在被观察,如以下代码片段所示:

  // Subscribe to the queue for string processing in another thread.
  // This is a blocking observable queue, so it is run in this background worker thread.
  GlobalData.GlobalData.Disposable = GlobalData.GlobalData.RxQueue
    .GetConsumingEnumerable()
    .ToObservable()
    .Subscribe(c =>
    {
      try
      {
        ProcessCommand(c);
      }
      catch
      {
        // Any error will stop the processing.
        backgroundWorkerMain.CancelAsync();
      }
    });

然后将数据添加到 ConcurrentDictionary,由一秒计时器异步读取并保存到 SQL 数据库:

      // Add or update the quotation record in the dictionary.
      GlobalData.GlobalData.QuotDict.AddOrUpdate(dataArray[0], quot, (k, v) => quot);

一秒钟的系统计时器在另一个 BackgroundWorker 线程中处理,它保存从 ConcurrentDictionary 读取的报价数据。

  // Update the gridViewAssetQuotations.
  var list = new BindingList<MyAssetRecord>();

  foreach (var kv in GlobalData.GlobalData.QuotDict)
  {
    // Add the data in a database table...
  }

这是解决这种情况的好方法吗?

使用 BlockingCollection 作为异步队列和 ConcurrentDictionary 以允许从另一个线程异步读取是执行此操作的好方法吗?

我以前从套接字中汇集数据的方式如何:

      // If no data is available in the socket, loop and keep waiting.
      if (!AeSocket.AeSocket.Client.Poll(-1, SelectMode.SelectRead))
      {
        continue;
      }

有更好的方法吗?

此外,我必须每 4 秒向 TCP 服务器发送一个 KeepAlive 命令。我可以完全异步地完成它,忽略上面的池循环,使用另一个系统计时器,还是必须与池操作同步?服务器只允许在一个端口上进行连接。

提前感谢您的任何建议。

爱德华多·昆塔纳

最佳答案

您的循环将导致处理器使用率过高。如果没有数据要处理,请休眠(比如 1 毫秒),如果下一个周期仍然没有数据,请增加休眠时间,如果有数据,请减少休眠时间。这样,读取器循环将自动调整以适应数据流量并节省处理器周期。休息一切看起来不错。

关于c# - 从 C# 中的馈线接收大量 tcp 套接字财务数据的最佳方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20003233/

相关文章:

c# - 以编程方式使用 C# 和 LINQ 进行分组(可能是 OnSelecting?)

c# - SaveFileDialog:由于多线程应用程序中的 "owner"参数导致 InvalidOperationException

java - publishOn 和并行的区别

Java bufferedReader readline循环不会中断

检查 UDP 端口是否已在 C 中打开

c# - 如何在 C# 中生成字母数字优惠券代码?

c# - 无法使用 Datetime.ParseExact 转换日期时间字符串

C# 改变按钮文本的大小

java - vector 列表中的内存泄漏 Java OutOfMemoryError

java - 我无法在同一个文件上写入两次(java)