java - 如何将基于回调的 API 转换为基于 Observable 的 API?

标签 java multithreading rx-java

我正在使用的库发出一系列 Message使用回调对象的对象。

interface MessageCallback {
    onMessage(Message message);
}

回调是使用一些 libraryObject.setCallback(MessageCallback) 添加的调用并使用非阻塞启动进程 libraryObject.start()方法调用。

创建 Observable<Message> 的最佳方式是什么?会发射那些物体?

如果libraryObject.start()怎么办?阻塞?

最佳答案

1。无限次调用回调

我们可以像这样将它转换为 Observable(以 RxJava 2 为例):

Observable<Message> source = Observable.create(emitter -> {
        MessageCallback callback = message -> emitter.onNext(message);
        libraryObject.setCallback(callback);
        Schedulers.io().scheduleDirect(libraryObject::start);
        emitter.setCancellable(() -> libraryObject.removeCallback(callback));
    })
    .share(); // make it hot

share 使这个可观察的热门,即多个订阅者将共享一个订阅,即最多有一个回调注册到 libraryObject

我使用 io 调度程序来安排从后台线程进行的 start 调用,因此它不会延迟第一次订阅。

2。单条消息回调

这也是很常见的场景。假设我们有以下回调式异步方法:

libraryObject.requestDataAsync(Some parameters, MessageCallback callback);

然后我们可以像这样将它转换为 Observable(以 RxJava 2 为例):

Observable<Message> makeRequest(parameters) {
    return Observable.create(emitter -> {
        libraryObject.requestDataAsync(parameters, message -> {
            emitter.onNext(message);
            emitter.onComplete();
        });
    });
}

关于java - 如何将基于回调的 API 转换为基于 Observable 的 API?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29040346/

相关文章:

java - 如何从数组创建 Observable?

rx-java - 多个可观察量链接在一起以完成

javascript - 如何在 SonarQube 4.4 的技术债务计算中忽略文件/文件夹?

java - 发送记录并等待其确认接收

java - Android、服务和线程。为什么线程不开始运行直到其他执行结束?

java - 如何在rxjava中使用toMap()获取索引

java - Maven 的 pom.xml 中的 pluginManagement 是什么?

java - 如何将信息从一个对象传递到另一个对象

java - Java排列程序详解

c++ - 并发读/写 OpenMp 中的共享变量