c# - 留在队列中的项目的替代方案(线程消费者生产者)

标签 c# multithreading queue async-await task-parallel-library

作为库的一部分,使用 this code ,有一个 SimpleQueue 类来解耦生产者和消费者:

private class SimpleQueue
{
    private readonly Func<ResolvedEvent, CancellationToken, Task> _onResolvedEvent;
    private readonly CancellationToken _token;
    private readonly ConcurrentQueue<ResolvedEvent> _events;
    private readonly InterlockedBoolean _isPushing;
    private static readonly ILog s_logger;

    static SimpleQueue()
    {
        s_logger = LogProvider.For<SimpleQueue>();
    }

    public SimpleQueue(Func<ResolvedEvent, CancellationToken, Task> onResolvedEvent, CancellationToken token)
    {
        _onResolvedEvent = onResolvedEvent;
        _token = token;
        _events = new ConcurrentQueue<ResolvedEvent>();
        _isPushing = new InterlockedBoolean();
    }

    public void Enqueue(ResolvedEvent resolvedEvent)
    {
        _events.Enqueue(resolvedEvent);
        Push();
    }

    private void Push()
    {
        if(_isPushing.CompareExchange(true, false))
        {
            return;
        }
        Task.Run(async () =>
        {
            ResolvedEvent resolvedEvent;
            while (!_token.IsCancellationRequested && _events.TryDequeue(out resolvedEvent))
            {
                try
                {
                    await _onResolvedEvent(resolvedEvent, _token);
                }
                catch(Exception ex)
                {
                    s_logger.ErrorException(ex.Message, ex);
                }
            }
            _isPushing.Set(false);
        }, _token);
    }
}

我想我可以在这里看到一个问题,如果:

  1. 在任务线程中,调用 events.TryDequeue(outresolvedEvent)) 返回 false
  2. 然后上下文切换到另一个线程
  3. 在另一个线程事件排队时,调用 Push() 但立即返回,因为 _isPushingtrue
  4. 上下文切换回任务线程,_isPushing 设置为 false 并且任务退出

在这种情况下,队列中将存在 on 事件,直到下一次入队并在 Push() 中循环以出队时才会调度该事件。如果是这样,我想我不喜欢这个。

所以我重写为使用 TPL BlockingQueue:

public class SimpleQueue<T>
{
    readonly BufferBlock<T> _queue = new BufferBlock<T>();

    public SimpleQueue(Func<T, CancellationToken, Task> onItemQueued, CancellationToken token)
    {
        Task.Run(async () =>
        {
            while (true)
            {
                try
                {
                    var item = await _queue.ReceiveAsync(token);
                    if (token.IsCancellationRequested)
                        return;
                    await onItemQueued(item, token);
                }
                catch (Exception ex)
                {
                    // log
                }
            }
        }, token);
    }

    public void Enqueue(T item)
    {
        _queue.Post(item);
    }
}

class Program
{
    private readonly static SimpleQueue<string> Queue;
    private readonly static CancellationToken CancellationToken = new CancellationToken();

    static async Task OnEvent(string item, CancellationToken cancellationToken)
    {
        await Task.Run(() =>
        {
            Console.WriteLine("Rx from remote {0}", item);

        }, cancellationToken);
    }

    static Program()
    {
        Queue = new SimpleQueue<string>(OnEvent, CancellationToken);
    }

    static void Main(string[] args)
    {
        // wire up code to call ExternalReceive from 3rd party lib
        DevLinkImports.DLRegisterType2CallDeltas(0,CallEvent);

        Console.ReadLine();
    }

    // this is called by 3rd party dll on demand
    static void CallEvent(uint pbxh, string info)
    {
        // we must dispatch and return within 50ms or 3rd party lib will go ape
        Queue.Enqueue(info);  
    }

问题:

  1. 出于学习目的,我对原始 SimpleQueue 的问题的看法是否正确,并且项目可能会根据时间而保留?

  2. 如果没有“过早优化”,我觉得唯一明智的做法是问,每次调用static async Task OnEvent(string item, CancellationToken CancellationToken) 启动一个新线程的开销是多少?

  3. 通过重写,我在 sleep 时不会保持线程打开,但实际上使用此异步调用有什么好处,或者只是启动单个线程并使用 BlockingCollection 和出队时阻塞?我不想为了牺牲启动新线程所需的时间而保存一个线程。

最佳答案

  1. For learning purposes am I correct in seeing the issue with the original SimpleQueue and items could be left depending on timing?

无法肯定地说,因为 InterlockedBoolean 的实现这里没有提供。您的担忧似乎是有道理的,但在尝试做出明确的声明之前,我想看看实际的代码。

  1. Without "premature optimization" i feel it only sensible to ask, what is the overhead of spinning up a new thread for each call to static async Task OnEvent(string item, CancellationToken cancellationToken)?

创建线程的开销非常大。但是你的OnEvent()方法实际上可能会也可能不会这样做。您正在创建一个新任务,然后调度程序将决定如何处理它。如果线程池包含可用于执行它的可用线程和/或调度程序决定它可以等待现有但繁忙的线程变得可用,则不会创建新线程。

  1. With the rewrite I am not holding threads open when sleeping, but in reality is there any benefit of using this async call or just instead spin up a single thread and use a BlockingCollection and blocking on dequeue? I don't want to save one thread for sacrificing time taken to spin up new threads.

在程序中添加一个单个线程来为队列提供服务并不是那么糟糕。您只需创建它一次,因此其开销是微不足道的。它确实占用了堆栈一兆字节(默认情况下),但这通常也不会成为问题。

另一方面,同样调用Task.Run()由于使用了线程池,也不太可能导致显着的开销。

所以对我来说,这取决于美观和可维护性。

我会指出使用BlockingCollection<T>的问题与 BufferBlock<T>与您实现 OnEvent() 的问题有些不同。前者涉及底层队列的实现,而后者涉及事件实际上出队时发生的情况。即使您使用 BlockingCollection<T> ,如果你不改变OnEvent()您仍然要为每个事件开始一个新任务。相反,你没有理由不能制作 OnEvent()同步运行事件处理,甚至使用 BufferBlock<T> .

队列代码显然期望以异步方式处理事件,但事件并非必须如此。这取决于队列的客户端。

关于c# - 留在队列中的项目的替代方案(线程消费者生产者),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32530552/

相关文章:

c# - 增加 Parallel.For 中运行的线程数

定期调用已创建线程的函数(手动调度)

java - 如何在java中对硬盘驱动器的文件读/写进行排队?

python - 在 Python 中初始化队列

c# - 在 MVC 4 中传递对象

c# - 从 MS-Word ApplicationClass 获取 PID?

c++ - 我如何在优先队列中找到值(value)?

java - 模拟快餐店的排队和请求服务的程序

C#:在网格中显示数据的最佳方式?

c# - Topshelf开始陷入无限循环