reactive-programming - 对 java.util.concurrent.Flow.Processor<T,R> 的良好实现/支持

标签 reactive-programming java-9 project-reactor reactive-streams java-flow

最近发现projectreactor.io对Publisher的支持不错:

Flux.create(fluxSink -> {
           for (int i = 0; i < 10; i++)
            fluxSink.next(i);
           fluxSink.complete();
        })
                .map(...)
                .subscribe(...);

处理器有什么好的支持吗? 我的意思是类似或类似的东西:

XXX process = new XXX((inputValue, output) -> {
    if(inputValue == 0)
       output.error();
    else
       output.next(inputValue);
});

publisher.subscribe(process);  
process.subscribe(...);

如果没有,我该如何实现自己的,或者为什么我不能这样做?

更新 1:

经过讨论(见评论)看来,在我的用例中我需要使用 flatMap (见答案),我的问题是处理器的良好实现我的意思是一些功能,如果它失败了我能够控制并发出错误。我认为 flatMap 会给你足够的功能。就我而言,我使用:

        import org.jsoup.Jsoup;

        Flux.just("url")
            .flatMap(url -> {
                try {
                    Document document = Jsoup.connect(url).get();
                    return Flux.just(document);
                } catch (IOException e) {
                    return Flux.error(e);
                }
            })
            .subscribe();

最佳答案

您可能正在寻找 SubmissionPublisher 看起来类似于 Flux 在 react 堆中的实现:

A Flow.Publisher that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.

注意:链接中共享了一个自定义示例Flow.Processor,可以进一步自定义处理onErrorconsume 根据您的用例的需要实现方法。

关于reactive-programming - 对 java.util.concurrent.Flow.Processor<T,R> 的良好实现/支持,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47350392/

相关文章:

javascript - 在其派生链中调用 Observable 的 onNext() 方法

kotlin - 在 Kotlin 中使用 Reactor 分页 Gitlab API

java - 在 Flux 中使用 Mono 结果

java - Spring WebFlux Reactor - 更新 Flux 中的对象

java - 如何在 Play Framework (Java) v2.4.x 中提供响应式(Reactive) ByteChunk

java - RxJava : How to get all results AND errors from an Observable

classpath - Java 9 模块 : Can automatic Modules result in larger (full) projects?

java - 无法从 Mac 终端执行 jshell

java - Java 中的可选 orElse 可选

rx-java - Java Spring WebFlux 与 RxJava