c# - 异步取自阻塞集合

标签 c# asynchronous task-parallel-library

我正在使用 BlockingCollection实现生产者/消费者模式。我有一个异步循环,它用要处理的数据填充集合,然后客户端可以在稍后的时间访问这些数据。数据包很少到达,我希望在不使用阻塞调用的情况下完成轮询。

本质上,我正在寻找类似 BeginTake 的东西和 EndTake阻塞集合中不存在,因此我可以在回调中使用内部线程池。它不一定是 BlockingCollection以任何方式。任何满足我需要的东西都会很棒。

这就是我现在所拥有的。 _bufferedPacketsBlockingCollection<byte[]> :

public byte[] Read(int timeout)
{
    byte[] result;
    if (_bufferedPackets.IsCompleted)
    {
        throw new Exception("Out of packets");
    }
    _bufferedPackets.TryTake(out result, timeout);      
    return result;
}

我希望它是这样的,用伪代码:

public void Read(int timeout)
{
    _bufferedPackets.BeginTake(result =>
        {
            var bytes = _bufferedPackets.EndTake(result);
            // Process the bytes, or the resuting timeout
        }, timeout, _bufferedPackets);
}

我对此有哪些选择?我不想将任何线程置于等待状态,因为它还有很多其他 IO 内容需要处理,而且我很快就会用完线程。

更新:我重写了相关代码以不同方式使用异步过程,本质上是根据超时限制内是否存在等待请求来交换回调。这工作正常,但如果有一种方法可以做到这一点而不诉诸计时器和交换 lambda,那仍然会很棒,因为这可能会导致竞争条件并且难以编写(和理解)。我也用自己的异步队列实现解决了这个问题,但如果有更标准且经过良好测试的选项,它仍然很棒。

最佳答案

我可能误解了你的情况,但你不能使用非阻塞集合吗?

我创建了这个例子来说明:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTakeFromBlockingCollection
{
    class Program
    {
        static void Main(string[] args)
        {
            var queue = new ConcurrentQueue<string>();

            var producer1 = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 10; i += 1)
                {
                    queue.Enqueue("=======");
                    Thread.Sleep(10);
                }
            });

            var producer2 = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 10; i += 1)
                {
                    queue.Enqueue("*******");
                    Thread.Sleep(3);
                }
            });

            CreateConsumerTask("One  ", 3, queue);
            CreateConsumerTask("Two  ", 4, queue);
            CreateConsumerTask("Three", 7, queue);

            producer1.Wait();
            producer2.Wait();
            Console.WriteLine("  Producers Finished");
            Console.ReadLine();
        }

        static void CreateConsumerTask(string taskName, int sleepTime, ConcurrentQueue<string> queue)
        {
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    string result;
                    if (queue.TryDequeue(out result))
                    {
                        Console.WriteLine("  {0} consumed {1}", taskName, result);
                    }
                    Thread.Sleep(sleepTime);
                }
            });
        }
    }
}

这是程序的输出

enter image description here

我相信 BlockingCollection 旨在包装并发集合并提供一种允许多个消费者阻塞的机制;等待生产者。这种用法似乎与您的要求相反。

我找到了这个 article about the BlockingCollection class有帮助。

关于c# - 异步取自阻塞集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11949424/

相关文章:

c# - 异步/等待性能

c# - 使用多个任务/消费者使用阻塞集合

.net - 我需要处理任务吗?

c# - 我无法从 C# 访问 unc 路径。获取访问权限被拒绝

c# - 从字符串中提取颜色和尺寸

javascript - 异步加载 json 作为对象属性,然后在事件中访问它

c# - TPL 如何正确取消任务

c# - 将非托管/非托管成员添加到托管类

c# - 我如何创建特定编号的 ListView。 WPF 中的列数?

asynchronous - 2015 年需要/AMD 和异步资源