c# - 执行 Task.WhenAny 时如何产生返回项

标签 c# async-await c#-5.0

我的解决方案中有两个项目:WPF 项目和类库。

在我的类库中:

我有一个符号列表:

class Symbol
{
     Identifier Identifier {get;set;}
     List<Quote> HistoricalQuotes {get;set;}
     List<Financial> HistoricalFinancials {get;set;}
}

对于每个交易品种,我使用网络请求查询金融服务以检索每个交易品种的历史金融数据。 (webClient.DownloadStringTaskAsync(uri);)

所以这是我的方法:
    public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            // the line below doesn't compile, which is understandable because method's return type is a Task of something
            yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); 
        }
    }

    private async Task<HistoricalFinancialResult> GetFinancialsQueryAsync(Symbol symbol)
    {
        var result = new HistoricalFinancialResult();
        result.Symbol = symbol;
        result.Data = await _financialsQuery.GetFinancialsQuery(symbol.Identifier); // contains some logic like parsing and use WebClient to query asynchronously
        return result;
    }

    private class HistoricalFinancialResult
    {
        public Symbol Symbol { get; set; }
        public IEnumerable<Financial> Data { get; set; }

        // equality members
    }

如您所见,我希望每次下载每个品种的金融历史数据时都能产生结果,而不是等待我对金融服务的所有调用完成。

在我的 WPF 中,这是我想要做的:
foreach(var symbol in await _service.GetSymbolsAsync())
{
      SymbolsObservableCollection.Add(symbol);
}

似乎我们无法在异步方法中产生返回,那么我可以使用什么解决方案?除了将我的 GetSymbols 方法移动到我的 WPF 项目中。

最佳答案

虽然我喜欢 TPL Dataflow 组件(svick 建议您使用它),但迁移到该系统确实需要大量的 promise - 这不是您可以添加到现有设计中的东西。如果您正在执行大量 CPU 密集型数据处理并希望利用许多 CPU 内核,它会提供相当大的好处。但充分利用它并非易事。

他的另一个建议是使用 Rx,可能更容易与现有解决方案集成。 (参见 original documentation ,但对于最新的代码,请使用 Rx-Main nuget 包。或者,如果您想查看源代码,请参见 the Rx CodePlex site )甚至可以继续调用代码使用 IEnumerable<Symbol>如果您愿意 - 您可以纯粹将 Rx 用作实现细节,[ 编辑 2013/11/09 添加: ] 尽管正如 svick 指出的那样,鉴于您的最终目标,这可能不是一个好主意。

在我给你展示一个例子之前,我想先弄清楚我们到底在做什么。您的示例有一个带有此签名的方法:

public async Task<IEnumerable<Symbol>> GetSymbolsAsync()

那个返回类型,Task<IEnumerable<Symbol>> ,本质上说“这是一种产生 IEnumerable<Symbol> 类型的单一结果的方法,它可能不会立即产生该结果。”

我认为正是这种单一的结果让您感到悲伤,因为那并不是您真正想要的。一个 Task<T> (不管 T 可能是什么)代表单个异步操作。它可能有很多步骤(如果将 await 实现为 C# async 方法的许多用途),但最终它会产生一件事。你想在不同的时间生产多种东西,所以 Task<T>不太合适。

如果你真的要按照你的方法签名 promise 的方式去做——最终产生一个结果——你可以这样做的一种方法是让你的异步方法构建一个列表,然后在它准备好并准备好时将其作为结果产生:
// Note: this first example is *not* what you want.
// However, it is what your method's signature promises to do.
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
{
    var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

    foreach (var symbol in await _listSymbols)
    {
        historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
    }

    var results = new List<Symbol>();
    while (historicalFinancialTask.Count > 0)
    {
        var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
        historicalFinancialTask.Remove(historicalFinancial);

        results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data)); 
    }

    return results;
}

这个方法按照它的签名所说的那样做:它异步地产生一个符号序列。

但大概你想创建一个 IEnumerable<Symbol>它会在元素可用时生成它们,而不是等到它们全部可用时才生成。 (否则,您也可以使用 WhenAll 。)您可以这样做,但是 yield return不是办法。

简而言之,我认为您想要做的是生成一个异步列表。有一种类型:IObservable<T>准确表达了我相信您希望用您的 Task<IEnumerable<Symbol>> 表达的内容:它是一系列项目(就像 IEnumerable<T> )但异步。

可以通过类比来理解它:
public Symbol GetSymbol() ...


public Task<Symbol> GetSymbolAsync() ...

作为
public IEnumerable<Symbol> GetSymbols() ...

是:
public IObservable<Symbol> GetSymbolsObservable() ...

