环境
- 主机应用程序是一个Windows 服务 (.NET 3.5);
- 目前还无法切换到 .NET 4+ 框架。
过程
- 处理一条消息最多可能需要 5 分钟。
使用异步模式接收消息(
ReceiveCompleted
事件订阅):private void OnMessageReceived(object source, ReceiveCompletedEventArgs eventArgs) { var queue = (MessageQueue)source; Message m = null; try { var processor = new Thread(() => { try { // 1. process the message // 2. send a feedback to m's response queue } catch(Exception ex) { Logger.Log.Error(ex); // 1. send a feedback to m's response queue } }; processor.Start(); } catch(Exception ex) { Logger.Log.Error(ex); } queue.BeginReceive(); }
问题
我猜生成单独的工作线程之后应该有一些限制。
假设我想使用1-5 个工作线程(最大值) 处理消息,如果所有可用的工作线程都很忙,那么:
- 省略对当前消息的处理,将反馈发送到响应队列(消息丢失);
- 消息被送回队列,发送反馈(处理被推迟);
if all available workers are busy 这部分是否意味着我必须实现类似于线程池的东西?
我添加了 system.reactive到这个问题。这是由于我看到的一些代码使用了 Rx Buffering。我不太明白这是否适用于我的情况。我可以在我的案例中使用 Rx 吗?
最佳答案
如果您的服务由 IIS 托管,IIS 可能会阻止这种情况,但您可以尝试使用 ThreadPool
特别是使用它的 SetMaxThreads
方法。请注意,您不能将它们设置为低于当前处理器数量,限制是应用程序范围的(注意这可能对您的应用程序正在使用的库产生不利影响),并且取决于主机可能正在处理您的服务防止您以这种方式设置线程。
你可以尝试用这个运行你自己的,实现一个private static Queue<Thread> threadPool;
,用你想要的线程数静态初始化它,然后做这样的事情:
var processor = threadPool.Dequeue();
...
// there's no easy thread-safe way to check this in .net 3.5 -
// if you can use concurrent queue you could do this after checking if
// there's anything available in the queue with a ConcurrentQueue<Thread>
//queue.BeginReceive(); // call this from the end of your thread execution now
您的线程可以 Enqueue
自己回到threadPool
完成处理后排队,然后调用 BeginReceive()
在 MSMQ queue
对象,以便进程可以继续(将 queue
传递给线程,如此处记录 https://msdn.microsoft.com/en-us/library/ts553s52(v=vs.85).aspx )。
如果您对如何实现 ConcurrentQueue
感兴趣在 .NET 3.5 中,您可以随时查看 Reference Source - 但就我个人而言,我宁愿硬着头皮升级到最新的 .NET 版本。
关于.net - 长时间运行的 MSMQ 消息处理。限制工作线程数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36178104/