kotlin - kotlin,在subscribe()中使用观察者对象实例时得到了 “Type mismatch. Required: Disposable? Found: Unit”

标签 kotlin rx-java2 subscribe

编辑:

根据德米特里·伊克里雅诺夫(Dmitry Ikryanov)的建议,
使用DisposableObserver会编译,但是会导致崩溃

io.reactivex.exceptions.ProtocolViolationException: It is not allowed to 
subscribe with a(n) com.DataManager$theObserver$1 multiple times. Please 
create a fresh instance of com.DataManager$theObserver$1 and subscribe that 
to the target source instead.

subecribWith()的唯一代码,仅被调用一次
fun initSession() {
    if (mDisposable != null && mDisposable!!.isDisposed) {
        mDisposable!!.dispose()
    }

    mDisposable = RxBus.listen(DataEvent::class.java).subscribeWith(theObserver)  <=== crash at here
}

DisposableObserver是该类的成员变量:
var theObserver: DisposableObserver<DataEvent> = object : DisposableObserver<DataEvent>() {
    override fun onComplete() {
        Log.e(TAG, "onComplete: All Done!")        }

    override fun onNext(t: DataEvent) {
        Log.e(TAG, "Next: " + t)
        onDataReady(t)        }

    override fun onError(e: Throwable) {
        Log.e(TAG, "onError: ")
    }
}

===

原始问题:

试图在kotlin中使用RxJava subscription(),得到一个错误“Type mismatch. Required: Disposable? Found: Unit”,不知道这是什么意思,有人知道吗?
class DataEvent {}

使用RxBus
object RxBus {

private val publisher = PublishSubject.create<Any>()

fun publish(event: Any) {
    publisher.onNext(event)
}

// Listen should return an Observable and not the publisher
// Using ofType we filter only events that match that class type
fun <T> listen(eventType: Class<T>): Observable<T> = publisher.ofType(eventType)

}

当这样的电话,没关系:
mDisposable = RxBus.listen(DataEvent::class.java).subscribe({
        onDataReady(it)
    })

但是当使用定义的RxBus.listen(DataEvent::class.java).subscribe(observer)实例调用observer
它显示红色下划线:“类型不匹配。需要:一次性?找到:单位”
mDisposable = RxBus.listen(DataEvent::class.java).subscribe(observer)

观察者是:
var observer: Observer<DataEvent> = object : Observer<DataEvent> {
    override fun onSubscribe(d: Disposable) {
        Log.e(TAG, "onSubscribe: ")
    }

    override fun onNext(@NonNull t: DataEvent) {
        Log.e(TAG, "onNext: " + t)
        onDataReady(t)
    }

    override fun onError(e: Throwable) {
        Log.e(TAG, "onError: ")
    }

    override fun onComplete() {
        Log.e(TAG, "onComplete: All Done!")
     }
}

最佳答案

这是因为在RxJava 2.0方法中subscribe(observer)已更改,但未返回任何内容。

Unlike the Observable of version 1.x, subscribe(Observer) does not allow external cancellation of a subscription and the Observer instance is expected to expose such capability.



您可以使用 subscribeWith(observer)
例:
val disposable = Observable.just("Hello world!")
                .delay(1, TimeUnit.SECONDS)
                .subscribeWith(object : DisposableObserver<String>() {
                    public override fun onStart() {
                        println("Start!")
                    }

                    fun onNext(t: Int?) {
                        println(t)
                    }

                    override fun onError(t: Throwable) {
                        t.printStackTrace()
                    }

                    override fun onComplete() {
                        println("Done!")
                    }
                })

关于kotlin - kotlin,在subscribe()中使用观察者对象实例时得到了 “Type mismatch. Required: Disposable? Found: Unit”,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48790926/

相关文章:

java - 如何使用 RxJava2 过滤列表并收集其索引到字符串结果?

java - RxJava 和 RxJava 2 可以共存于同一个 Android 项目中吗?

java - 如何将 rxJava2 的 Observable 转换为 Completable?

angular - 似乎无法在 Observable 上使用 'first()'(Angular 中的 rxjs)

java - 简单的邮件列表

android - 为什么 notifyDataSetChanged 会炸毁 ViewPager2 中的 PagerTransformer?

java - 如何正确加密用 jackson 编写的 JSON 文件

python - 在 python 中使用 paho mqtt 处理收到的消息

android - 撰写 UI 测试 - 如何断言文本颜色?

android - 在android q中如何获取android文档目录的路径?