Observable.Concat
是一种把多个 observable 依次串联起来的实现,但第二个 IObservable<T>
只有在第一个完成之后才会被订阅。
http://www.introtorx.com/content/v1.0.10621.0/12_CombiningSequences.html#Concat
是否存在某种“HotConcat”的实现?类似于 Observable.Merge
,但保持交付顺序:先推送最先订阅的序列中的元素,然后再推送后续序列中的元素。就像这样:
我知道可以使用 ReplaySubject<T>
,但考虑到它对性能和内存占用的影响,这似乎不是一个好办法。
最佳答案
这是我已经使用了一段时间的实现。此实现引入了一个 BufferUntilSubscribed
运算符,它将一个 IObservable
转换为一个 IConnectableObservable
,它将在您调用 Connect
时开始缓冲,并会把已缓冲的结果传递给第一个订阅者。一旦第一个订阅者“追上”(caught up)了已缓冲的数据,缓冲就会停止,此后订阅者将在实时事件到达时直接收到它们。
一旦你有了它,你就可以像这样编写 HotConcat
:
/// <summary>
/// Concatenates the given sequences in order, but connects to every one of them immediately, so that items
/// produced by a later source while an earlier source is still being delivered are buffered instead of missed.
/// </summary>
/// <typeparam name="T">Element type of the sequences.</typeparam>
/// <param name="sources">The sequences to concatenate, in delivery order.</param>
/// <returns>
/// A sequence that delivers the items of each source in turn. Disposing a subscription to it also disposes
/// the connections to the sources that were established when this method was called.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
public static IObservable<T> HotConcat<T>(params IObservable<T>[] sources)
{
    if (sources == null)
    {
        throw new ArgumentNullException("sources");
    }

    // Materialize the wrappers exactly once. Select is lazily evaluated, so enumerating the query a second
    // time would build a fresh set of wrappers: one set would be connected while a different, never-connected
    // set would be concatenated and would never deliver anything.
    var buffered = sources.Select(source => source.BufferUntilSubscribed()).ToArray();

    // Connect eagerly (at call time, not at subscription time) so that every source starts buffering right away.
    var connections = new CompositeDisposable(buffered.Select(wrapper => wrapper.Connect()).ToArray());

    return Observable.Create<T>(observer =>
    {
        var subscription = new SingleAssignmentDisposable();

        // The returned disposable owns both the source connections and the downstream subscription.
        var all = new CompositeDisposable(connections);
        all.Add(subscription);

        subscription.Disposable = buffered.Concat().Subscribe(observer);
        return all;
    });
}
下面是 BufferUntilSubscribed
的实现:
/// <summary>
/// Connectable wrapper around a source sequence. While a buffer is in place, items arriving from the connected
/// source are queued; the first subscriber receives the queued items merged with the live stream, and every
/// later subscriber is attached to the live stream only.
/// </summary>
private class BufferUntilSubscribedObservable<T> : IConnectableObservable<T>
{
    private readonly object _gate = new object();
    private readonly Subject<T> _liveEvents = new Subject<T>();
    private readonly IObservable<T> _source;
    private readonly IScheduler _scheduler;

    // Queue of items not yet handed to the first subscriber; null once buffering has ended. Accessed under _gate.
    private Queue<T> _buffer = new Queue<T>();

    // Set when the first subscriber arrives. Accessed under _gate.
    private bool _observationsStarted;

    /// <summary>
    /// Stores the source to wrap and the scheduler used when replaying buffered items.
    /// </summary>
    /// <param name="source">Sequence that is subscribed to when <see cref="Connect"/> is called.</param>
    /// <param name="scheduler">Scheduler passed to ToObservable when buffered items are replayed.</param>
    public BufferUntilSubscribedObservable(IObservable<T> source, IScheduler scheduler)
    {
        _scheduler = scheduler;
        _source = source;
    }

