java - 如何在 RxJava2 中的自定义 Observable 中获取观察者的 dispose 操作的通知

标签 java rx-java rx-java2

In this thread ,提出了一个关于如何观察取消订阅事件的问题,以便您可以在取消订阅后清理并删除监听器。然而,在RxJava2中,上述线程的方法不再起作用。

def myObservable = Observable.create({ aEmitter ->
    val listener = {event -> 
      aEmitter.onNext(event);                
    }
    existingEventSource.addListener(listener)

    // Fails since aEmitter doesn't have an add() method nor does Subscriptions exist.
    aEmitter.add(Subscriptions.create(() -> existingEventSource.removeListener(listener)));
})

在 RxJava2 中解决这个问题的正确方法是什么?

最佳答案

请查看stringObservable Observable,如何处理订阅。

public class MyTest {
  @Mock private MyService mock;

  @Before
  public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
  }

  @Test
  public void nam3e() {
    ArrayList<Listener> listeners = new ArrayList<>();

    doAnswer(
            invocation -> {
              Object[] args = invocation.getArguments();
              Listener arg = (Listener) args[0];

              listeners.add(arg);

              return null;
            })
        .when(mock)
        .addListener(any());

    Observable<String> stringObservable =
        Observable.create(
            e -> {
              Listener listener =
                  s -> {
                    e.onNext(s);
                  };

              mock.addListener(listener);

              e.setCancellable(
                  () -> {
                    mock.removeListener(listener);
                  });
            });

    TestObserver<String> test = stringObservable.test();

    Listener listener = listeners.get(0);
    listener.onNext("Wurst");

    test.assertNotComplete().assertValue("Wurst");
    verify(mock, times(1)).addListener(any());

    test.dispose();

    verify(mock, times(1)).removeListener(any());
  }

  public interface MyService {
    void addListener(Listener listener);

    void removeListener(Listener listener);
  }

  @FunctionalInterface
  public interface Listener {
    void onNext(String s);
  }
}

关于java - 如何在 RxJava2 中的自定义 Observable 中获取观察者的 dispose 操作的通知,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47644374/

相关文章:

java - Ant Build - 找不到taskdef

java - 基于属性的可流动窗口

android - RxBinding出错后如何重新绑定(bind)View?

kotlin - 如何使用 Observable.zip 保存类型?

java - RxJava 2 Activity 销毁时可处置

java - Apache poi HSSF 返回不正确的物理行数

java - 管道分离的多行非结构化数据的Map Reduce代码

java - JScrollPane 不起作用,视口(viewport)正在堆叠面板

java - 使用 rxjava 使网络调用并行而不交错

java - 如何使用 RxJava2 过滤列表并收集其索引到字符串结果?