c# - 重 I/O 操作中的 Parallel.ForEach 与异步 For 循环

标签 c# multithreading async-await task-parallel-library

我想比较两个理论场景。为了问题的目的,我简化了案例。但基本上它是您典型的生产者消费者场景。 (我关注的是消费者)。

我有一个很大的Queue<string> dataQueue我必须将其传输给多个客户端。

那么让我们从更简单的情况开始:

 class SequentialBlockingCase
 {
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string>();

    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                foreach (var destination in _destinations)
                {
                    SendDataToDestination(destination, data);
                }
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }

    private static void SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post, instead simulate the send
        Thread.Sleep(200);
    }
}
}

现在这个设置工作得很好。它坐在那里并轮询 Queue当有数据要发送时,它会将其发送到所有目的地。

问题:

  • 如果其中一个目的地不可用或速度慢,则会影响所有其他目的地。
  • 在并行执行的情况下不使用多线程。
  • 每次传输到每个目的地的 block 。

这是我的第二次尝试:

 class ParalleBlockingCase
{
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string>();

    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                Parallel.ForEach(_destinations, destination =>
                {
                    SendDataToDestination(destination, data);
                });
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }

    private static void SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post
        Thread.Sleep(200);
    }
}

如果 1 个目的地速度慢或不可用,此修订至少不会影响其他目的地。

但是这个方法仍然是阻塞的,我不确定 Parallel.ForEach使用线程池。我的理解是它将创建 X 个线程/任务并一次执行 4 个(4 个核心 cpu)。但它必须在任务 5 开始之前完成任务 1 的芬兰语。

因此我的第三个选择:

class ParalleAsyncCase
{
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string> { };

    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                List<Task> tasks = new List<Task>();
                foreach (var destination in _destinations)
                {
                    var task = SendDataToDestination(destination, data);
                    task.Start();
                    tasks.Add(task);
                }

                //Wait for all tasks to complete
                Task.WaitAll(tasks.ToArray());
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }

    private static async Task SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post
        await Task.Delay(200);
    }
}

现在根据我的理解这个选项,仍然会阻塞在 Task.WaitAll(tasks.ToArray()); 的主线程上这很好,因为我不希望它因为创建任务的速度快于执行速度而跑掉。

但是并行执行的任务应该使用ThreadPool ,并且所有 X 个任务应该立即开始执行,而不是阻塞或按顺序执行。 (线程池将在它们变为事件状态或 awaiting 时在它们之间交换)

现在我的问题。

选项 3 是否比选项 2 有任何性能优势。

特别是在更高性能的服务器端场景中。在我现在正在开发的特定软件中。上面会有我的简单用例的多个实例。即多个消费者。

我对这两种解决方案的理论差异和优缺点很感兴趣,如果有的话,可能还有更好的第四种选择。

最佳答案

Parallel.ForEach将使用线程池。异步代码不会,因为it doesn't need any threads at all (链接是我的博客)。

正如 Mrinal 指出的那样,如果您的代码受 CPU 限制,则并行是合适的;如果您有 I/O 绑定(bind)代码,异步是合适的。在这种情况下,HTTP POST 显然是 I/O,因此理想的消费代码应该是异步的。

maybe even a better 4th option if there is one.

我建议让您的消费者完全异步。为此,您需要使用异步兼容的生产者/消费者队列。有一个相当高级的( BufferBlock<T> ) in the TPL Dataflow library ,和一个相当简单的 ( AsyncProducerConsumerQueue<T> ) in my AsyncEx library .

使用它们中的任何一个,您都可以创建一个完全异步的消费者:

List<Task> tasks = new List<Task>();
foreach (var destination in _destinations)
{
  var task = SendDataToDestination(destination, data);
  tasks.Add(task);
}
await Task.WhenAll(tasks);

或者更简单地说:

var tasks = _destinations
    .Select(destination => SendDataToDestination(destination, data));
await Task.WhenAll(tasks);

关于c# - 重 I/O 操作中的 Parallel.ForEach 与异步 For 循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39075845/

相关文章:

c# - 在 Razor 页面中访问异步属性

c# - 从C#mongodb驱动程序执行mongodb shell查询(最新)

c# - 在 Caliburn Micro 中的 ViewModel 之间切换

java - Activity 的实例变量未在 AsyncTask 的 onPostExecute 中设置或如何将数据从 AsyncTask 返回到主 UI 线程

java - 多线程环境下如何避免同步?

c++ - 用于信号处理程序和多线程的 volatile

c# - Windsor 单例线程安全的这种依赖性吗?

c# - 如何通过asp.net(访问数据库)OledbCommand获取输入(日期)并将其值插入到 "textbox"

c# - 确定多用户、多点触控应用程序中的用户意图

python3 - 从异步方法获取结果