我正在编写一个 Windows 服务,它将启动多个工作线程,这些工作线程将监听 Amazon SQS 队列并处理消息。将有大约 20 个线程监听 10 个队列。
线程必须始终运行,这就是为什么我倾向于为工作循环实际使用实际线程而不是线程池线程。
这是一个顶级实现。 Windows 服务将启动多个工作线程,每个工作线程都将监听其队列并处理消息。
protected override void OnStart(string[] args)
{
for (int i = 0; i < _workers; i++)
{
new Thread(RunWorker).Start();
}
}
这里是工作的实现
public async void RunWorker()
{
while(true)
{
// .. get message from amazon sqs sync.. about 20ms
var message = sqsClient.ReceiveMessage();
try
{
await PerformWebRequestAsync(message);
await InsertIntoDbAsync(message);
}
catch(SomeExeception)
{
// ... log
//continue to retry
continue;
}
sqsClient.DeleteMessage();
}
}
我知道我可以使用 Task.Run 执行相同的操作并在线程池线程上执行它而不是启动单独的线程,但我看不出这样做的原因,因为每个线程都会一直运行。
您认为此实现有任何问题吗?让线程始终以这种方式运行有多可靠?我该怎么做才能确保每个线程始终运行?
最佳答案
您现有解决方案的一个问题是您以一种即发即弃的方式调用您的 RunWorker
,尽管是在一个新线程上(即 new Thread(RunWorker).Start( )
).
RunWorker
是一个 async
方法,当执行点命中第一个 await
(即 await PerformWebRequestAsync(消息)
)。如果 PerformWebRequestAsync
返回挂起的任务,RunWorker
返回并且您刚刚启动的新线程终止。
我认为您在这里根本不需要新线程,只需使用 AmazonSQSClient.ReceiveMessageAsync
和 await
其结果即可。另一件事是你不应该使用 async void
方法,除非你真的不关心跟踪异步任务的状态。请改用 async Task
。
您的代码可能如下所示:
List<Task> _workers = new List<Task>();
CancellationTokenSource _cts = new CancellationTokenSource();
protected override void OnStart(string[] args)
{
for (int i = 0; i < _MAX_WORKERS; i++)
{
_workers.Add(RunWorkerAsync(_cts.Token));
}
}
public async Task RunWorkerAsync(CancellationToken token)
{
while(true)
{
token.ThrowIfCancellationRequested();
// .. get message from amazon sqs sync.. about 20ms
var message = await sqsClient.ReceiveMessageAsync().ConfigureAwait(false);
try
{
await PerformWebRequestAsync(message);
await InsertIntoDbAsync(message);
}
catch(SomeExeception)
{
// ... log
//continue to retry
continue;
}
sqsClient.DeleteMessage();
}
}
现在,要停止所有挂起的工作程序,您可以简单地执行此操作(从主“请求调度程序”线程):
_cts.Cancel();
try
{
Task.WaitAll(_workers.ToArray());
}
catch (AggregateException ex)
{
ex.Handle(inner => inner is OperationCanceledException);
}
请注意,ConfigureAwait(false)
对于 Windows 服务是可选的,因为默认情况下初始线程上没有同步上下文。但是,我会保持这种方式以使代码独立于执行环境(对于存在同步上下文的情况)。
最后,如果由于某种原因您不能使用 ReceiveMessageAsync
,或者您需要调用另一个阻塞 API,或者只是在 开始 做一些 CPU 密集型工作RunWorkerAsync
,只需用 Task.Run
包装它(而不是包装整个 RunWorkerAsync
):
var message = await Task.Run(
() => sqsClient.ReceiveMessage()).ConfigureAwait(false);
关于c# - 始终在 Windows 服务上运行线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25001764/