c# - 线程等待时持续处理

标签 c# multithreading task

我正在执行网络请求来获取消息,然后等待该消息的处理,然后再次重复整个过程。

消息的处理将长时间运行,并且线程可能处于等待状态,这可能允许它在其他地方使用。我想要的是继续 while 循环,获取更多消息并在线程空闲时处理它们。

当前同步代码:

while(!cancellationToken.IsCancelled) {
  var message = await GetMessage();

  await ProcessMessage(message); // I'll need it to continue from here if thread is released.
}

这个使用的场景是消息队列Consumer服务。

最佳答案

鉴于 async 的使用/await ,您当前的代码不一定是 synchronous (就线程而言 - 可以在不同线程上调用延续),尽管显然必须维护获取消息和处理消息之间的依赖关系。

Re: the thread may be in a waiting state that may allow it to be used elsewhere

等待编码良好的 I/O 密集型工作根本不需要消耗线程 - 请参阅 Stephen Cleary's There is no thread 。假设两个等待的任务是 IO 密集型的,您的代码在等待 IO 密集型工作时可能根本不会消耗任何线程,即应用程序的其余部分将使用线程池。因此,如果您唯一关心的是浪费线程,那么就不需要更多了。

但是,如果您关心的是性能和额外吞吐量,是否有下游容量来并发调用 ProcessMessage (例如多个下游 Web 服务器或额外的数据库容量),那么您可以考虑并行化 IO 绑定(bind)工作(同样,不需要更多线程池线程)

例如,如果您能够重写 GetMessages调用一次检索一批,您可以尝试以下操作:

var messages = await GetMessages(10);
var processTasks = messages
    .Select(message => ProcessMessage(message));
await Task.WhenAll(processTasks);

(如果您无法触摸代码,您可以循环 GetMessages 来检索 Task.WhenAll 之前的 10 条单独的消息)

但是,如果您没有任何进一步的并发能力 ProcessMessage调用,那么您应该考虑解决瓶颈 - 例如添加更多服务器、优化代码或并行化 ProcessMessage 中完成的工作工作等

理由是,正如你所说,GetMessages从队列中检索数据。如果您没有能力处理您检索到的消息,您所能做的就是将消息排队到其他地方,这似乎毫无意义 - 而是将消息留在队列中,直到您准备好处理它们。队列深度还将创建积压工作的可见性,您可以对其进行监控。

编辑,回复:偶尔一个ProcessMessage()通话时间比其他人长得多

根据评论,OP 有额外的信息,偶尔 ProcessMessage调用比其他调用花费的时间要长得多,并且希望在此期间继续处理其他消息。

一种方法可能是使用此 clever pattern here 对并行任务应用超时。 ,如果达到该值,任何长时间运行的 ProcessTasks 都会继续运行,并继续处理下一批消息。

下面的内容有潜在的危险,因为它需要仔细平衡超时(低于 1000 毫秒)与观察到的不当行为的频率 ProcessMessage调用 - 如果超时与“慢”ProcessMessages 的频率相比太低,下游资源可能会不堪重负。

更安全(但更复杂)的添加是跟踪不完整 ProcessMessage 的并发数量。任务通过 Task.IsCompleted ,如果达到阈值,则等待完成足够的这些任务以使积压达到安全水平。

while(!cancellationToken.IsCancelled) 
{
   // Ideally, the async operations should all accept cancellationTokens too
   var message = await GetMessages(10, cancellationToken);
   var processTasks = messages
      .Select(message => ProcessMessage(message, cancellationToken));
   await Task.WhenAny(Task.WhenAll(processTasks), 
                      Task.Delay(1000, cancellationToken));
 }

回复:下游负载安全水平的节流 - TPL DataFlow很可能在这里有用。

关于c# - 线程等待时持续处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46386834/

相关文章:

c# - GroupPrincipal.Members.Add(userPrincipal) 抛出错误

c# - 在 EXTJs 网格面板中使用 JSON 日期

c# - 如何随机分配按钮内容和颜色?

javascript - λ : folktale data. 任务 - 应用未知数量的任务

msbuild - 如何解决 : Custom MSBuild task requires assembly outside of AppBase

c# - 带有 EF.Functions.Contains() 的单元测试方法

java - 为什么我会收到并发修改错误?

c++ - 如何在 std::queue 中存储 char 缓冲区

java - 在 Android 应用程序中使用线程

python - 任务管理守护进程