c# - 如何实现从一个 IObserver 到另一个 IObserver 的原子切换?

标签 c# .net system.reactive

我有一个 IObservable<byte[]>我把它变成一个IObservable<XDocument>使用一些中间步骤:

var observedXDocuments =
    from b in observedBytes
    // Lot of intermediate steps to transform byte arrays into XDocuments
    select xDoc;

在某个时间点,我对观察到的 XDocument 很感兴趣所以我订阅了 IObserver<XDocument> .稍后,我想订阅另一个 IObserver<XDocument>并处理掉旧的。

我怎样才能在一个原子操作中做到这一点,而不丢失任何观察到的 XDocument ?我可以做类似的事情:

oldObserver.Dispose();
observedXDocuments.Subscribe(newObserver);

不过我很担心,在这两个调用之间,我可能会丢失一个 XDocument .如果我切换这两个电话,我可能会收到相同的 XDocument两次。

最佳答案

我可能会添加一个间接层。编写一个名为 ExchangeableObserver 的类,将其订阅到您的可观察对象,并使其永久订阅。 ExchangeableObserver 的工作是将一切委托(delegate)给给定的子观察者。但是程序员可以随时更改被委托(delegate)的子观察者。在我的示例中,我有一个 Exchange() 方法。像这样的东西:

public class ExchangeableObserver<T> : IObserver<T> {
  private IObserver<T> inner;

  public ExchangeableObserver(IObserver<T> inner) {
    this.inner=inner;
  }

  public IObserver<T> Exchange(IObserver<T> newInner) {
    return Interlocked.Exchange(ref inner, newInner);
  }

  public void OnNext(T value) {
    inner.OnNext(value);
  }

  public void OnCompleted() {
    inner.OnCompleted();
  }

  public void OnError(Exception error) {
    inner.OnError(error);
  }
}

关于c# - 如何实现从一个 IObserver 到另一个 IObserver 的原子切换?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6343114/

相关文章:

java - Observable#single() 和 Observable#toSingle() 之间的区别?

c# - 如何在 Rx.Net 中实现排放映射处理程序?

c# - CreateProcessAsUser 不绘制 GUI

c# - RX - 如何订阅条件状态,但仅当此状态在 x 时间段内没有变化时?

c# - 使用 LINQ 插入新的 XML 节点

c# - 带有嵌入式 Flash 视频的示例 PDF?

c# - INSERT 语句中的错误信息

javascript - 服务器端如何获取隐藏字段的值

c# - 获取 contact.LastName 时出现 System.Runtime.InteropServices.COMException (0x800706BE)

c# - 如何使用 linq 和 string.EndsWith(<1 of N items in an array>)