java - Backpressure 是如何在 RxJava 内部发生的

标签 java multithreading rx-java backpressure

我一直在阅读一些关于 RxJava 背压的文档,但是我找不到详细的解释,比如它是如何在库内部发生的,每个人都只是总结说“生产者”太快而“消费者”太慢。

例如像下面的代码:

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

我一直在研究 RxJava 源代码,所以我的理解是在主线程中我们将每毫秒发出一次事件,一旦我们发出它,我们将值传递给 System.out.println(i) 方法并抛出它进入 newThead 调度程序的线程池并在 runnable 中运行该方法。

所以我的问题是,异常是如何在内部发生的?因为当我们调用 Thread.sleep() 时,我们只是在 hibernate 处理方法调用的线程 -> System.out.println() 而不会影响线程池中的其他线程,为什么会导致异常。是因为线程池没有足够的可用线程了吗?

谢谢

最佳答案

您可以将背压视为一个运算符(operator)分发给其上游源的许可系统:您可以给我 128 个元素。稍后,该运算符(operator)可能会说“好吧,再给我 96 个”,因此总共可能有 224 个未完成的许可证。一些来源,例如 interval 不关心许可,只是定期分发值。由于许可的数量通常与队列或缓冲区中的可用容量密切相关,因此分发超过这些存储容量的容量会产生 MissingBackpressureException

检测背压违规主要发生在对有界队列的 offer 返回 false 时,例如 observeOn 中的那个表示队列已满。

检测违规的第二种方法是跟踪运算符(operator)中未完成的许可计数,例如 onBackpressureDrop,只要上游发送超过此数量,运算符(operator)就不会转发它:

// in onBackpressureDrop
public void onNext(T value) {
    if (emitted != availablePermits) {
        emitted++;
        child.onNext(value);
    } else {
        // ignoring this value
    }
}

子订阅者通过 request() 发出许可信号,这通常会在 onBackpressureDrop 中导致类似这样的结果:

public void childRequested(long n) {
    availablePermits += n;
}

在实践中,由于可能的异步执行,availablePermits 是一个AtomicLong(并且被称为requested)。

关于java - Backpressure 是如何在 RxJava 内部发生的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38915089/

相关文章:

通过 Catch block 处理 Java 异常

c# - 使用任务的经典永无止境的线程循环?

java - JTextArea 无法在多线程中工作 - java

c++ - 互斥锁是否会锁定自身,或者有问题的内存位置?

java - 当我从 solr 查询时,发生了一个常见的异常,告诉我未定义的字段 userId

java - 简单的字符串错误

android - AsyncTask 中的可观察/订阅者

AndroidObservable 从不在主线程上观察

java - retryWhen 后未调用 doOnCompleted

java - 如何使用 Spring Data Mongo DB 对类进行建模来存储具有不同长度字段和类型的嵌套 JSON 文档