c# - 如何使用 Reactive 限制消费顺序?

标签 c# .net multithreading system.reactive reactive-programming

我们有一个应用程序,其中我们有一个物化的项目数组,我们将通过 Reactive 管道处理这些项目。看起来有点像这样

EventLoopScheduler eventLoop = new EventLoopScheduler();
IScheduler concurrency = new TaskPoolScheduler(
    new TaskFactory(
        new LimitedConcurrencyLevelTaskScheduler(threadCount)));
IEnumerable<int> numbers = Enumerable.Range(1, itemCount);

// 1. transform on single thread
IConnectableObservable<byte[]> source = 
    numbers.Select(Transform).ToObservable(eventLoop).Publish();

// 2. naive parallelization, restricts parallelization to Work 
// only; chunk up sequence into smaller sequences and process
// in parallel, merging results
IObservable<int> final = source.
    Buffer(10).
    Select(
        batch =>
        batch.
        ToObservable(concurrency).
        Buffer(10).
        Select(
            concurrentBatch =>
            concurrentBatch.
            Select(Work).
            ToArray().
            ToObservable(eventLoop)).
        Merge()).
    Merge();

final.Subscribe();

source.Connect();
Await(final).Wait();

如果你真的很想玩这个,替代方法看起来像

private async static Task Await(IObservable<int> final)
{
    await final.LastOrDefaultAsync();
}

private static byte[] Transform(int number)
{
    if (number == itemCount)
    {
        Console.WriteLine("numbers exhausted.");
    }
    byte[] buffer = new byte[1000000];
    Buffer.BlockCopy(bloat, 0, buffer, 0, bloat.Length);
    return buffer;
}

private static int Work(byte[] buffer)
{
    Console.WriteLine("t {0}.", Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(50);
    return 1;
}

一点解释。 Range(1, itemCount) 模拟从数据源具体化的原始输入。 Transform 模拟每个输入必须经历的丰富过程,并导致更大的内存占用。 Work 是一个对转换后的输入进行操作的“冗长”过程。

理想情况下,我们希望最大限度地减少系统同时持有的已转换输入的数量,同时通过并行化 Work 来最大化吞吐量。内存中已转换输入的数量应为批量大小(10 以上)乘以并发工作线程数 (threadCount)。

所以对于 5 个线程,我们应该在任何给定时间保留 50 个 Transform 项;如果像这里一样,转换是一个 1MB 字节的缓冲区,那么我们预计整个运行过程中的内存消耗约为 50MB。

我发现的完全不同。也就是说,Reactive 急切地消耗所有 numbers,并预先对它们进行Transform(由 numbers exhausted. 消息证明),导致大量内存提前飙升(@1GB for 1000 itemCount)。

我的基本问题是:有没有办法实现我的需求(即最小化消耗,通过多线程批处理进行节流)?

更新:詹姆斯的逆转很抱歉;起初,我不认为 paulpdaniels 和 Enigmativity 的 Work(Transform) 组合适用(这与我们实际实现的性质有关,它比上面提供的简单场景更复杂),但是,经过一些进一步的实验,我也许能够应用相同的原则:即推迟转换直到批处理执行。

最佳答案

您在代码中犯了几个错误,这些错误导致您得出所有结论。

首先,您已完成此操作:

IEnumerable<int> numbers = Enumerable.Range(1, itemCount);

您使用了 Enumerable.Range,这意味着当您调用 numbers.Select(Transform) 时,您将遍历所有 numbers 与单个线程的速度一样快。 Rx 甚至没有机会做任何工作,因为到目前为止您的管道是完全可枚举的。

下一期在您的订阅中:

final.Subscribe();

source.Connect();
Await(final).Wait();

因为您调用了 final.Subscribe() & Await(final).Wait(); 您正在为 final 创建两个单独的订阅> 可观察。

由于中间有一个 source.Connect(),第二个订阅可能会丢失值。

那么,让我们尝试移除这里发生的所有问题,看看我们是否可以解决问题。

如果你这样做:

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .Select(bs => Work(bs));

一切正常。数字在最后耗尽,在我的机器上处理 20 个项目大约需要 1 秒。

但这是按顺序处理所有内容。 Work 步骤为 Transform 提供背压,以减慢它消耗数字的速度。

让我们添加并发。

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .SelectMany(bs => Observable.Start(() => Work(bs)));

这会在 0.284 秒内处理 20 个项目,处理完 5 个项目后,数字会自行耗尽。数字不再有任何背压。基本上,调度程序将所有工作交给 Observable.Start,以便立即为下一个数字做好准备。

让我们降低并发度。

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .SelectMany(bs => Observable.Start(() => Work(bs), concurrency));

现在 20 个项目在 0.5 秒内得到处理。在号码用完之前,只有两个得到处理。这是有道理的,因为我们将并发限制为两个线程。但是在数字的消耗上仍然没有背压,所以它们很快就被消化掉了。

说了这么多,我试图构造一个具有适当背压的查询,但我找不到办法。关键在于 Transform(...) 的执行速度远快于 Work(...),因此它完成得更快。

那么对我来说显而易见的举动是:

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .SelectMany(n => Observable.Start(() => Work(Transform(n)), concurrency));

这直到最后才完成数字,并将处理限制为两个线程。它似乎为你想要的做了正确的事情,除了我必须一起做 Work(Transform(...))

关于c# - 如何使用 Reactive 限制消费顺序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31415930/

相关文章:

c# - 按引用调用未按预期工作

C#-MYSQL获取特定行列数据

c# - .NET 无需自动化即可从标签中读取文本

c# - WCF 异常记录

c# - 在单独的后台线程与进程中运行长时间后台任务

c# - 密码正则表达式验证

c# - 如果更改 Thread.CurrentThread.CurrentCulture,则 HttpWebRequest 超时

.net - .NET 框架是否支持 EAX 加密模式?

VS2012 中的 c++11 线程/互斥锁实现 - 触发断言

java - 如何使用 JavaFX 实现多线程