Edit:
Following Dmitry Ikryanov's suggestion,
using DisposableObserver compiles, but it crashes with:
io.reactivex.exceptions.ProtocolViolationException: It is not allowed to
subscribe with a(n) com.DataManager$theObserver$1 multiple times. Please
create a fresh instance of com.DataManager$theObserver$1 and subscribe that
to the target source instead.
This is the only code that calls subscribeWith(), and it is called only once:
fun initSession() {
    if (mDisposable != null && mDisposable!!.isDisposed) {
        mDisposable!!.dispose()
    }
    mDisposable = RxBus.listen(DataEvent::class.java).subscribeWith(theObserver) // <=== crashes here
}
The DisposableObserver is a member variable of the class:
var theObserver: DisposableObserver<DataEvent> = object : DisposableObserver<DataEvent>() {
    override fun onComplete() {
        Log.e(TAG, "onComplete: All Done!")
    }
    override fun onNext(t: DataEvent) {
        Log.e(TAG, "Next: " + t)
        onDataReady(t)
    }
    override fun onError(e: Throwable) {
        Log.e(TAG, "onError: ")
    }
}
===
Original question:
I'm trying to use RxJava's subscribe() in Kotlin and get the error
"Type mismatch. Required: Disposable? Found: Unit"
I don't know what it means. Does anyone know?
class DataEvent {}
Using RxBus:
object RxBus {
    private val publisher = PublishSubject.create<Any>()
    fun publish(event: Any) {
        publisher.onNext(event)
    }
    // Listen should return an Observable and not the publisher
    // Using ofType we filter only events that match that class type
    fun <T> listen(eventType: Class<T>): Observable<T> = publisher.ofType(eventType)
}
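To make the ofType filtering concrete, here is a minimal sketch of publishing to and listening on this bus. It assumes RxJava 2 (the io.reactivex packages seen in the stack trace); the payload field on DataEvent and the demoBus() helper are hypothetical and only there for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// Hypothetical payload field, added only so the demo has something to collect
class DataEvent(val payload: String)

object RxBus {
    private val publisher = PublishSubject.create<Any>()
    fun publish(event: Any) {
        publisher.onNext(event)
    }
    // ofType lets through only events that are instances of eventType
    fun <T> listen(eventType: Class<T>): Observable<T> = publisher.ofType(eventType)
}

// Hypothetical helper: returns the payloads the subscriber actually saw
fun demoBus(): List<String> {
    val received = mutableListOf<String>()
    // The lambda overload of subscribe() returns a Disposable
    val disposable = RxBus.listen(DataEvent::class.java)
        .subscribe { event -> received.add(event.payload) }
    RxBus.publish("not a DataEvent")          // filtered out by ofType
    RxBus.publish(DataEvent("first"))         // delivered
    disposable.dispose()
    RxBus.publish(DataEvent("after dispose")) // no longer delivered
    return received
}

fun main() {
    println(demoBus()) // prints [first]
}
```

Because PublishSubject delivers synchronously on the calling thread, the list is already filled by the time publish() returns.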
Calling it like this works fine:
mDisposable = RxBus.listen(DataEvent::class.java).subscribe({
    onDataReady(it)
})
But when it is called with a predefined observer instance,
RxBus.listen(DataEvent::class.java).subscribe(observer)
the IDE underlines it in red: "Type mismatch. Required: Disposable? Found: Unit"
mDisposable = RxBus.listen(DataEvent::class.java).subscribe(observer)
The observer is:
var observer: Observer<DataEvent> = object : Observer<DataEvent> {
    override fun onSubscribe(d: Disposable) {
        Log.e(TAG, "onSubscribe: ")
    }
    override fun onNext(@NonNull t: DataEvent) {
        Log.e(TAG, "onNext: " + t)
        onDataReady(t)
    }
    override fun onError(e: Throwable) {
        Log.e(TAG, "onError: ")
    }
    override fun onComplete() {
        Log.e(TAG, "onComplete: All Done!")
    }
}
Best answer
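This is because in RxJava 2.0 the subscribe(observer) method changed: it no longer returns anything (void in Java, which Kotlin sees as Unit), so the result cannot be assigned to a Disposable? variable.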
Unlike the Observable of version 1.x, subscribe(Observer) does not allow external cancellation of a subscription and the Observer instance is expected to expose such capability.
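One way to read the note quoted above: with a plain Observer, the Disposable is handed to the observer itself in onSubscribe, so the observer has to keep it if the subscription should be cancellable. A minimal sketch, assuming RxJava 2; the demoOnSubscribe() helper and the variable names are made up for illustration.

```kotlin
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: returns the items received before cancellation
fun demoOnSubscribe(): List<String> {
    val received = mutableListOf<String>()
    var subscription: Disposable? = null
    val subject = PublishSubject.create<String>()

    val observer = object : Observer<String> {
        // The source passes the Disposable here instead of returning it from subscribe()
        override fun onSubscribe(d: Disposable) {
            subscription = d
        }
        override fun onNext(t: String) {
            received.add(t)
        }
        override fun onError(e: Throwable) {
            e.printStackTrace()
        }
        override fun onComplete() {}
    }

    subject.subscribe(observer) // returns Unit, so there is nothing to assign
    subject.onNext("a")
    subscription?.dispose()     // cancel through the Disposable captured in onSubscribe
    subject.onNext("b")         // not delivered after dispose
    return received
}

fun main() {
    println(demoOnSubscribe()) // prints [a]
}
```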
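You can use subscribeWith(observer) instead, which returns the observer you pass in; a DisposableObserver is itself a Disposable. Example: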
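import io.reactivex.Observable
import io.reactivex.observers.DisposableObserver
import java.util.concurrent.TimeUnit

val disposable = Observable.just("Hello world!")
    .delay(1, TimeUnit.SECONDS)
    .subscribeWith(object : DisposableObserver<String>() {
        public override fun onStart() {
            println("Start!")
        }
        // The parameter type must match the stream's element type (String)
        override fun onNext(t: String) {
            println(t)
        }
        override fun onError(t: Throwable) {
            t.printStackTrace()
        }
        override fun onComplete() {
            println("Done!")
        }
    })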
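Regarding the ProtocolViolationException in the question's edit: the message itself says a DisposableObserver instance may be subscribed only once, so a member-variable observer fails as soon as subscribeWith() runs a second time with it. A possible workaround, sketched here under the assumption of RxJava 2, is to build a fresh observer for every subscription; newObserver() and demoFreshObserver() are hypothetical names.

```kotlin
import io.reactivex.observers.DisposableObserver
import io.reactivex.subjects.PublishSubject

// Hypothetical factory: every call returns a brand-new observer instance
fun newObserver(sink: MutableList<String>): DisposableObserver<String> =
    object : DisposableObserver<String>() {
        override fun onNext(t: String) {
            sink.add(t)
        }
        override fun onError(e: Throwable) {
            e.printStackTrace()
        }
        override fun onComplete() {}
    }

// Hypothetical helper: subscribes twice, each time with a fresh observer
fun demoFreshObserver(): List<String> {
    val received = mutableListOf<String>()
    val subject = PublishSubject.create<String>()

    var disposable = subject.subscribeWith(newObserver(received))
    subject.onNext("first")
    disposable.dispose()

    // Re-subscribing with a new instance, never the disposed one
    disposable = subject.subscribeWith(newObserver(received))
    subject.onNext("second")
    disposable.dispose()
    return received
}

fun main() {
    println(demoFreshObserver()) // prints [first, second]
}
```

In the question's initSession(), the check also reads isDisposed where !isDisposed was probably intended, so the previous subscription is never actually disposed before the new one is created.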
Source: the Stack Overflow question "Kotlin: getting 'Type mismatch. Required: Disposable? Found: Unit' when using an observer object instance in subscribe()": https://stackoverflow.com/questions/48790926/