我有一组线程,它们从私有(private)消息队列中检索消息,将它们反序列化为日志条目对象,并将日志条目对象的属性存储在 SQL Server 数据库表 logentry 中。
这是我创建和启动线程的代码。
try
{
for (int i = 0; i < threads.Length; i++)
{
threads[i] = new Thread(new ThreadStart(this.logEntriesToDatabase));
threads[i].Start();
}
}
catch (ThreadStateException ex)
{
MessageBox.Show(ex.Message, "Error", MessageBoxButtons.OK,MessageBoxIcon.Error);
return;
}
catch (OutOfMemoryException ex)
{
MessageBox.Show("Not Enough Memory Please Close Other Applications To Continue", "Error", MessageBoxButtons.OK, MessageBoxIcon.Error);
return;
}
每个Thread执行一个函数logentriestodatabase()
while(true)
{
#region Retrieves Message from Message Queue and Deserialized it to a Log Entry Object.
#region Sleep Time for Current Thread
Thread.Sleep(180);
#endregion
#region Check to See Whether Queue Is Empty. If so go back to start of while loop
if (q1.GetAllMessages().Length == 0)
{
continue;
}
#endregion
#region Message retrieval and Deserialization Code
System.Messaging.Message m = this.q1.Receive();
m.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
LogEntry lg = BinaryLogFormatter.Deserialize(m.Body.ToString());
#endregion
#endregion
#region Insert Log Entry Into Database
#region Define a new SQL Connection with username and password specified in App.Config, an SQL Transaction and database queuries
SqlConnection conn = new SqlConnection(ConfigurationManager.ConnectionStrings["LogReader"].ConnectionString);
SqlTransaction transaction;
string query_insert_into_logentry = "INSERT INTO logentry" + "(message, priority, processname, severity, accountid, ipaddress, servername, servertype, timestamp)" + "VALUES ('" + lg.Message + "'," + lg.Priority + ",'" + lg.AppDomainName + "','" + lg.Severity.ToString() + "','" + lg.ExtendedProperties["AccountID"].ToString() + "','" + lg.ExtendedProperties["IpAddress"].ToString() + "','" + lg.ExtendedProperties["ServerName"].ToString() + "','" + lg.ExtendedProperties["ServerType"].ToString() + "','" + lg.TimeStamp.ToString() + "')";
string query_insert_into_category = "INSERT INTO category (category) VALUES ('" + lg.Categories.First().ToString() + "')";
#endregion
#region Begin and Terminates Transaction and Closes the SQL Connection Catches any SQL Exception Thrown and Displays Them
try
{
conn.Open();
transaction = conn.BeginTransaction();
new SqlCommand(query_insert_into_logentry, conn, transaction).ExecuteNonQuery();
new SqlCommand(query_insert_into_category, conn, transaction).ExecuteNonQuery();
transaction.Commit();
conn.Close();
}
catch (SqlException ex)
{
MessageBox.Show(ex.Message);
return;
}
#endregion
#endregion
}
现在,每当我运行这个程序时,当消息队列变空时,程序就会挂起。 我似乎无法弄清楚为什么。我试图为 q1.Receive() 函数提供一个 TimeSpan,但这没有用。我用 180 毫秒的时间调用了 sleep 方法,但它仍然不起作用。可能是因为 q1.Receive 方法在遇到空队列时将当前线程发送到阻塞状态。
请帮助我接受想法。
最佳答案
您可以使用 MessageQueue.BeginReceive 异步读取消息,而不是在紧密循环中同步读取消息并阻塞多个线程。/结束接收。问了一个类似的问题here .
如果您使用的是 .NET 4.0 或更高版本,则可以从 BeginReceive/EndReceive 对创建任务并使用 ContinueWith 处理消息,而无需创建新线程。在 .NET 4.5 中,您可以使用 asyc/await
关键字来简化处理,例如:
private async Task<Message> MyReceiveAsync()
{
MessageQueue queue=new MessageQueue();
...
var message=await Task.Factory.FromAsync<Message>(
queue.BeginReceive(),
queue.EndReceive);
return message;
}
public async Task LogToDB()
{
while(true)
{
var message=await MyReceiveAsync();
SaveMessage(message);
}
}
即使 LogToDB
使用 `while(true),循环也会异步执行。
要结束循环,您可以将 CancellationToken 传递给 LogToDB
和 end processing cooperatively :
public async Task LogToDB(CancellationToken token)
{
while(!token.IsCancellationRequested)
{
var message=await MyReceiveAsync();
SaveMessage(message);
}
}
这样您就可以避免创建多个线程和计时器。
关于c# - 程序在访问消息队列时挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20856028/