c# - 使用 Reactive Extensions 同时处理流时如何限制缓冲

标签 c# multithreading system.reactive

我有一个交错流 split into separate sequential streams .

制作人

int streamCount = 3;

new MyIEnumerable<ElementType>()
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % streamCount), Value = x })
.Subscribe(x => outputs[x.Key].OnNext(x.Value));

其中 outputs[] 是处理流的主题,定义如下。 .ObserveOn() 用于并发处理流(多线程)。

消费者

var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();                                                                                                     

outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x));
outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x));
outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x));

此代码的问题在于它会尽可能快地读取整个枚举,即使输出流无法 catch 也是如此。在我的例子中,可枚举是一个文件流,所以这可能会导致使用大量内存。因此,我希望在缓冲区达到某个阈值时阻止读取。

最佳答案

我已经通过在生产者和消费者上使用信号量解决了这个问题,如下所示。但是,我不确定这是否是一个好的解决方案(就 Rx 契约、编程风格等而言)。

var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS);

// add to producer (before subscribe)
.Do(_ => semaphore.Wait());

// add to consumer (before subscribe)
.Do(_ => semaphore.Release()))

CancelationToken 传递给对 Wait() 的调用并确保它在流异常停止时被取消可能是个好主意?

关于c# - 使用 Reactive Extensions 同时处理流时如何限制缓冲,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11168779/

相关文章:

c# - 在所有解决方案项目中共享域模型对象

c# - 为什么检查字典是否包含键会更快,而不是在不存在时捕获异常?

c# - 在 C# 中,如何确定不同文化中的一周开始时间是星期一还是星期日

java - 为什么 wait , notify 和 notifyAll 方法都在 Object 类中?

c# - 我如何根据每个元素的条件循环可观察序列?

c# - 访问路径 'c:\$Recycle.Bin\S-1-5-18' 被拒绝

ios - 核心数据托管上下文未保存在多线程环境中

ios - 带有核心数据和 NSFetchedResultsController 的后台线程

c# - 如何将以下类转换为 IObservable?

silverlight - 如何取消隐藏 Silverlight WCF BeginXXXX 和 EndXXXX 服务调用