java - RxJava SerializedObserver 实现

标签 java concurrency rx-java

阅读时this David Karnok 撰写的有关 RxJava 内部结构的文章 我遇到了一个类似于 RxJava 的 SerializedObserver 类的实现示例。这是代码:

class ValueListEmitterLoop<T> {
    List<T> queue;                           
    boolean emitting;
    Consumer<? super T> consumer;

    public void emit(T value) {
        synchronized (this) {
            if (emitting) {
                List<T> q = queue;
                if (q == null) {
                    q = new ArrayList<>();   
                    queue = q;
                }
                q.add(value);
                return;
            }
            emitting = true;
        }
        consumer.accept(value);              
        for (;;) {
             List<T> q;
             synchronized (this) {           
                 q = queue;
                 if (q == null) {            
                     emitting = false;
                     return;
                 }
                 queue = null;               
             }
             q.forEach(consumer);            
        }        
    }
}

所以问题是为什么在第一个 synchronized block 中引入内部变量 q ?我在第二个 synchronized block 中清楚地看到了其背后的推理。我是否有任何原因不只是使用:

if (queue == null) {
    queue = new ArrayList<>();
}
queue.add(value);

最佳答案

我发现将字段读入局部变量是一个很好的做法,特别是当它们被多次使用并且附近有一些 volatile /同步访问时。

例如,以下是常见模式:

volatile boolean cancelled;

final Queue<T> queue;

final Subscriber<? super T> actual;

void drain() {
    Subscriber<? super T> a = actual;
    Queue<T> q = queue;

    for (;;) {
        if (cancelled) {
            return;
        }

        T v = q.poll();

        if (v == null) {
             a.onComplete();
             return;
        }

        a.onNext(v);
    }
}

如果aq是字段访问,处理器/JVM将不得不一直从内存中读回它们,因为取消了 volatile 访问 以及 poll() 中类似的原子。

关于java - RxJava SerializedObserver 实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39730639/

相关文章:

java - 当我使用 : int cannot be converted to String, this 时出现错误 : timer = new Timer(1000,);

rx-java - Retrofit2+RxJava2,无效 token ,retryWhen()重新订阅时如何更新流

java - 配置 Play !在多个数据库环境中使用特定数据库的 2 个模型

c++ - 非成员函数锁的使用方法

concurrency - future 永远不会解决并兑现 promise

c++ - C++中有没有办法同时在多个线程上调用join?

使用 Retrofit2 调用 Android 单元测试 API

java - Java RX 中可通过缓冲区和多个值更新进行观察?

java - Android cocos2d如何对CCscene进行截图?

java - 编写与 Python 接口(interface)的非常快速的抽象的语言选择?