c# - 在 Rx.Net 中,我如何实现一个可观察的反馈循环直到反馈耗尽?

标签 c# system.reactive

我有以下 API:

IObservable<IList<SqlDataRecord>> WriteToDBAndGetFailedSource(SqlConnection conn, IList<SqlDataRecord> batch)

它尝试将批处理写入数据库。如果失败,则返回整个批处理,否则返回的 observable 为空。

我也有生产批处理的来源:

IObservable<IList<SqlDataRecord>> GetDataSource(string filePath, int bufferThreshold)

现在,我可以像这样组合它们:

var failedBatchesSource = GetDataSource(filePath, 1048576)
  .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
  .Merge(100);

这将写入所有批处理(最多并发 100 个)并返回失败批处理的可观察值。

我真正想要的是将失败的批处理在一定的暂停之后送回批处理的来源,可能是在原始来源仍在生产批处理的时候。我当然可以这样写:

var failedBatchesSource = GetDataSource(filePath, 1048576)
  .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
  .Merge(100)
  .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
  .Merge(100);

但这当然是错误的,因为:

  1. 这打破了在再次处理失败的批处理之前暂停的要求。
  2. 它可能会向数据库生成超过 100 个并发写入请求。
  3. 这就像展开一个迭代次数未知的 for 循环 - 效率低下。

一旦我收集了所有失败并在循环中重新开始,我也可以跳出可观察的 monad:

            var src = GetDataSource(filePath, 1048576);

            for (;;)
            {
                var failed = await src
                    .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
                    .Merge(100)
                    .ToList();
                if (failed.Count == 0)
                {
                    break;
                }
                src = failed.ToObservable();
            }

但我想知道我是否可以在保持在可观察的单子(monad)内的同时做得更好。

最佳答案

认为这可能会成功

public static IObservable<T> ProcessAll<T>(this IObservable<T> source, Func<T, IObservable<T>> processor, int mergeCount, TimeSpan failureDelay)
{
    return Observable.Create<T>(
        observer =>
            {
                var failed = new Subject<T>();

                return source.Merge(failed)
                        .Select(processor)
                        .Merge(mergeCount)
                        .Delay(failureDelay)
                        .Subscribe(failed.OnNext, observer.OnError, observer.OnCompleted);
            });
}

然后像这样使用它:

GetDataSource(filePath, 1048576)
  .ProcessAll(batch => WriteToDBAndGetFailedSource(conn, batch), 100, TimeSpan.FromMilliseconds(500))
  .Subscribe();

ProcessAll 是一个可怕的名字,但现在是星期五晚上,我想不出更好的名字。

关于c# - 在 Rx.Net 中,我如何实现一个可观察的反馈循环直到反馈耗尽?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33575048/

相关文章:

system.reactive - 从多个任务中创建一个可观察对象

c# - 如何更改datagridview中列的颜色?

javascript - JQuery UI 自动完成未到达 ActionResult C# MVC

C#根据输入参数从url下载zip文件

c# - 为什么重复 Enumerable 到 Observable 转换 block

.net - 响应式(Reactive)扩展 (Rx) 和异步类

c# - 不包含 的定义,也找不到接受类型的第一个参数的扩展方法

c# - 本地目录的路径

c# - 如何防止 Rx 测试挂起?

c# - C# 中的 IObserver 和 IObservable 用于观察者与委托(delegate)、事件