每秒仅 20 条消息!这就是我的全部!下面是从队列中查看 50 条消息并使用 ReceiveById 并行接收它们的代码。队列中的消息总数为 500。我也测试了其他数字。但上限是每秒 20 条消息!我在某个完全不碍事的地方吗?
编辑 1:
1 - 我需要队列是可恢复的。但有趣的是,即使我将可恢复选项设置为 false;上限仍然是 20 条消息/秒。
2 - 我不得不在这里使用 MSMQ,因为涉及到一些遗留应用程序。但如果这个代码是正确的,并且这个前 20 名的限制真的存在,我可以说服该组转换。因此,我们非常欢迎任何关于替换 MSMQ 的建议(基于实际经验)(请注意,我们需要保留我们的消息,以防出现任何类型的故障)。
3 - 我已将 ThreadPool 中的线程数设置为较高的数量以防它有所帮助,但实际上在这段代码中它会导致创建 100 - 200 个线程。我测试了从 50 到 10000 的不同数字,没有任何区别。
4 - 在每个任务中创建一个新的 MessageQueue,因为 ReceiveById 不是线程安全的。
5 - 正如您在代码中看到的那样,消息大小非常小;它只是一个字符串加一个 int。
编辑 2:[非常奇怪的新结果]
我仔细研究了这段代码并发现:如果我在任务中注释掉 singleLocal.UseJournalQueue = false; 行,我每秒最多可以读取 1200 条消息。不令人印象深刻,但在我的情况下可以接受。 奇怪的是 UseJournalQueue 的默认值是 false;为什么再次将其设置为 false 会对性能产生如此大的影响?
static partial class Program
{
static void Main(string[] args)
{
ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
var qName = @".\private$\deep_den";
if (!MessageQueue.Exists(qName))
{
var q = MessageQueue.Create(qName);
}
var single = new MessageQueue(qName);
single.UseJournalQueue = false;
single.DefaultPropertiesToSend.AttachSenderId = false;
single.DefaultPropertiesToSend.Recoverable = true;
single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });
var count = 500;
var watch = new Stopwatch();
watch.Start();
for (int i = 0; i < count; i++)
{
var data = new Data { Name = string.Format("name_{0}", i), Value = i };
single.Send(new Message(data));
}
watch.Stop();
Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);
var enu = single.GetMessageEnumerator2();
watch.Reset();
watch.Start();
while (Interlocked.Read(ref __counter) < count)
{
var list = new List<Message>();
var peekCount = 50;
while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10)))
{
try
{
list.Add(enu.Current);
peekCount--;
}
catch (Exception ex2)
{
Trace.WriteLine(ex2.ToString());
break;
}
}
var tlist = new List<Task>();
foreach (var message in list)
{
var stupid_closure = message;
var t = new Task(() =>
{
using (var singleLocal = new MessageQueue(qName))
{
singleLocal.UseJournalQueue = false;
singleLocal.DefaultPropertiesToSend.AttachSenderId = false;
singleLocal.DefaultPropertiesToSend.Recoverable = true;
singleLocal.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });
try
{
// processing the message and insert it into database
// workflow completed here, so we can safely remove the message from queue
var localM = singleLocal.ReceiveById(stupid_closure.Id);
var localSample = (Data)localM.Body;
Interlocked.Increment(ref __counter);
Console.WriteLine(Interlocked.Read(ref __counter));
}
catch (MessageQueueException ex) { if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) Trace.WriteLine(ex.ToString()); }
catch (Exception ex2) { Trace.WriteLine(ex2.ToString()); }
}
}, TaskCreationOptions.PreferFairness);
tlist.Add(t);
}
foreach (var t in tlist) t.Start();
Task.WaitAll(tlist.ToArray());
list.Clear();
}
watch.Stop();
Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);
Console.WriteLine("press any key to continue ...");
Console.ReadKey();
}
static long __counter = 0;
}
最佳答案
卡维, 如果启用了消息队列对象的日志设置,则您正在使用的 MessageQueue 对象的构造函数将 UseJournalQueue 属性设置为 true。它以某种方式认为 .\private$\deep_den 的日志设置已启用。编辑 - 您使用的是预先创建的队列吗?
关于c# - 使用 ReceiveById 时糟糕的 MSMQ 性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14167049/