c# - 在 C# 中实现线程以进行快速、批量和连续读取的最佳实践?

标签 c# multithreading io usb hid

在 .NET 4.0 中应如何处理在 C# 中从设备读取批量数据?具体来说,我需要从 USB HID 设备中快速读取,该设备会发出超过 26 个数据包的报告,其中必须保留顺序。

我试过在 BackgroundWorker 线程中执行此操作。它一次从设备读取一个数据包,并在读取更多数据之前对其进行处理。这提供了相当不错的响应时间,但它很容易丢失数据包,并且单个数据包读取的开销成本加起来。

while (!( sender as BackgroundWorker ).CancellationPending) {
       //read a single packet
       //check for header or footer
       //process packet data
    }
}

在 C# 中读取此类设备的最佳做法是什么?


背景:

我的 USB HID 设备不断报告大量数据。数据分为 26 个数据包,我必须保留顺序。不幸的是,该设备只标记每个报告中的第一个和最后一个数据包,因此我需要能够捕获其间的所有其他数据包。

最佳答案

对于 .Net 4,您可以使用 BlockingCollection提供可供生产者和消费者使用的线程安全队列。 BlockingCollection.GetConsumingEnumerable()方法提供了一个枚举器,当使用 CompleteAdding() 将队列标记为已完成时,该枚举器会自动终止。并且是空的。

这是一些示例代码。在此示例中,有效负载是一个整数数组,但您当然可以使用所需的任何数据类型。

请注意,对于您的特定示例,您可以使用 the overload of GetConsumingEnumerable()它接受类型为 CancellationToken 的参数.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public static class Program
    {
        private static void Main()
        {
            var queue = new BlockingCollection<int[]>();

            Task.Factory.StartNew(() => produce(queue));

            consume(queue);

            Console.WriteLine("Finished.");
        }

        private static void consume(BlockingCollection<int[]> queue)
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming " + item[0]);
                Thread.Sleep(25);
            }
        }

        private static void produce(BlockingCollection<int[]> queue)
        {
            for (int i = 0; i < 1000; ++i)
            {
                Console.WriteLine("Producing " + i);
                var payload = new int[100];
                payload[0] = i;
                queue.Add(payload);
                Thread.Sleep(20);
            }

            queue.CompleteAdding();
        }
    }
}

对于 .Net 4.5 及更高版本,您可以使用 Microsoft's Task Parallel Library 中的高级类,它具有丰富的功能(乍一看可能有点令人生畏)。

下面是使用 TPL DataFlow 的相同示例:

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    public static class Program
    {
        private static void Main()
        {
            var queue = new BufferBlock<int[]>();

            Task.Factory.StartNew(() => produce(queue));
            consume(queue).Wait();

            Console.WriteLine("Finished.");
        }

        private static async Task consume(BufferBlock<int[]> queue)
        {
            while (await queue.OutputAvailableAsync())
            {
                var payload = await queue.ReceiveAsync();
                Console.WriteLine("Consuming " + payload[0]);
                await Task.Delay(25);
            }
        }

        private static void produce(BufferBlock<int[]> queue)
        {
            for (int i = 0; i < 1000; ++i)
            {
                Console.WriteLine("Producing " + i);
                var payload = new int[100];
                payload[0] = i;
                queue.Post(payload);
                Thread.Sleep(20);
            }

            queue.Complete();
        }
    }
}

关于c# - 在 C# 中实现线程以进行快速、批量和连续读取的最佳实践?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28361718/

相关文章:

c - C99 中 'unlocked' I/O 函数的等价物是什么?

performance - VBA I/O 性能

java - 使用 Java 截断服务器中的文本

c# - 与 Web API 共享的 MVC 身份验证

c# - 自动映射器性能

c++ - decaf 线程 vs boost 线程 vs omnithreads

python - 一个线程安全的 memoize 装饰器

java - 10 个线程同时写入单个哈希

c# - 方法的最大参数数量是多少(WebApi?)

c# - 如何在具有多棵树的 AD 林中的全局目录中搜索用户