c# - RX : How to concat a Snapshot stream and an Update stream?

标签 c# repository-pattern system.reactive

我一直在尝试创建一个 observable,它从存储库缓存中流式传输世界状态(快照),然后是来自单独提要的实时更新。问题是快照调用正在阻塞,因此必须在此期间缓冲更新。

这是我想出的,稍微简化了一点。 GetStream() 方法是我所关注的方法。我想知道是否有更优雅的解决方案。假设 GetDataFeed() 全天脉冲更新缓存。

private static readonly IConnectableObservable<long> _updateStream;

public static Constructor()
{
      _updateStream = GetDataFeed().Publish();
      _updateStream.Connect();
}

static void Main(string[] args)
{
      _updateStream.Subscribe(Console.WriteLine);
      Console.ReadLine();
      GetStream().Subscribe(l => Console.WriteLine("Stream: " + l));
      Console.ReadLine();
}

public static IObservable<long> GetStream()
{
      return Observable.Create<long>(observer =>
            {
                  var bufferedStream = new ReplaySubject<long>();
                  _updateStream.Subscribe(bufferedStream);
                  var data = GetSnapshot();
                  // This returns the ticks from GetSnapshot
                  //  followed by the buffered ticks from _updateStream
                  //  followed by any subsequent ticks from _updateStream
                  data.ToObservable().Concat(bufferedStream).Subscribe(observer);

                  return Disposable.Empty;
            });
}

private static IObservable<long> GetDataFeed()
{
      var feed = Observable.Interval(TimeSpan.FromSeconds(1));
      return Observable.Create<long>(observer =>
      {
            feed.Subscribe(observer);
            return Disposable.Empty;
      });
}

大众观点反对 Subject,因为它们不是“功能性的”,但如果没有 ReplaySubject,我找不到这样做的方法。热 Observable 上的 Replay 过滤器不起作用,因为它会重播所有内容(可能是一整天的陈旧更新)。

我也很关心竞争条件。有没有办法保证某种排序,是否应该在快照之前缓冲较早的更新?能否与其他 RX 运算符(operator)一起更安全、更优雅地完成整个事情?

谢谢。

-将

最佳答案

无论您使用 ReplaySubject 还是 Replay 函数都没有区别。 Replay 在底层使用了一个 ReplaySubject。我还会注意到您正在疯狂地泄漏订阅,这可能会导致资源泄漏。此外,您对重放缓冲区的大小没有限制。如果你整天都在观察 observable,那么重播缓冲区将不断增长。您应该对其进行限制以防止出现这种情况。

这是 GetStream 的更新版本。在这个版本中,我采用了简单的方法,将 Replay 限制为最近 1 分钟的数据。这假定 GetData 将始终完成并且观察者将在该 1 分钟内观察到结果。你的里程可能会有所不同,你可能会改进这个方案。但至少这样,当您整天观察可观察对象时,该缓冲区不会无限制地增长,并且仍然只包含一分钟的更新。

public static IObservable<long> GetStream()
{
    return Observable.Create<long>(observer =>
    {
        var updateStreamSubscription = new SingleAssignmentDisposable();
        var sequenceDisposable = new SingleAssignmentDisposable();
        var subscriptions = new CompositeDisposable(updateStreamDisposable, sequenceDisposable);

        // start buffering the updates
        var bufferedStream = _updateStream.Replay(TimeSpan.FromMinutes(1));
        updateStreamSubscription.Disposable = bufferedStream.Connect();

        // now retrieve the initial snapshot data
        var data = GetSnapshot();

        // subscribe to the snapshot followed by the buffered data
        sequenceDisposable.Disposable = data.ToObservable().Concat(bufferedStream).subscribe(observer);

        // return the composite disposable which will unsubscribe when the observer wishes
        return subscriptions;
    });
}

关于竞争条件和过滤“旧”更新的问题...如果您的快照数据包含某种版本信息,并且您的更新流也提供版本信息,那么您可以有效地测量返回的最新版本您的快照查询,然后过滤缓冲流以忽略旧版本的值。这是一个粗略的例子:

public static IObservable<long> GetStream()
{
    return Observable.Create<long>(observer =>
    {
        var updateStreamSubscription = new SingleAssignmentDisposable();
        var sequenceDisposable = new SingleAssignmentDisposable();
        var subscriptions = new CompositeDisposable(updateStreamDisposable, sequenceDisposable);

        // start buffering the updates
        var bufferedStream = _updateStream.Replay(TimeSpan.FromMinutes(1));
        updateStreamSubscription.Disposable = bufferedStream.Connect();

        // now retrieve the initial snapshot data
        var data = GetSnapshot();

        var snapshotVersion = data.Length > 0 ? data[data.Length - 1].Version : 0;
        var filteredUpdates = bufferedStream.Where(update => update.Version > snapshotVersion);

        // subscribe to the snapshot followed by the buffered data
        sequenceDisposable.Disposable = data.ToObservable().Concat(filteredUpdates).subscribe(observer);

        // return the composite disposable which will unsubscribe when the observer wishes
        return subscriptions;
    });
}

在将实时更新与存储的快照合并时,我已经成功地使用了这种模式。我还没有找到一个优雅的 Rx 运算符,它已经在没有任何竞争条件的情况下做到了这一点。但是上面的方法可能会变成这样。 :)

编辑:注意我在上面的例子中省略了错误处理。理论上,对 GetSnapshot 的调用可能会失败,并且您会泄露对更新流的订阅。我建议将 CompositeDisposable 声明之后的所有内容包装在 try/catch block 中,并在 catch 处理程序中确保调用 subscriptions.Dispose() 之前重新抛出异常。

关于c# - RX : How to concat a Snapshot stream and an Update stream?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17107924/

相关文章:

.net - 可以使用存储库模式 + 存储过程吗?他们应该/可以返回 IQueryable 吗?

c# - 如何模拟返回接口(interface)列表的方法

c# - 对缓冲的 Observable 进行排序

c# - UWP 应用中的设备 ID(阈值 1)

c# - 将构造函数参数传递给 Nunit 基类

c# - 如何更改列表框中项目的名称/字符串?

c# - 运行时 ASP.NET 确认对话框

asp.net-mvc-3 - 我应该在哪里创建 ASP.Net MVC 3 应用程序中的 Unit of Work 实例?

c# - 异步一次性创建

c# - 热可观察和 IDisposable