我是 rxjava 新手,遇到以下问题:
外部系统会不定期地将对象放入 FIFO 队列中。我需要一个每秒运行一次的 Observable,从队列中获取一个项目(如果有的话)并将其发送给订阅者。
两个问题:
队列项是在 Observable 处于 Activity 状态时生成的,不可能预先提供所有项。队列可能会空,在这种情况下,Observable 必须等待并且不发出任何东西。 (如果 Observable 在暂停后队列中的某个项目变得可用时立即启动,那就太好了,但是如果我们不想更频繁地轮询,那么队列可能也需要是一个 Observable,不想法如何。)
外部系统必须能够完成 Observable。我可以设置一个变量并从 Observable 中读取它,但我想知道是否有更优雅的方法来做到这一点。
LinkedList<Layer> queue = new LinkedList<Layer>(); // the queue boolean stopObservable = false; // the variable to stop the observable Observable.create(new Observable.OnSubscribe<Layer>() { @Override public void call(Subscriber<? super Layer> subscriber) { try { if (!queue.isEmpty()) { Layer layer = queue.poll(); subscriber.onNext(layer); } else { if (stopObservable) { subscriber.onCompleted(); } } } catch (Exception e) { subscriber.onError(e); } } }).somethingThatCreatesTheInterval().subscribeOnEtc.
对于间隔,我不能使用 .sample(),因为它会丢弃项目,并且发出所有项目很重要。
.throttleWithTimeout() 看起来更好,但它似乎也会丢弃项目。
rx 非常酷,但很难入门。任何意见表示赞赏。
最佳答案
当我需要定期轮询外部 Web 服务时,我做了类似的事情。
对于时间间隔,您可以继续使用
timer
;在每个以 1 秒为单位的刻度上,可观察链将轮询并可能选择一层,如果该层为空,则不会发出任何内容Observable.timer(0, 1, TimeUnit.SECOND) .flatMap(tick -> Observable.just(queue.poll()).filter(layer -> layer != null)) .subscribe(layer -> System.out.format("The layer is : %s", layer));
现在,如果您想中止整个链,您可以添加
takeUntil
。因此,当您的外部系统想要停止时,它将在stopObservable
中提交一些内容,这将停止后续订阅:// somewhere before PublishSubject stopNotifier = PublishSubject.create(); // somewhere process the queue Observable.timer(0, 1, TimeUnit.SECOND) .takeUntil(stopNotifier) .flatMap(tick -> Observable.just(queue.poll())) .subscribe(layer -> System.out.format("The layer is : %s", layer)); // when not anymore interested (calling onComplete works too) stopNotifier.onNext("cancel everything about the queue");
我是通过平板电脑写此回复,因此您可能会认为我可能拼错了一些单词或犯了幼稚的编程错误;)
关于java - 为 Observable 提供值(value),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26080646/