我正在执行网络请求来获取消息,然后等待该消息的处理,然后再次重复整个过程。
消息的处理将长时间运行,并且线程可能处于等待状态,这可能允许它在其他地方使用。我想要的是继续 while 循环,获取更多消息并在线程空闲时处理它们。
当前同步代码:
while(!cancellationToken.IsCancelled) {
var message = await GetMessage();
await ProcessMessage(message); // I'll need it to continue from here if thread is released.
}
这个使用的场景是消息队列Consumer服务。
最佳答案
鉴于 async
的使用/await
,您当前的代码不一定是 synchronous
(就线程而言 - 可以在不同线程上调用延续),尽管显然必须维护获取消息和处理消息之间的依赖关系。
block 引用>Re: the thread may be in a waiting state that may allow it to be used elsewhere
等待编码良好的 I/O 密集型工作根本不需要消耗线程 - 请参阅 Stephen Cleary's There is no thread 。假设两个等待的任务是 IO 密集型的,您的代码在等待 IO 密集型工作时可能根本不会消耗任何线程,即应用程序的其余部分将使用线程池。因此,如果您唯一关心的是浪费线程,那么就不需要更多了。
但是,如果您关心的是性能和额外吞吐量,是否有下游容量来并发调用
ProcessMessage
(例如多个下游 Web 服务器或额外的数据库容量),那么您可以考虑并行化 IO 绑定(bind)工作(同样,不需要更多线程池线程)例如,如果您能够重写
GetMessages
调用一次检索一批,您可以尝试以下操作:var messages = await GetMessages(10); var processTasks = messages .Select(message => ProcessMessage(message)); await Task.WhenAll(processTasks);
(如果您无法触摸代码,您可以循环
GetMessages
来检索Task.WhenAll
之前的 10 条单独的消息)但是,如果您没有任何进一步的并发能力
ProcessMessage
调用,那么您应该考虑解决瓶颈 - 例如添加更多服务器、优化代码或并行化ProcessMessage
中完成的工作工作等理由是,正如你所说,
GetMessages
从队列中检索数据。如果您没有能力处理您检索到的消息,您所能做的就是将消息排队到其他地方,这似乎毫无意义 - 而是将消息留在队列中,直到您准备好处理它们。队列深度还将创建积压工作的可见性,您可以对其进行监控。编辑,回复:偶尔一个
ProcessMessage()
通话时间比其他人长得多根据评论,OP 有额外的信息,偶尔
ProcessMessage
调用比其他调用花费的时间要长得多,并且希望在此期间继续处理其他消息。一种方法可能是使用此 clever pattern here 对并行任务应用超时。 ,如果达到该值,任何长时间运行的 ProcessTasks 都会继续运行,并继续处理下一批消息。
下面的内容有潜在的危险,因为它需要仔细平衡超时(低于 1000 毫秒)与观察到的不当行为的频率
ProcessMessage
调用 - 如果超时与“慢”ProcessMessages 的频率相比太低,下游资源可能会不堪重负。更安全(但更复杂)的添加是跟踪不完整
ProcessMessage
的并发数量。任务通过 Task.IsCompleted ,如果达到阈值,则等待完成足够的这些任务以使积压达到安全水平。while(!cancellationToken.IsCancelled) { // Ideally, the async operations should all accept cancellationTokens too var message = await GetMessages(10, cancellationToken); var processTasks = messages .Select(message => ProcessMessage(message, cancellationToken)); await Task.WhenAny(Task.WhenAll(processTasks), Task.Delay(1000, cancellationToken)); }
回复:下游负载安全水平的节流 - TPL DataFlow很可能在这里有用。
关于c# - 线程等待时持续处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46386834/