.net - 长时间运行的 MSMQ 消息处理。限制工作线程数

标签 .net multithreading system.reactive msmq messaging

环境

  • 主机应用程序是一个Windows 服务 (.NET 3.5);
  • 目前还无法切换到 .NET 4+ 框架。

过程

  • 处理一条消息最多可能需要 5 分钟。
  • 使用异步模式接收消息(ReceiveCompleted 事件订阅):

    private void OnMessageReceived(object source, ReceiveCompletedEventArgs eventArgs) {
    
        var queue = (MessageQueue)source;
        Message m = null;
    
        try {                
            var processor = new Thread(() => {
                try {
                    // 1. process the message
                    // 2. send a feedback to m's response queue
                }
                catch(Exception ex) {
                    Logger.Log.Error(ex);
                    // 1. send a feedback to m's response queue
                } 
            };
            processor.Start();
        }
        catch(Exception ex) {
            Logger.Log.Error(ex);
        }
    
        queue.BeginReceive();
    }
    

问题

我猜生成单独的工作线程之后应该有一些限制。

假设我想使用1-5 个工作线程(最大值) 处理消息,如果所有可用的工作线程都很忙,那么:

  • 省略对当前消息的处理,将反馈发送到响应队列(消息丢失);
  • 消息被送回队列,发送反馈(处理被推迟);

if all available workers are busy 这部分是否意味着我必须实现类似于线程池的东西?

我添加了 到这个问题。这是由于我看到的一些代码使用了 Rx Buffering。我不太明白这是否适用于我的情况。我可以在我的案例中使用 Rx 吗?

最佳答案

如果您的服务由 IIS 托管,IIS 可能会阻止这种情况,但您可以尝试使用 ThreadPool特别是使用它的 SetMaxThreads 方法。请注意,您不能将它们设置为低于当前处理器数量,限制是应用程序范围的(注意这可能对您的应用程序正在使用的库产生不利影响),并且取决于主机可能正在处理您的服务防止您以这种方式设置线程。

你可以尝试用这个运行你自己的,实现一个private static Queue<Thread> threadPool; ,用你想要的线程数静态初始化它,然后做这样的事情:

var processor = threadPool.Dequeue();
...
// there's no easy thread-safe way to check this in .net 3.5 - 
// if you can use concurrent queue you could do this after checking if 
// there's anything available in the queue with a ConcurrentQueue<Thread>
//queue.BeginReceive(); // call this from the end of your thread execution now

您的线程可以 Enqueue自己回到threadPool完成处理后排队,然后调用 BeginReceive()在 MSMQ queue对象,以便进程可以继续(将 queue 传递给线程,如此处记录 https://msdn.microsoft.com/en-us/library/ts553s52(v=vs.85).aspx )。

如果您对如何实现 ConcurrentQueue 感兴趣在 .NET 3.5 中,您可以随时查看 Reference Source - 但就我个人而言,我宁愿硬着头皮升级到最新的 .NET 版本。

关于.net - 长时间运行的 MSMQ 消息处理。限制工作线程数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36178104/

相关文章:

c# - 如何在 TrackBar 中使用 float

.net - Visual Studio 中的相对路径

java - Java线程什么时候空闲?

C++ TinyThread 和带有 FreeGLUT 的 OpenGL

system.reactive - 接收 : operator for getting first and most recent value from an Observable stream

c# - 如何设置串行特殊字符?

c# - 如何比较 FieldInfo 的实例值?

c# - 在多线程中调用静态方法

c# - Rx,动态合并源

c# - 响应式扩展 - 返回相同的可观察对象还是创建新的?