我有这段代码在 Rx Java 1 中包装一个回调,它编译得很好,但现在我已经切换到 RX Java 2,它不编译...Rx Java 2 中的等价物是什么?
return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() {
@Override
public void call(AsyncEmitter<Integer> emitter) {
transObs.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
if (state == TransferState.COMPLETED)
emitter.onCompleted();
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
emitter.onError(ex);
}
});
emitter.setCancellation(new AsyncEmitter.Cancellable() {
@Override
public void cancel() throws Exception {
transObs.cleanTransferListener();
}
});
}
}, AsyncEmitter.BackpressureMode.BUFFER);
更新:
我想到了这个,但是你是否必须处理背压,因为它是一个 oncreate 调用?
return Observable.create(new ObservableOnSubscribe<List<DigitsUser>>() {
@Override
public void subscribe(final ObservableEmitter<List<DigitsUser>> emitter) throws Exception {
mDigitFriends.findFriends((gotEm, users) -> {
emitter.onNext(users);
});
emitter.setCancellable(() -> {
emitter.onNext(null);
});
}
});
最佳答案
如果您担心背压,您应该使用 Flowable 类。这是来自 RxJava2 Wiki 的引述:
Practically, the 1.x fromEmitter (formerly fromAsync) has been renamed to Flowable.create.
这是您使用 Flowable 类的示例:
return Flowable.create(new FlowableEmitter<List<DigitsUser>>() {
@Override
public void subscribe(final FlowableEmitter<List<DigitsUser>> emitter) throws Exception {
mDigitFriends.findFriends((gotEm, users) -> {
emitter.onNext(users);
});
emitter.setCancellable(() -> {
emitter.onNext(null);
});
}
}, BackpressureStrategy.BUFFER);
关于java - 接收Java 2 : How to wrap a callback?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43232994/