我有一个交错流 split into separate sequential streams .
制作人
int streamCount = 3;
new MyIEnumerable<ElementType>()
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % streamCount), Value = x })
.Subscribe(x => outputs[x.Key].OnNext(x.Value));
其中 outputs[]
是处理流的主题,定义如下。 .ObserveOn()
用于并发处理流(多线程)。
消费者
var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();
outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x));
outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x));
outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x));
此代码的问题在于它会尽可能快地读取整个枚举,即使输出流无法 catch 也是如此。在我的例子中,可枚举是一个文件流,所以这可能会导致使用大量内存。因此,我希望在缓冲区达到某个阈值时阻止读取。
最佳答案
我已经通过在生产者和消费者上使用信号量解决了这个问题,如下所示。但是,我不确定这是否是一个好的解决方案(就 Rx 契约、编程风格等而言)。
var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS);
// add to producer (before subscribe)
.Do(_ => semaphore.Wait());
// add to consumer (before subscribe)
.Do(_ => semaphore.Release()))
将 CancelationToken
传递给对 Wait()
的调用并确保它在流异常停止时被取消可能是个好主意?
关于c# - 使用 Reactive Extensions 同时处理流时如何限制缓冲,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11168779/