c# - 程序在访问消息队列时挂起

标签 c# .net multithreading msmq enterprise-library

我有一组线程,它们从私有(private)消息队列中检索消息,将它们反序列化为日志条目对象,并将日志条目对象的属性存储在 SQL Server 数据库表 logentry 中。

这是我创建和启动线程的代码。

        try
        {
            for (int i = 0; i < threads.Length; i++)
            {
                threads[i] = new Thread(new ThreadStart(this.logEntriesToDatabase));
                threads[i].Start();
            }
        }
        catch (ThreadStateException ex)
        {
            MessageBox.Show(ex.Message, "Error", MessageBoxButtons.OK,MessageBoxIcon.Error);
            return;
        }
        catch (OutOfMemoryException ex)
        {
            MessageBox.Show("Not Enough Memory Please Close Other Applications To Continue", "Error", MessageBoxButtons.OK, MessageBoxIcon.Error);
            return;
        }

每个Thread执行一个函数logentriestodatabase()

while(true)
        {
            #region Retrieves Message from Message Queue and Deserialized it to a Log Entry Object.

                #region Sleep Time for Current Thread
                    Thread.Sleep(180);
                #endregion
                #region Check to See Whether Queue Is Empty. If so go back to start of while loop
                    if (q1.GetAllMessages().Length == 0)
                    {
                        continue;
                    }
                #endregion
                #region Message retrieval and Deserialization Code
                    System.Messaging.Message m = this.q1.Receive();
                    m.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
                    LogEntry lg = BinaryLogFormatter.Deserialize(m.Body.ToString());
                #endregion

            #endregion

            #region Insert Log Entry Into Database

                #region Define a new SQL Connection with username and password specified in App.Config, an SQL Transaction and database queuries

                    SqlConnection conn = new SqlConnection(ConfigurationManager.ConnectionStrings["LogReader"].ConnectionString);
                    SqlTransaction transaction;
                    string query_insert_into_logentry = "INSERT INTO logentry" + "(message, priority, processname, severity, accountid, ipaddress, servername, servertype, timestamp)" + "VALUES ('" + lg.Message + "'," + lg.Priority + ",'" + lg.AppDomainName + "','" + lg.Severity.ToString() + "','" + lg.ExtendedProperties["AccountID"].ToString() + "','" + lg.ExtendedProperties["IpAddress"].ToString() + "','" + lg.ExtendedProperties["ServerName"].ToString() + "','" + lg.ExtendedProperties["ServerType"].ToString() + "','" + lg.TimeStamp.ToString() + "')";
                    string query_insert_into_category = "INSERT INTO category (category) VALUES ('" + lg.Categories.First().ToString() + "')";

                #endregion
                #region Begin and Terminates Transaction and Closes the SQL Connection Catches any SQL Exception Thrown and Displays Them

                    try
                    {
                        conn.Open();
                        transaction = conn.BeginTransaction();
                        new SqlCommand(query_insert_into_logentry, conn, transaction).ExecuteNonQuery();
                        new SqlCommand(query_insert_into_category, conn, transaction).ExecuteNonQuery();
                        transaction.Commit();
                        conn.Close();
                    }
                    catch (SqlException ex)
                    {
                        MessageBox.Show(ex.Message);
                        return;
                    }

                #endregion
            #endregion
        }

现在,每当我运行这个程序时,当消息队列变空时,程序就会挂起。 我似乎无法弄清楚为什么。我试图为 q1.Receive() 函数提供一个 TimeSpan,但这没有用。我用 180 毫秒的时间调用了 sleep 方法,但它仍然不起作用。可能是因为 q1.Receive 方法在遇到空队列时将当前线程发送到阻塞状态。

请帮助我接受想法。

最佳答案

您可以使用 MessageQueue.BeginReceive 异步读取消息,而不是在紧密循环中同步读取消息并阻塞多个线程。/结束接收。问了一个类似的问题here .

如果您使用的是 .NET 4.0 或更高版本,则可以从 BeginReceive/EndReceive 对创建任务并使用 ContinueWith 处理消息,而无需创建新线程。在 .NET 4.5 中,您可以使用 asyc/await 关键字来简化处理,例如:

private async Task<Message> MyReceiveAsync()
{
    MessageQueue queue=new MessageQueue();
    ...
    var message=await Task.Factory.FromAsync<Message>(
                       queue.BeginReceive(),
                       queue.EndReceive);

    return message;
}

public async Task LogToDB()
{
    while(true)
    {
       var message=await MyReceiveAsync();
       SaveMessage(message);
    }
}

即使 LogToDB 使用 `while(true),循环也会异步执行。

要结束循环,您可以将 CancellationToken 传递给 LogToDBend processing cooperatively :

public async Task LogToDB(CancellationToken token)
{
    while(!token.IsCancellationRequested)
    {
       var message=await MyReceiveAsync();
       SaveMessage(message);
    }
}

这样您就可以避免创建多个线程和计时器。

关于c# - 程序在访问消息队列时挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20856028/

相关文章:

.net - 删除 ThreadContext 属性

java - 在这种情况下,volatile hashmap 是否足够?

python - cron 作业是否在谷歌应用引擎中同时运行?

c# - 如何获取 PPID

c# - 使用最小起订量模拟 Linq `Any` 谓词

c# - 返回零的随机数生成器

C多线程性能问题

c# - 扫描具有特定接口(interface)的 .NET 程序集的 DLL - 一些 DLL 抛出 R6034!

c++ - 将托管 C# DLL 包含到非托管 C++ DLL 中——全部在一个文件中

c# - 如何声明一个未启动的任务,将等待另一个任务?