java - 为 Observable 提供值(value)

标签 java rx-java

我是 rxjava 新手,遇到以下问题:

外部系统会不定期地将对象放入 FIFO 队列中。我需要一个每秒运行一次的 Observable,从队列中获取一个项目(如果有的话)并将其发送给订阅者。

两个问题:

  • 队列项是在 Observable 处于 Activity 状态时生成的,不可能预先提供所有项。队列可能会空,在这种情况下,Observable 必须等待并且不发出任何东西。 (如果 Observable 在暂停后队列中的某个项目变得可用时立即启动,那就太好了,但是如果我们不想更频繁地轮询,那么队列可能也需要是一个 Observable,不想法如何。)

  • 外部系统必须能够完成 Observable。我可以设置一个变量并从 Observable 中读取它,但我想知道是否有更优雅的方法来做到这一点。

    LinkedList<Layer> queue = new LinkedList<Layer>(); // the queue
    boolean stopObservable = false; // the variable to stop the observable
    
    Observable.create(new Observable.OnSubscribe<Layer>() {
    
        @Override public void call(Subscriber<? super Layer> subscriber) {
            try {
                if (!queue.isEmpty()) {
                    Layer layer = queue.poll();
                    subscriber.onNext(layer);
                } else {
                    if (stopObservable) { subscriber.onCompleted(); }
                }
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }
    
    }).somethingThatCreatesTheInterval().subscribeOnEtc.
    

对于间隔,我不能使用 .sample(),因为它会丢弃项目,并且发出所有项目很重要。

.throttleWithTimeout() 看起来更好,但它似乎也会丢弃项目。

rx 非常酷,但很难入门。任何意见表示赞赏。

最佳答案

当我需要定期轮询外部 Web 服务时,我做了类似的事情。

  1. 对于时间间隔,您可以继续使用 timer ;在每个以 1 秒为单位的刻度上,可观察链将轮询并可能选择一层,如果该层为空,则不会发出任何内容

    Observable.timer(0, 1, TimeUnit.SECOND)
        .flatMap(tick -> Observable.just(queue.poll()).filter(layer -> layer != null))
        .subscribe(layer -> System.out.format("The layer is : %s",  layer));
    
  2. 现在,如果您想中止整个链,您可以添加 takeUntil 。因此,当您的外部系统想要停止时,它将在 stopObservable 中提交一些内容,这将停止后续订阅:

    // somewhere before
    PublishSubject stopNotifier = PublishSubject.create();
    
    // somewhere process the queue
    Observable.timer(0, 1, TimeUnit.SECOND)
        .takeUntil(stopNotifier)
        .flatMap(tick -> Observable.just(queue.poll()))
        .subscribe(layer -> System.out.format("The layer is : %s",  layer));
    
    // when not anymore interested (calling onComplete works too)
    stopNotifier.onNext("cancel everything about the queue");
    

我是通过平板电脑写此回复,因此您可能会认为我可能拼错了一些单词或犯了幼稚的编程错误;)

关于java - 为 Observable 提供值(value),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26080646/

相关文章:

java - 不在for循环中执行第二个条件

java - 收到错误 Invalid Oracle URL specified : OracleDataSource. makeURL

java - 如何在android中的opengl中实现撤销功能

java - ConcatMap 没有选项并跳过错误 - RxJava

android - RxJava/RxAndroid : continue range loop after error

java - 在用户输入的字符串中搜索 int

java - 获取父类(super class)的超接口(interface)参数的类

java - RxJava 谓词过滤器

java - 如何在 RxJava 中不重复相同的操作?

rx-java - 如何在 RxJava 中延迟 Observable 发射