java - 禁用 Flowable/Observable 缓冲区

标签 java java-8 rx-java reactive-programming rx-java2

我始终无法从使用 combineLatest 运算符获得的组合中获取最后一个值。

我有 2 个热流 (a, b) 生成高频事件(每 100 毫秒一个事件):

Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value);
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value);

结合 combineLatest,它需要将近 300 毫秒才能完成它的工作。

Flowable<OrderBookCouple> combined = Flowable.combineLatest(flowA, flowB,        OrderBookCouple::new).observeOn(Schedulers.newThread());
combined.subscribe((bookCouple) -> {
                System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp());
                System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp());
                Thread.sleep(300);
            }

combiner 执行一次后,我想处理生成的事件的最后一个组合,意思是 (lastA, lastB)。

组合流的默认行为是将所有事件组合缓存在它自己的缓冲区中,以便组合流接收非常旧的组合并且这个时间间隔正在爆炸。

我应该如何更改我的代码以禁用此缓冲区并始终接收最后的组合?

最佳答案

您可以在 flowAflowB 上应用 onBackpressureLatest 并使用 overload of combineLatest让您指定预取量。

Flowable.combineLatest(
    Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()),
    a -> new OrderBookCouple((OrderBook)a[0], (OrderBook)a[1]),
    1
)
.onBackpressureLatest()
.observeOn(Schedulers.newThread(), false, 1)

不幸的是,没有同时采用 BiFunction 和 bufferSize 的重载,因此您必须恢复为转换数组元素。

编辑

应用第二个 onBackpressureLatest 并限制 observeOn 上的缓冲区大小应该让您更接近所需的模式,尽管 combineLatest 不是针对这个用例。您可能需要一些多样本运算符。

关于java - 禁用 Flowable/Observable 缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45028277/

相关文章:

android - NoClassDefFoundError : rx. subscriptions.CompositeSubscription

android - 为什么 RxJava 的 "subscribe"方法被多次调用?

java - RXJava2。我是否需要处理一次发出的流? (单例,也许)

java - Java &&中的Bittorrent实现需要有关群行为的一些信息

java - 将 main 中创建的实例传递给 Java 中的类

java - java 8中的复杂比较器

javascript - 如何在 Nashorn 中追踪 ReferenceError

Java 8 CompletableFuture - 如何在同一输入上运行多个函数

java - 如何将 fetchMap() 与 RecordMapper 一起使用?

java - 具有多个服务器的客户端-服务器架构