.net - Rx 框架中的什么允许我在创建期间等待其他方法时返回 IObservable<T> ?

标签 .net task-parallel-library .net-4.5 system.reactive c#-5.0

我一直在努力创建一个 IObservable<T> 使用 Reactive Extensions 实现对于 Twitter's streaming APIs .

从高层发送 HTTP 请求并且连接保持打开状态。带长度前缀的项目将被发送以供消耗。

基本上,这是循环调用 Stream.ReadAsync 使用 await keyword 。安 IObserver<T> interface实现(来自 Observable.Create 或来自 Dataflow library 的 block ,没关系,它是实现细节)传递到此循环,然后调用 IObserver<T> 上的方法实现,产生可观察的。

但是,在此循环开始处理之前必须完成许多事情,这些事情需要调用 Task<T> -返回方法,所有这些方法在 C# 5.0 中都可以使用 await 更轻松地调用关键词。像这样的事情:

public async Task<IObservable<string>> Create(string parameter,
    CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).
         ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);

     // Create the observable.
     return block.AsObservable();
}

也就是说,我目前正在返回 Task<IObservable<T>>从我的方法来看,但我觉得我在 react 性扩展中遗漏了一些东西,它允许我使用 await为了方便我需要调用的电话,还返回 IObservable<T>而不是Task<IObservable<T>> .

响应式扩展中的什么方法允许我创建一个在从创建方法返回之前需要等待方法的可观察对象?

我发现的最接近的是Observable.DeferAsync 。假设对我的方法的调用和可观察对象的使用类似于:

public async Task Observe()
{
    // NOT the real name of the interface, but explains it's role here.
    IObservableFactory factory;

    // Create is really named something else.
    IObservable<string> observable = factory.Create("parameter");

    // Subscribe.
    observable.Subscribe(o => Console.WriteLine("Observed: {0}", o));

    // Wait.
    await observable;
}

使用DeferAsync在这里不起作用,因为调用 Subscribe将发送第一个请求,然后读取该请求,然后调用 awaitobservable将创建第二个订阅,但是订阅不同的可观察值。

或者,最终,返回Task<IObservable<T>>在响应式框架中执行此操作的适当方法?

随后,由于该方法返回 Task<T> ,通过 CancellationToken 是一个很好的做法。取消操作。也就是说,我可以理解 CancellationToken用于取消可观察对象的创建,但它也应该用于取消实际可观察对象(因为它可以向下传递以读取流等) .

我的直觉告诉我不行,因为取消违反了关注点分离以及 DRY 原则:

  • 取消创建和取消可观察是两件不同的事情。
  • 调用 Subscribe 将返回 IDisposable 实现这将取消订阅。

最佳答案

我不会返回 Task<IObservable<T>> 。在公共(public) API 中混合任务和可观察对象最终会变得困惑。请记住,任务可以被视为产生单个值的可观察量。这也意味着不要将 CancellationToken 与公共(public) API 中的可观察量混合在一起。您可以通过订阅和取消订阅来控制可观察量。

这并不意味着您不能混合幕后的概念。以下是如何使用 Observable.Using 执行您想要的操作, Task.ToObservableCancellationDisposable

首先,修改您的方法以返回 Task<ISourceBlock<string>> :

public async Task<ISourceBlock<string>> CreateBlock(string parameter, CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);

     return block;
}

现在这是使用上述方法的新 Create 方法:

public IObservable<string> Create(string parameter)
{
    // Create a cancellation token that will be canceled when the observable is unsubscribed, use this token in your call to CreateBlock.
    // Use ToObservable() to convert the Task to an observable so we can then
    // use SelectMany to subscribe to the block itself once it is available
    return Observable.Using(() => new CancellationDisposable(),
           cd => CreateBlock(parameter, cd.Token)
               .ToObservable()
               .SelectMany(block => block.AsObservable()));
}

编辑:我发现 Rx 已经用 FromAsync 实现了这个模式。 :

public IObservable<string> Create(string parameter)
{
    return Observable.FromAsync(token => CreateBlock(parameter, token))
                     .SelectMany(block => block.AsObservable());
}

还有,DeferAsync ,这更合适,因为您的 Task实际上是在创建你真正想要观察的 Observable(例如你的 block ):

public IObservable<string> Create(string parameter)
{
    return Observable.DeferAsync(async token => (await CreateBlock(parameter, token)).AsObservable());
}

关于.net - Rx 框架中的什么允许我在创建期间等待其他方法时返回 IObservable<T> ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15730531/

相关文章:

c# - Parallel.ForEach本地最终没有分区

.net - .NET 可重用控件的在线存储库

c# - Watin 嵌入到 winform 应用程序中

c# - .NET 中在单独(单个)线程上管理任务队列的最佳方式

c# - 具有 WCF 服务引用异步功能的任务计划程序

升级到 .NET 4.5 后 MSBuild 部署失败

c# - SOAP 调用中的授权 header

c# .net Windows 8 App TcpClient 代码端口到 StreamSocket

c# - 如何克服 tessnet 内存泄漏?

c# - 如何在 C#/.NET 中测试一个值是否被装箱?