最近发现projectreactor.io对Publisher的支持不错:
Flux.create(fluxSink -> {
for (int i = 0; i < 10; i++)
fluxSink.next(i);
fluxSink.complete();
})
.map(...)
.subscribe(...);
处理器有什么好的支持吗? 我的意思是类似或类似的东西:
XXX process = new XXX((inputValue, output) -> {
if(inputValue == 0)
output.error();
else
output.next(inputValue);
});
publisher.subscribe(process);
process.subscribe(...);
如果没有,我该如何实现自己的,或者为什么我不能这样做?
更新 1:
经过讨论(见评论)看来,在我的用例中我需要使用 flatMap
(见答案),我的问题是处理器的良好实现我的意思是一些功能,如果它失败了我能够控制并发出错误。我认为 flatMap
会给你足够的功能。就我而言,我使用:
import org.jsoup.Jsoup;
Flux.just("url")
.flatMap(url -> {
try {
Document document = Jsoup.connect(url).get();
return Flux.just(document);
} catch (IOException e) {
return Flux.error(e);
}
})
.subscribe();
最佳答案
您可能正在寻找 SubmissionPublisher
看起来类似于 Flux
在 react 堆中的实现:
A
Flow.Publisher
that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using aSubmissionPublisher
allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.
注意:链接中共享了一个自定义示例Flow.Processor
,可以进一步自定义处理onError
和 consume
根据您的用例的需要实现方法。
关于reactive-programming - 对 java.util.concurrent.Flow.Processor<T,R> 的良好实现/支持,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47350392/