    /// <summary>
    /// Subscribes an observer. The first caller gets the live stream merged with a replay of the buffered
    /// items; any later caller is subscribed to the live stream only.
    /// </summary>
    /// <param name="observer">Observer to subscribe.</param>
    /// <returns>Disposable that ends the subscription.</returns>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate)
        {
            if (!_observationsStarted)
            {
                _observationsStarted = true;

                // Finally drops the buffer if the first observer stops listening before the replay is done.
                IObservable<T> replay = GetBuffers().Concat().Finally(RemoveBuffer);
                return Observable.Merge(_liveEvents, replay).Subscribe(observer);
            }

            return _liveEvents.Subscribe(observer);
        }
    }

    /// <summary>
    /// Subscribes to the source. Items go through <see cref="OnNext"/>; errors and completion are forwarded
    /// straight to the live subject.
    /// </summary>
    /// <returns>Disposable that ends the subscription to the source.</returns>
    public IDisposable Connect()
    {
        IDisposable connection = _source.Subscribe(OnNext, _liveEvents.OnError, _liveEvents.OnCompleted);
        return connection;
    }

    /// <summary>
    /// Discards the buffer so that subsequent items are published live.
    /// </summary>
    private void RemoveBuffer()
    {
        lock (_gate)
        {
            _buffer = null;
        }
    }

    /// <summary>
    /// Under the lock: when there is no buffer, or the buffer is empty, leaves the buffer removed and returns
    /// null; otherwise installs a fresh empty queue and returns the queue that was in place.
    /// </summary>
    /// <returns>The queue of pending items, or null when there is nothing left to replay.</returns>
    private Queue<T> GetAndReplaceBuffer()
    {
        lock (_gate)
        {
            if (_buffer == null || _buffer.Count == 0)
            {
                // Assigning null when the buffer is already null is a no-op, so both cases share this branch.
                _buffer = null;
                return null;
            }

            Queue<T> pending = _buffer;
            _buffer = new Queue<T>();
            return pending;
        }
    }

    /// <summary>
    /// Lazily yields one sequence per pending queue and ends as soon as
    /// <see cref="GetAndReplaceBuffer"/> returns null, i.e. once the observer has caught up with the source.
    /// </summary>
    /// <returns>Sequences that replay the buffered items on the configured scheduler.</returns>
    private IEnumerable<IObservable<T>> GetBuffers()
    {
        while (true)
        {
            Queue<T> pending = GetAndReplaceBuffer();
            if (pending == null)
            {
                yield break;
            }

            yield return pending.ToObservable(_scheduler);
        }
    }

    /// <summary>
    /// Handles an item from the source: queues it while a buffer exists, otherwise publishes it live.
    /// </summary>
    /// <param name="item">Item received from the source.</param>
    private void OnNext(T item)
    {
        bool queued;
        lock (_gate)
        {
            queued = _buffer != null;
            if (queued)
            {
                _buffer.Enqueue(item);
            }
        }

        // Live publication happens outside the lock.
        if (!queued)
        {
            _liveEvents.OnNext(item);
        }
    }
}
/// <summary>
/// Returns a connectable observable that, once connected, buffers data until an observer subscribes, at which
/// time it sends all buffered data to that observer and then continues with new data.
/// Thus the observer may subscribe late to a hot observable yet still see all of the data. Later observers will not see the buffered events.
/// </summary>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="source">Sequence to buffer.</param>
/// <param name="scheduler">Scheduler to use to dump the buffered data to the observer.</param>
/// <returns>A connectable wrapper around <paramref name="source"/>; call Connect on it to start buffering.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source, IScheduler scheduler)
{
    // Fail fast: the wrapper only stores its arguments, so a null would otherwise go unnoticed until the
    // wrapper is later connected or subscribed to.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    return new BufferUntilSubscribedObservable<T>(source, scheduler);
}
/// <summary>
/// Returns a connectable observable that, once connected, buffers data until an observer subscribes, at which
/// time it sends all buffered data to that observer and then continues with new data.
/// Thus the observer may subscribe late to a hot observable yet still see all of the data. Later observers will not see the buffered events.
/// This overload replays the buffered data using <c>Scheduler.Immediate</c>.
/// </summary>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="source">Sequence to buffer.</param>
/// <returns>A connectable wrapper around <paramref name="source"/>; call Connect on it to start buffering.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source)
{
    // Fail fast: the wrapper only stores the source, so a null would otherwise go unnoticed until Connect.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return new BufferUntilSubscribedObservable<T>(source, Scheduler.Immediate);
}
关于c# - Rx 中的热连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24790191/