c# - Rx 中的热连接

标签 c# system.reactive

Observable.Concat是一个加入 observables 的实现,但第二个 IObservable<T>仅在第一个完成时进行订阅。

http://www.introtorx.com/content/v1.0.10621.0/12_CombiningSequences.html#Concat

是否有任何“HotConcat”的实现?类似于 Observable.Merge ,但保持交付顺序,首先推送初始订阅的元素,然后推送后续订阅。就像是: Hot Concat

我知道可以使用 ReplaySubject<T> , 但它似乎不太好,因为性能和内存使用的影响..

最佳答案

这是我已经使用了一段时间的实现。此实现引入了一个 BufferUntilSubscribed 运算符,它将一个 IObservable 转换为一个 IConnectableObservable,它将在您调用 Connect 时开始缓冲,并且会将缓冲的结果传递给第一个订阅者。一旦第一个订阅者“ catch ”,缓冲将停止,订阅者将在直播事件到达时收到。

一旦你有了它,你就可以像这样编写 HotConcat:

public static IObservable<T> HotConcat<T>(params IObservable<T>[] sources)
{
    var s2 = sources.Select(s => s.BufferUntilSubscribed());
    var subscriptions = new CompositeDisposable(s2.Select(s2 => s2.Connect()).ToArray());
    return Observable.Create<T>(observer =>
    {
        var s = new SingleAssignmentDisposable();
        var d = new CompositeDisposable(subscriptions);
        d.Add(s);

        s.Disposable = s2.Concat().Subscribe(observer);

        return d;
    });
}

下面是 BufferUntilSubscribed 的实现:

private class BufferUntilSubscribedObservable<T> : IConnectableObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly IScheduler _scheduler;
    private readonly Subject<T> _liveEvents;
    private bool _observationsStarted;
    private Queue<T> _buffer;
    private readonly object _gate;

    public BufferUntilSubscribedObservable(IObservable<T> source, IScheduler scheduler)
    {
        _source = source;
        _scheduler = scheduler;
        _liveEvents = new Subject<T>();
        _buffer = new Queue<T>();
        _gate = new object();
        _observationsStarted = false;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate)
        {
            if (_observationsStarted)
            {
                return _liveEvents.Subscribe(observer);
            }

            _observationsStarted = true;

            var bufferedEvents = GetBuffers().Concat().Finally(RemoveBuffer); // Finally clause to remove the buffer if the first observer stops listening.
            return Observable.Merge(_liveEvents, bufferedEvents).Subscribe(observer);
        }
    }

    public IDisposable Connect()
    {
        return _source.Subscribe(OnNext, _liveEvents.OnError, _liveEvents.OnCompleted);
    }

    private void RemoveBuffer()
    {
        lock (_gate)
        {
            _buffer = null;
        }
    }

    /// <summary>
    /// Acquires a lock and checks the buffer.  If it is empty, then replaces it with null and returns null.  Else replaces it with an empty buffer and returns the old buffer.
    /// </summary>
    /// <returns></returns>
    private Queue<T> GetAndReplaceBuffer()
    {
        lock (_gate)
        {
            if (_buffer == null)
            {
                return null;
            }

            if (_buffer.Count == 0)
            {
                _buffer = null;
                return null;
            }

            var result = _buffer;
            _buffer = new Queue<T>();
            return result;
        }
    }

    /// <summary>
    /// An enumerable of buffers that will complete when a call to GetAndReplaceBuffer() returns a null, e.g. when the observer has caught up with the incoming source data.
    /// </summary>
    /// <returns></returns>
    private IEnumerable<IObservable<T>> GetBuffers()
    {
        Queue<T> buffer;
        while ((buffer = GetAndReplaceBuffer()) != null)
        {
            yield return buffer.ToObservable(_scheduler);
        }
    }

    private void OnNext(T item)
    {
        lock (_gate)
        {
            if (_buffer != null)
            {
                _buffer.Enqueue(item);
                return;
            }
        }

        _liveEvents.OnNext(item);
    }
}

/// <summary>
/// Returns a connectable observable, that once connected, will start buffering data until the observer subscribes, at which time it will send all buffered data to the observer and then start sending new data.
/// Thus the observer may subscribe late to a hot observable yet still see all of the data.  Later observers will not see the buffered events.
/// </summary>
/// <param name="source"></param>
/// <param name="scheduler">Scheduler to use to dump the buffered data to the observer.</param>
/// <returns></returns>
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source, IScheduler scheduler)
{
    return new BufferUntilSubscribedObservable<T>(source, scheduler);
}

/// <summary>
/// Returns a connectable observable, that once connected, will start buffering data until the observer subscribes, at which time it will send all buffered data to the observer and then start sending new data.
/// Thus the observer may subscribe late to a hot observable yet still see all of the data.  Later observers will not see the buffered events.
/// </summary>
/// <param name="source"></param>
/// <returns></returns>
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source)
{
    return new BufferUntilSubscribedObservable<T>(source, Scheduler.Immediate);
}

关于c# - Rx 中的热连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24790191/

相关文章:

c# - 如何在不使用触摸屏的情况下模拟触摸事件

c# - 如何将控件渲染到图像?

C#,多维数组的多线程填充

c# - 绑定(bind)导致我的视​​图模型留在内存中

javascript - 按需获取生成值的优雅解决方案

c# - 极长的 Rx 事件链

c# - ADO.NET:将 DataTable 转换为 DataRow 数组

c# - 使用 Rx 下载文件(响应式编程)

c# - ConnectableObservable 一次处理所有订阅的方法?

c# - 使用 .NET Rx 观察随 Action 事件改变的属性