java - RxJava 缓冲 - 忽略零项

标签 java rx-java reactive-programming reactivex

这是我用于缓冲和转换传入事件的代码:

public Publisher<Collection<EventTO>> logs(String eventId) {
    ConnectableObservable<Event> connectableObservable = eventsObservable
        .share().publish();
    connectableObservable.connect();

    connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
        .filter(event -> event.getId().equals(eventId))
        .buffer(1, TimeUnit.SECONDS, 50)
        .map(eventsMapper::mapCollection);
}

这里的问题是 Flowable 每秒返回一个空列表,尽管没有事件发布到 eventsObservable

有没有一种方法可以保存 .buffer 直到至少有一个对象?

注意: 看起来有一种方法可以在 C# 中做到这一点(此处描述: https://stackoverflow.com/a/30090185/668148 )。 但是用 Java 怎样才能做到呢?

最佳答案

根据 Mark Keen 的建议,.distinctUntilChanged 可以解决问题。

因此,如果缓冲后有 1+ 个项目,以下代码将推送事件列表:

connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
    .filter(event -> event.getId().equals(eventId))
    .buffer(1, TimeUnit.SECONDS, 50)
    .distinctUntilChanged()             // <<<======  
    .map(eventsMapper::mapCollection);

关于java - RxJava 缓冲 - 忽略零项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54271359/

相关文章:

java - Lombok:等于子类中的字符串实例变量

java - 尝试使用 Spring Boot 显示自定义错误页面时出现问题

android - RxJava - 链接请求和更新 UI

java - java中如何让相机朝它所面对的方向移动?

java - Unix 时间戳的时间转换

android - 来自数组的 ReplySubject

java - Vert.x 从 vertx.executeBlocking 做出 promise

javascript - 创建 Cycle.js 可重用模块

facebook - 网络关闭时如何处理 React-Native 中的网络故障

c# - Reactive 的 "Buffer until quiet"行为?