作为库的一部分,使用 this code ,有一个 SimpleQueue
类来解耦生产者和消费者:
private class SimpleQueue
{
private readonly Func<ResolvedEvent, CancellationToken, Task> _onResolvedEvent;
private readonly CancellationToken _token;
private readonly ConcurrentQueue<ResolvedEvent> _events;
private readonly InterlockedBoolean _isPushing;
private static readonly ILog s_logger;
static SimpleQueue()
{
s_logger = LogProvider.For<SimpleQueue>();
}
public SimpleQueue(Func<ResolvedEvent, CancellationToken, Task> onResolvedEvent, CancellationToken token)
{
_onResolvedEvent = onResolvedEvent;
_token = token;
_events = new ConcurrentQueue<ResolvedEvent>();
_isPushing = new InterlockedBoolean();
}
public void Enqueue(ResolvedEvent resolvedEvent)
{
_events.Enqueue(resolvedEvent);
Push();
}
private void Push()
{
if(_isPushing.CompareExchange(true, false))
{
return;
}
Task.Run(async () =>
{
ResolvedEvent resolvedEvent;
while (!_token.IsCancellationRequested && _events.TryDequeue(out resolvedEvent))
{
try
{
await _onResolvedEvent(resolvedEvent, _token);
}
catch(Exception ex)
{
s_logger.ErrorException(ex.Message, ex);
}
}
_isPushing.Set(false);
}, _token);
}
}
我想我可以在这里看到一个问题,如果:
- 在任务线程中,调用
events.TryDequeue(outresolvedEvent))
返回false
- 然后上下文切换到另一个线程
- 在另一个线程事件排队时,调用
Push()
但立即返回,因为_isPushing
为true
- 上下文切换回任务线程,
_isPushing
设置为false
并且任务退出
在这种情况下,队列中将存在 on 事件,直到下一次入队并在 Push()
中循环以出队时才会调度该事件。如果是这样,我想我不喜欢这个。
所以我重写为使用 TPL BlockingQueue:
public class SimpleQueue<T>
{
readonly BufferBlock<T> _queue = new BufferBlock<T>();
public SimpleQueue(Func<T, CancellationToken, Task> onItemQueued, CancellationToken token)
{
Task.Run(async () =>
{
while (true)
{
try
{
var item = await _queue.ReceiveAsync(token);
if (token.IsCancellationRequested)
return;
await onItemQueued(item, token);
}
catch (Exception ex)
{
// log
}
}
}, token);
}
public void Enqueue(T item)
{
_queue.Post(item);
}
}
class Program
{
private readonly static SimpleQueue<string> Queue;
private readonly static CancellationToken CancellationToken = new CancellationToken();
static async Task OnEvent(string item, CancellationToken cancellationToken)
{
await Task.Run(() =>
{
Console.WriteLine("Rx from remote {0}", item);
}, cancellationToken);
}
static Program()
{
Queue = new SimpleQueue<string>(OnEvent, CancellationToken);
}
static void Main(string[] args)
{
// wire up code to call ExternalReceive from 3rd party lib
DevLinkImports.DLRegisterType2CallDeltas(0,CallEvent);
Console.ReadLine();
}
// this is called by 3rd party dll on demand
static void CallEvent(uint pbxh, string info)
{
// we must dispatch and return within 50ms or 3rd party lib will go ape
Queue.Enqueue(info);
}
问题:
出于学习目的,我对原始 SimpleQueue 的问题的看法是否正确,并且项目可能会根据时间而保留?
如果没有“过早优化”,我觉得唯一明智的做法是问,每次调用
static async Task OnEvent(string item, CancellationToken CancellationToken) 启动一个新线程的开销是多少
?通过重写,我在 sleep 时不会保持线程打开,但实际上使用此异步调用有什么好处,或者只是启动单个线程并使用
BlockingCollection
和出队时阻塞?我不想为了牺牲启动新线程所需的时间而保存一个线程。
最佳答案
- For learning purposes am I correct in seeing the issue with the original SimpleQueue and items could be left depending on timing?
无法肯定地说,因为 InterlockedBoolean
的实现这里没有提供。您的担忧似乎是有道理的,但在尝试做出明确的声明之前,我想看看实际的代码。
- Without "premature optimization" i feel it only sensible to ask, what is the overhead of spinning up a new thread for each call to static async Task OnEvent(string item, CancellationToken cancellationToken)?
创建新线程的开销非常大。但是你的OnEvent()
方法实际上可能会也可能不会这样做。您正在创建一个新任务,然后调度程序将决定如何处理它。如果线程池包含可用于执行它的可用线程和/或调度程序决定它可以等待现有但繁忙的线程变得可用,则不会创建新线程。
- With the rewrite I am not holding threads open when sleeping, but in reality is there any benefit of using this async call or just instead spin up a single thread and use a BlockingCollection and blocking on dequeue? I don't want to save one thread for sacrificing time taken to spin up new threads.
在程序中添加一个单个线程来为队列提供服务并不是那么糟糕。您只需创建它一次,因此其开销是微不足道的。它确实占用了堆栈一兆字节(默认情况下),但这通常也不会成为问题。
另一方面,同样调用Task.Run()
由于使用了线程池,也不太可能导致显着的开销。
所以对我来说,这取决于美观和可维护性。
我会指出使用BlockingCollection<T>
的问题与 BufferBlock<T>
与您实现 OnEvent()
的问题有些不同。前者涉及底层队列的实现,而后者涉及事件实际上出队时发生的情况。即使您使用 BlockingCollection<T>
,如果你不改变OnEvent()
您仍然要为每个事件开始一个新任务。相反,你没有理由不能制作 OnEvent()
同步运行事件处理,甚至使用 BufferBlock<T>
.
队列代码显然期望以异步方式处理事件,但事件并非必须如此。这取决于队列的客户端。
关于c# - 留在队列中的项目的替代方案(线程消费者生产者),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32530552/