c# - 通过 observable 限制重播缓冲区

标签 c# system.reactive

我有一个包含实时数据的流,以及一个基本上分隔属于一起的实时数据部分的流。现在,当有人订阅实时数据流时,我想为他们重播实时数据。但是我不想记住所有实时数据,只记住自上次其他流发出值以来的部分。

There is an issue这将解决我的问题,因为有一个重放操作符完全符合我的要求(或者至少我认为)。

目前有什么方法可以轻松做到这一点?有没有比下面的方法更好的方法?

private class ReplayWithLimitObservable<TItem, TDelimiter> : IConnectableObservable<TItem>
{
    private readonly List<TItem> cached = new List<TItem>();
    private readonly IObservable<TDelimiter> delimitersObservable;
    private readonly IObservable<TItem> itemsObservable;
    public ReplayWithLimitObservable(IObservable<TItem> itemsObservable, IObservable<TDelimiter> delimitersObservable)
    {
        this.itemsObservable = itemsObservable;
        this.delimitersObservable = delimitersObservable;
    }

    public IDisposable Subscribe(IObserver<TItem> observer)
    {
        lock (cached)
        {
            cached.ForEach(observer.OnNext);
        }

        return itemsObservable.Subscribe(observer);
    }

    public IDisposable Connect()
    {
        var delimiters = delimitersObservable.Subscribe(
            p =>
                {
                    lock (cached)
                    {
                        cached.Clear();
                    }
                });
        var items = itemsObservable.Subscribe(
            p =>
                {
                    lock (cached)
                    {
                        cached.Add(p);
                    }
                });
        return Disposable.Create(
            () =>
                {
                    items.Dispose();
                    delimiters.Dispose();
                    lock (cached)
                    {
                        cached.Clear();
                    }
            });
}

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters)
{
    return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters);
}

最佳答案

这是否符合您的要求?它的优点是将所有锁定和竞争条件留给 Rx 专家 :)

private class ReplayWithLimitObservable<T, TDelimiter> : IConnectableObservable<T>
{
  private IConnectableObservable<IObservable<T>> _source;

  public ReplayWithLimitObservable(IObservable<T> source, IObservable<TDelimiter> delimiter)
  {
    _source = source
      .Window(delimiter) // new replay window on delimiter
      .Select<IObservable<T>,IObservable<T>>(window =>
      {
        var replayWindow = window.Replay();

        // immediately connect and start memorizing values
        replayWindow.Connect();

        return replayWindow;
      })
      .Replay(1); // remember the latest window
  }

  IDisposable Connect()
  {
    return _source.Connect();
  }

  IDisposable Subscribe(IObserver<T> observer)
  {
    return _source
      .Concat()
      .Subscribe(observer);
  }
}

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters)
{
    return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters);
}

关于c# - 通过 observable 限制重播缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27312468/

相关文章:

c# - WinForms 中的 RX 和 Model-View-Presenter

c# - 隐藏 GridView ID 列

c# - 如何创建一个在超时后重新发送 HTTP 请求的 Observable 序列?

c# - Reactive Extensions 步履蹒跚的第一步

wpf - 如何将我的 MVVM 集合连接到我的业务层中长时间运行的进程?

c# - 我如何断言 Observable 不会推送任何项目?

c# - 下一行继续代码

c# - 未找到匹配的创建者

c# - CodeFluent 中的连续 GUID

c# - C#中对象实例化左侧的抽象类