java - RxJava- 一个单一的热源 Observable?好主意还是坏主意?

标签 java reactive-programming rx-java

我很欣赏 RxJava 可以处理的所有复杂性,从处理用户事件到大型复杂的响应式(Reactive)算法结构。然而,我仍然在为源的开始而苦苦挣扎 Observable以及所有事件的来源,以及如何管理多个事件来源。

我知道这个问题的答案可能是“视情况而定”,但将源事件合并到单个 Subject 是否是个坏主意?还是某种形式的 Hot Observable?

例如,假设我有一个数据驱动的桌面应用程序。我有一个可枚举的标识根级事件类型。

public enum AppEvent { 
     TRANSACTIONS_REFRESH,
     CATEGORIES_REFRESH
}

然后我有一个 AppEventEngine包含 PublishSubject 的单例并提供一种发布事件和访问 Observable<AppEvent> 的方法.

public final class AppEventEngine {

    private static final AppEventEngine instance = new AppEventEngine();

    private final PublishSubject<AppEvent> eventBus = PublishSubject.create();

    public void post(AppEvent appEvent) { 
        eventBus.onNext(appEvent);
    }
    public Observable<AppEvent> getEvents(AppEvent appEvent) { 
        return eventBus.startWith(appEvent).filter(e -> e.equals(appEvent));
    }

    public static AppEventEngine get() { 
        return instance;
    }
    private AppEventEngine() {}
}

然后我可以发出 AppEvent.CATEGORIES_REFRESHCategoryManager并使用 RxJava-JDBC构建并发出一个新的 List<Category>来自查询。

public final class CategoryManager {

    private final Database db = //instantiate database;
    private final Observable<List<Category>> categories;

    private CategoryManager() { 
        categories = AppEventEngine.get().getEvents(AppEvent.CATEGORIES_REFRESH)
                .flatMap(e -> db.select("SELECT * FROM CATEGORY").get(rs -> new Category(rs.getInt("ID"), rs.getString("DESC"))).toList()).cache(1);
    }

    public Observable<List<Category>> getCategories() { 
        return categories;
    }
}

然后外部客户端类不仅可以转换和订阅 Observable<List<Category>>还要推送AppEvent.CATEGORIES_REFRESH随时发生的事件。

public final class SomeClientUX { 
    //gui code 

    public void categoryRefreshButtonPressed() {
        AppEventEngine.get().post(AppEvent.CATEGORIES_REFRESH);
    }
}

我的问题是...正在关注此 EventBus pattern ( much like that in Google Guava ) 是一种编写 RxJava 桌面应用程序的方式吗?或者我应该避免使用 PublishSubject并以不同的方式(以分散的方式)导出事件?

最佳答案

我认为这是一个有效的模式,并且是 PublishSubject 的常见用例。如果您希望事件从不同的线程推送到它(除非推送的事件之间存在正式的先行发生关系,但这是一种优化你可能不需要)。所以声明应该是:

private final Subject<AppEvent> eventBus =
    PublishSubject.create().toSerialized();

关于java - RxJava- 一个单一的热源 Observable?好主意还是坏主意?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32235209/

相关文章:

rx-java - 缓存最后发出的项目 RxJava Operator

c# - 如果中间有 linq 方法,rx 处理异常订阅

java - akka中的有序执行

java - 确保每个 Hashmap 桶/槽有一个值

java - 在构造函数中使用 "this instanceof …"或 "getClass()"是否安全?

java - RxJava 2.x : Should I use Flowable or Single/Completable?

r - 带有nearPoints()的动态ggplot图层 Shiny

java - Android RxJava Observable.interval() 不会停止发射项目

java - 使用 rxjava 使网络调用并行而不交错

Java、Android、Firebase 身份验证登录时抛出 NullPointerException