(不幸的是,与 Task<T> 不同,对于调用异步面向序列的方法,没有一个通用的命名约定。我在这里的末尾添加了“Observable”,但这不是普遍做法。我当然不会称它为 GetSymbolsAsync 因为人们会期望它返回一个 Task 。)

换句话说,Task<IEnumerable<T>>说“当我准备好时,我会制作这个系列”而 IObservable<T>说:“这是一个系列。当我准备好时,我会制作每件元素。”

因此,您需要一个返回 Symbol 序列的方法。对象,这些对象是异步生成的。这告诉我们你真的应该返回一个 IObservable<Symbol> .这是一个实现:
// Unlike this first example, this *is* what you want.
public IObservable<Symbol> GetSymbolsRx()
{
    return Observable.Create<Symbol>(async obs =>
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
        }
    });
}

如您所见,这使您可以编写几乎您希望编写的内容 - 这段代码的主体几乎与您的完全相同。唯一的区别是您在何处使用 yield return (未编译),这将调用 OnNext Rx 提供的对象上的方法。

写完之后,您可以轻松地将其包装在 IEnumerable<Symbol> 中。 ([ 2013 年 11 月 29 日编辑添加: ] 虽然您可能实际上并不想这样做 - 请参阅答案末尾的补充):
public IEnumerable<Symbol> GetSymbols()
{
    return GetSymbolsRx().ToEnumerable();
}

这可能看起来不是异步的,但它实际上允许底层代码异步操作。当您调用此方法时,它不会阻塞 - 即使执行获取财务信息工作的底层代码无法立即产生结果,此方法仍会立即返回 IEnumerable<Symbol> .当然,如果数据尚不可用,任何尝试遍历该集合的代码最终都会阻塞。但关键的是,我认为你最初试图实现的目标是:
  • 你可以写一个 async完成工作的方法(在我的例子中是一个委托(delegate),作为参数传递给 Observable.Create<T>,但如果你愿意,你可以编写一个独立的 async 方法)
  • 调用代码不会仅仅因为要求您开始获取符号而被阻塞
  • 由此产生的 IEnumerable<Symbol>一旦有货,将立即生产每个单独的项目

  • 这是因为 Rx 的 ToEnumerable方法中有一些巧妙的代码,可以弥补 IEnumerable<T> 的同步世界观之间的差距。和异步产生结果。 (换句话说,这正是您发现 C# 无法为您做的事情而感到失望的事情。)

    如果你好奇,你可以查看源。背后的代码 ToEnumerable可以在 https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs 找到。

    [ 编辑 2013/11/29 添加: ]

    svick 在评论中指出了我遗漏的一些内容:您的最终目标是将内容放入 ObservableCollection<Symbol> .不知怎的,我没有看到那一点。这意味着 IEnumerable<T>是错误的方法 - 您希望在项目可用时填充集合,而不是使用 foreach环形。所以你只需这样做:
    GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
    

    或类似的规定。这将在项目可用时将项目添加到集合中。

    顺便说一下,这取决于在 UI 线程上启动的整个事情。只要是这样,您的异步代码就应该最终在 UI 线程上运行,这意味着当项目添加到集合中时,这也会发生在 UI 线程上。但是如果由于某种原因你最终从一个工作线程启动了东西(或者如果你在任何等待上使用 ConfigureAwait,从而断开与 UI 线程的连接)你需要安排处理来自右侧线程上的 Rx 流:
    GetSymbolsRx()
        .ObserveOnDispatcher()
        .Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
    

    如果您在执行此操作时在 UI 线程上,它将选择当前的调度程序,并确保所有通知都通过它到达。如果您在订阅时已经在错误的线程上,您可以使用 ObserveOn需要调度程序的过载。 (这些要求您引用 System.Reactive.Windows.Threading 。这些是扩展方法,因此您需要一个 using 作为其包含的命名空间,也称为 System.Reactive.Windows.Threading )

    关于c# - 执行 Task.WhenAny 时如何产生返回项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18284169/

    相关文章:

    .net - 使用异步 CTP 封装同步方法不起作用

    c# - 如何从通过 Task.Factory.StartNew 创建的线程访问应用程序上下文

    c# - 使用顶级函数执行等待的嵌套异步调用

    c# - Entity Framework 正确使用连接表?

    C# 在列表中添加委托(delegate)

    c# - 使用异步 lambda 运行任务

    javascript - 在 Typescript 中使用 AsyncIterator – 必需选项

    c# - Umbraco 6 - 读取中央配置设置的 "correct"方式是什么?

    c# - 获取两个列表之间的差异

    C# Async ApiController 过早关闭 OutputStream