c# - 并行运行异步方法 8 次

标签 c# .net parallel-processing .net-4.5

如何将以下内容转换为 Parallel.ForEach?

public async void getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    List<String> usernames = new List<String>();
    int i = 0;

    foreach (String url in threads)
    {
        i++;
        progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        String user;
        Predicate<String> userPredicate;
        foreach (Match match in regex.Matches(content))
        {
            user = match.Groups[1].ToString();
            userPredicate = (String x) => x == user;
            if (usernames.Find(userPredicate) != user)
            {
                usernames.Add(match.Groups[1].ToString());
            }
        }
        progressBar1.PerformStep();
    }
}

我在编写代码时假设异步和并行处理是相同的,但我才意识到事实并非如此。我查看了我能找到的所有问题,但我似乎真的找不到适合我的例子。他们中的大多数缺乏可读的变量名。使用不解释它们包含什么的单字母变量名是一种糟糕的陈述示例的方式。

我通常在名为线程的数组中有 300 到 2000 个条目(包含论坛线程的 URL),并行处理(由于许多 HTTP 请求)似乎会加快执行速度。

在使用 Parallel.ForEach 之前,我是否必须删除所有异步(我在 foreach 之外没有任何异步,只有变量定义)?我应该怎么做呢?我可以在不阻塞主线程的情况下执行此操作吗?

顺便说一句,我正在使用 .NET 4.5。

最佳答案

I coded it in the assumption that asynchronous and parallel processing would be the same

异步处理和并行处理有很大的不同。如果您不明白其中的区别,我认为您应该先阅读更多相关信息(例如 what is the relation between Asynchronous and parallel programming in c#? )。

现在,您想要做的事情实际上并没有那么简单,因为您想要以特定的并行度 (8) 异步处理一个大集合。对于同步处理,您可以使用 Parallel.ForEach()(连同 ParallelOptions 来配置并行度),但是没有可以使用 的简单替代方法>异步

在您的代码中,由于您希望一切都在 UI 线程上执行,所以这很复杂。 (虽然理想情况下,您不应该直接从计算访问 UI。相反,您应该使用 IProgress,这意味着代码不再需要在 UI 线程上执行。)

在 .Net 4.5 中执行此操作的最佳方法可能是使用 TPL 数据流。它的ActionBlock完全按照您的意愿行事,但它可能非常冗长(因为它比您需要的更灵活)。所以创建一个辅助方法是有意义的:

public static Task AsyncParallelForEach<T>(
    IEnumerable<T> source, Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();
    return block.Completion;
}

在你的情况下,你会像这样使用它:

await AsyncParallelForEach(
    threads, async url => await DownloadUrl(url), 8,
    TaskScheduler.FromCurrentSynchronizationContext());

这里,DownloadUrl() 是处理单个 URL(循环体)的 async Task 方法,8 是并行度(可能不应该是实际代码中的文字常量)和 FromCurrentSynchronizationContext() 确保代码在 UI 线程上执行。

关于c# - 并行运行异步方法 8 次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14673728/

相关文章:

C# 自定义属性属性反射

c# - System.IO 未找到通过子命令生成的路径

.net - Windows 客户端损坏授权 header (Kerberos) => IIS 400(错误请求)

c++ - ASCII 格式的 MPI 并行 IO(我该怎么做?)

arrays - Julia中的共享数组用法

c# - Microsoft Office 应用程序的主要互操作程序集

c# - 事件触发前的对象处理和垃圾收集

c# - 使用 LINQ 选择 n 个最大的

c# - 在排队的后台工作项目中保留主体

multithreading - 为什么多线程应用程序通常会扩展不良?