c# - 使用 ReceiveById 时糟糕的 MSMQ 性能

标签 c# .net performance msmq message-queue

每秒仅 20 条消息!这就是我的全部!下面是从队列中查看 50 条消息并使用 ReceiveById 并行接收它们的代码。队列中的消息总数为 500。我也测试了其他数字。但上限是每秒 20 条消息!我在某个完全不碍事的地方吗?

编辑 1:

1 - 我需要队列是可恢复的。但有趣的是,即使我将可恢复选项设置为 false;上限仍然是 20 条消息/秒。

2 - 我不得不在这里使用 MSMQ,因为涉及到一些遗留应用程序。但如果这个代码是正确的,并且这个前 20 名的限制真的存在,我可以说服该组转换。因此,我们非常欢迎任何关于替换 MSMQ 的建议(基于实际经验)(请注意,我们需要保留我们的消息,以防出现任何类型的故障)。

3 - 我已将 ThreadPool 中的线程数设置为较高的数量以防它有所帮助,但实际上在这段代码中它会导致创建 100 - 200 个线程。我测试了从 50 到 10000 的不同数字,没有任何区别。

4 - 在每个任务中创建一个新的 MessageQueue,因为 ReceiveById 不是线程安全的。

5 - 正如您在代码中看到的那样,消息大小非常小;它只是一个字符串加一个 int。

编辑 2:[非常奇怪的新结果]

我仔细研究了这段代码并发现:如果我在任务中注释掉 singleLocal.UseJournalQueue = false; 行,我每秒最多可以读取 1200 条消息。不令人印象深刻,但在我的情况下可以接受。 奇怪的是 UseJournalQueue 的默认值是 false;为什么再次将其设置为 false 会对性能产生如此大的影响?

static partial class Program
{
    static void Main(string[] args)
    {
        ThreadPool.SetMaxThreads(15000, 30000);
        ThreadPool.SetMinThreads(10000, 20000);

        var qName = @".\private$\deep_den";

        if (!MessageQueue.Exists(qName))
        {
            var q = MessageQueue.Create(qName);
        }

        var single = new MessageQueue(qName);
        single.UseJournalQueue = false;
        single.DefaultPropertiesToSend.AttachSenderId = false;
        single.DefaultPropertiesToSend.Recoverable = true;
        single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

        var count = 500;
        var watch = new Stopwatch();

        watch.Start();
        for (int i = 0; i < count; i++)
        {
            var data = new Data { Name = string.Format("name_{0}", i), Value = i };

            single.Send(new Message(data));
        }
        watch.Stop();

        Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
        Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);

        var enu = single.GetMessageEnumerator2();

        watch.Reset();
        watch.Start();
        while (Interlocked.Read(ref __counter) < count)
        {
            var list = new List<Message>();
            var peekCount = 50;

            while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10)))
            {
                try
                {
                    list.Add(enu.Current);
                    peekCount--;
                }
                catch (Exception ex2)
                {
                    Trace.WriteLine(ex2.ToString());
                    break;
                }
            }

            var tlist = new List<Task>();
            foreach (var message in list)
            {
                var stupid_closure = message;

                var t = new Task(() =>
                {
                    using (var singleLocal = new MessageQueue(qName))
                    {
                        singleLocal.UseJournalQueue = false;
                        singleLocal.DefaultPropertiesToSend.AttachSenderId = false;
                        singleLocal.DefaultPropertiesToSend.Recoverable = true;
                        singleLocal.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

                        try
                        {
                            // processing the message and insert it into database
                            // workflow completed here, so we can safely remove the message from queue

                            var localM = singleLocal.ReceiveById(stupid_closure.Id);
                            var localSample = (Data)localM.Body;

                            Interlocked.Increment(ref __counter);
                            Console.WriteLine(Interlocked.Read(ref __counter));
                        }
                        catch (MessageQueueException ex) { if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) Trace.WriteLine(ex.ToString()); }
                        catch (Exception ex2) { Trace.WriteLine(ex2.ToString()); }
                    }
                }, TaskCreationOptions.PreferFairness);

                tlist.Add(t);
            }

            foreach (var t in tlist) t.Start();

            Task.WaitAll(tlist.ToArray());

            list.Clear();
        }
        watch.Stop();
        Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
        Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);

        Console.WriteLine("press any key to continue ...");
        Console.ReadKey();
    }
    static long __counter = 0;
}

最佳答案

卡维, 如果启用了消息队列对象的日志设置,则您正在使用的 MessageQueue 对象的构造函数将 UseJournalQueue 属性设置为 true。它以某种方式认为 .\private$\deep_den 的日志设置已启用。编辑 - 您使用的是预先创建的队列吗?

关于c# - 使用 ReceiveById 时糟糕的 MSMQ 性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14167049/

相关文章:

c# - 添加到 Hashset<T> 时出现 IndexOutOfRangeException

c# - 你能在 C# 中定义一个预先指定大小的数组吗?

C# <===> VB.Net 关键字映射

java - 为什么我遇到长时间的非 GC 相关应用程序暂停?

performance - 如何知道网页的真实大小?

类内的 C# 命名空间?

c# - 如何将 Postmark 用于 ASP.NET 登录控件?

c# - 有什么方法可以解决由第三方库引起的OS加载程序锁死锁?

c# - 'split' 基于条件的通用列表的最(性能)最有效和可读的方法是什么?

.net - System.Drawing.Color 操作的扩展方法库