我一直在我们的代码库中尝试 rxJava,主要是希望添加并发性来提高性能。然而,当我使用 rxJava 时,似乎存在开销/启动成本问题。在下面的示例中,在“doRx()”中,在触发 getAllElements() 之前需要大约 130 毫秒,而在“doOld”中,在触发 getAllElements() 之前需要 0 毫秒。有什么解释为什么我最初在 doRx() 中损失了 130 毫秒吗?
这是我使用 System.currentTimeMillis() 进行的日志记录。 () 是从 init() 开始所耗时。
现有实现
- (0) 2016-10-11T13:34:07.060: OldImpl: init()
- (0) 2016-10-11T13:34:07.060: OldImpl:调用 getAllElements()
- (327) 2016-10-11T13:34:07.387:OldImpl:已收到 getAllElements()
RX 实现
- (0) 2016-10-11T13:34:07.703: RxImpl: init()
- (160) 2016-10-11T13:34:07.863:RxImpl:调用 getAllElements()
- (392) 2016-10-11T13:34:08.095: RxImpl: 收到getAllElements()
代码背后的原因是我首先想要收集所有元素,然后并行运行它们(在 h2 下),因为这样我们可以节省时间,因为有很多后端调用。我用过this blog作为此设置的指导。
public List<Element> doRx() {
List<Element> elements = new ArrayList<>();
Observable
.from(getAllElements())
.flatMap(
s -> Observable
.just(Element::new)
.subscribeOn(Schedulers.io())
.flatMap(
e -> {
List<Element> elements = new ArrayList<>();
for (SubElement se : e.getSubElements()) {
elements.add(se);
}
return Observable.from(elements);
}
)
)
.flatMap(
h1 -> Observable
.just(h1)
.subscribeOn(Schedulers.computation())
.flatMap(
h2 -> {
// Do additional things in parallell on all elements
return Observable
.just(h2);
}
)
)
.toBlocking()
.getIterator()
.forEachRemaining(myList::add);
return elements;
}
public List<Element> doOld() {
List<Element> elements = getAllElements();
for (Element e : elements) {
// Do stuff, same as under h2
}
return elements;
}
最佳答案
如果我理解你的代码正确,它相当于以下内容:
public List<Element> doRx() {
return Observable
.from(getAllElements())
.flatMap(element -> Observable
.just(new Element(element))
.subscribeOn(Schedulers.io())
.flatMaplIterable(e -> e.getSubElements())
)
.observeOn(Schedulers.computation())
.doOnNext(element -> {
// Do additional things in parallell on all elements
})
.toList()
.toBlocking()
.single();
}
与顺序版本相比,每个元素至少有 2 个上下文切换。你的时间安排得怎么样? X 运行,忽略最大和最小的数字?
关于java - rxJava 中的开销,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39952088/