java - Spring Webflux "works"中的实现,但我试图理解 "why?"

标签 java functional-programming spring-webflux

“你正在思考命令式,第一行将被执行,然后是第二行,这在 webflux 中不是这种情况。你必须考虑事件回调。”

我同意这一评估(我有很多“势在必行”做事的经验),并希望人们能够帮助我纠正我如何看待解决方案空间。我发布了相同“功能”的三个不同版本,其中只有一个有效(并且,我愿意接受有关如何/应该修改该版本以更好地与响应式(Reactive)/功能实现保持一致的评论)。

在进行引用评估的人的指导/帮助下,我能够让“DemoPOJOHandler.add(ServerRequest)”工作。代码以及调试级输出如下所示。我注意到,在 HTTP POST "/v2/DemoPOJO" 和 *"Mapped to mil.navy..."行之后,有一个来自 reactor.netty 的条目。 Channel.FluxReceive 声明“正在订阅入站接收器..”。这似乎是我的其他两次尝试中所缺少的关键操作。

我的具体问题(尽管“很长”)是:

我“明白”认为语句#1将被执行,然后语句#2等是解决方案空间的“命令” View 。但是,在下面的示例中,这似乎就是正在发生的行为。 logger 语句在 08:38:34.217 执行,然后在 08:38:34.251 执行订阅,然后在 08:39:34.267 实例化 DemoPOJO,然后一切都“正常”。

但是, request.bodyToMono()... 中的链接 序列与命令式代码中的方法链接(例如,'Integer.toString().indexOf()') 并没有显着不同,除了 lambda(或, lambda 的存在是“事情发生变化”的原因吗?)。所以,如果 request.bodyToMono()... 序列理论上不需要“ .then() ”或“ .switchIfEmpty() ”,那为什么核心不需要request.bodyToMono()... 序列执行“ service.add(demoPOJO) ”?我知道 Mono 没有被订阅,但是为什么看起来需要链中的附加语句才能发生订阅并将 POJO 添加到存储库中?

此代码执行成功...

@Component
public class DemoPOJOHandler {

    private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");

        return request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> service.add(demoPOJO))
                                                 .then(ServerResponse.ok().build())
                                                 .switchIfEmpty(ServerResponse.badRequest()
                                                                              .contentType(MediaType.APPLICATION_JSON)
                                                                              .build());
    }
}


2019-07-25 08:38:34.144 DEBUG 11992 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkAccessible: true
2019-07-25 08:38:34.145 DEBUG 11992 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkBounds: true
2019-07-25 08:38:34.145 DEBUG 11992 --- [ctor-http-nio-2] i.n.util.ResourceLeakDetectorFactory     : Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@7a8a4d6a
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [id: 0xa2da3d98, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62644] New http connection, requesting read
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] New http connection, requesting read
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.channel.BootstrapHandlers  : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-2] reactor.netty.channel.BootstrapHandlers  : [id: 0xa2da3d98, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62644] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxCapacityPerThread: 4096
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxSharedCapacityFactor: 2
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.linkCapacity: 16
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.ratio: 8
2019-07-25 08:38:34.173 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Increasing pending responses, now 1
2019-07-25 08:38:34.173 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.http.server.HttpServer     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Handler is being applied: org.springframework.http.server.reactive.ReactorHttpHandlerAdapter@579c20c6
2019-07-25 08:38:34.195 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5f552130] HTTP POST "/v2/DemoPOJO"
2019-07-25 08:38:34.217 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.r.f.s.s.RouterFunctionMapping      : [5f552130] Mapped to mil.navy.demo.DemoPOJO.DemoPOJORouter$$Lambda$258/1123559518@22a8277c
2019-07-25 08:38:34.217 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOHandler   : DemoPOJOHandler.add( ServerRequest )
2019-07-25 08:38:34.251 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.channel.FluxReceive        : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.DemoPOJO( 666, foo_666, 10666 )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.toString()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] o.s.http.codec.json.Jackson2JsonDecoder  : [5f552130] Decoded [666 :: foo_666 :: 10666]
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOService   : DemoPOJOService.add( DemoPOJO )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJORepo      : DemoPOJORepo.add( DemoPOJO )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.getId()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.getId()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJORepo      : DemoPOJORepo.add( DemoPOJO ) -> adding for id 666
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.getId()
2019-07-25 08:38:34.272 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5f552130] Completed 200 OK
2019-07-25 08:38:34.273 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Last HTTP response frame
2019-07-25 08:38:34.273 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] No sendHeaders() called before complete, sending zero-length header
2019-07-25 08:38:34.274 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Decreasing pending responses, now 0
2019-07-25 08:38:34.275 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Last HTTP packet was sent, terminating the channel
2019-07-25 08:38:41.720 DEBUG 11992 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: (port 62610) connection closed
2019-07-25 08:38:41.720 DEBUG 11992 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: close connection

此代码执行“没有错误”,但从不订阅,因此从不执行链的“ doOnSuccess(...) ”部分。不过,好像应该是这样吧?使用“.then(”将单独的 return... 语句链接到 'request.bodyToMono(...)' 语句有什么“魔力”? ...)”?

@Component
public class DemoPOJOHandler {

    private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");

        request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()));
        return  ServerResponse.ok().build();
    }
}


2019-07-25 08:40:16.155 DEBUG 17064 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: (port 62661) connection closed
2019-07-25 08:40:16.155 DEBUG 17064 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: close connection
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkAccessible: true
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkBounds: true
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] i.n.util.ResourceLeakDetectorFactory     : Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@3860465a
2019-07-25 08:40:18.266 DEBUG 17064 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [id: 0x768a1f21, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62695] New http connection, requesting read
2019-07-25 08:40:18.266 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] New http connection, requesting read
2019-07-25 08:40:18.267 DEBUG 17064 --- [ctor-http-nio-3] reactor.netty.channel.BootstrapHandlers  : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:40:18.267 DEBUG 17064 --- [ctor-http-nio-2] reactor.netty.channel.BootstrapHandlers  : [id: 0x768a1f21, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62695] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxCapacityPerThread: 4096
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxSharedCapacityFactor: 2
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.linkCapacity: 16
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.ratio: 8
2019-07-25 08:40:18.285 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Increasing pending responses, now 1
2019-07-25 08:40:18.289 DEBUG 17064 --- [ctor-http-nio-3] reactor.netty.http.server.HttpServer     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Handler is being applied: org.springframework.http.server.reactive.ReactorHttpHandlerAdapter@7fa4fcbc
2019-07-25 08:40:18.297 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [51900c31] HTTP POST "/v2/DemoPOJO"
2019-07-25 08:40:18.315 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.r.f.s.s.RouterFunctionMapping      : [51900c31] Mapped to mil.navy.demo.DemoPOJO.DemoPOJORouter$$Lambda$262/1446001495@27a07cfc
2019-07-25 08:40:18.316 DEBUG 17064 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOHandler   : DemoPOJOHandler.add( ServerRequest )
2019-07-25 08:40:18.358 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [51900c31] Completed 200 OK
2019-07-25 08:40:18.359 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Last HTTP response frame
2019-07-25 08:40:18.359 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] No sendHeaders() called before complete, sending zero-length header
2019-07-25 08:40:18.360 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Decreasing pending responses, now 0
2019-07-25 08:40:18.361 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Last HTTP packet was sent, terminating the channel
2019-07-25 08:40:18.366 DEBUG 17064 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] No ChannelOperation attached. Dropping: 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 0a 20 20 20 20 22 69 64 22 3a 20 36 36 36 2c |{.    "id": 666,|
|00000010| 0a 20 20 20 20 22 6e 61 6d 65 22 3a 20 22 66 6f |.    "name": "fo|
|00000020| 6f 5f 36 36 36 22 2c 0a 20 20 20 20 22 76 61 6c |o_666",.    "val|
|00000030| 75 65 22 3a 20 31 30 36 36 36 0a 7d             |ue": 10666.}    |
+--------+-------------------------------------------------+----------------+

这段代码会因 NPE 而崩溃。我失败的逻辑是“好吧,如果由于没有订阅 Mono”而没有发生“doOnSuccess(...)”,则“订阅”。显然,不是解决方案。不太明显(对我来说,在这个时间点)是“为什么?”。

@Component
public class DemoPOJOHandler {

    private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");

        request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()))
                                          .subscribe();
        return  ServerResponse.ok().build();
    }
}


reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
at mil.navy.demo.DemoPOJO.DemoPOJOHandler.lambda$add$2(DemoPOJOHandler.java:73) ~[classes/:na]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:311) [reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1743) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1743) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:155) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:560) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:426) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:560) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:426) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.onComplete(FluxMapSignal.java:213) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:390) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:197) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:338) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:350) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:399) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.http.server.HttpServerOperations.cleanHandlerTerminate(HttpServerOperations.java:519) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.http.server.HttpTrafficHandler.operationComplete(HttpTrafficHandler.java:314) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]

    (... lots of stuff deleted to fit posting constraints ...)

2019-07-25 10:44:43.212 DEBUG 10544 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0xa62e89df, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:64515] No ChannelOperation attached. Dropping: 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 0a 20 20 20 20 22 69 64 22 3a 20 36 36 36 2c |{.    "id": 666,|
|00000010| 0a 20 20 20 20 22 6e 61 6d 65 22 3a 20 22 66 6f |.    "name": "fo|
|00000020| 6f 5f 36 36 36 22 2c 0a 20 20 20 20 22 76 61 6c |o_666",.    "val|
|00000030| 75 65 22 3a 20 31 30 36 36 36 0a 7d             |ue": 10666.}    |
+--------+-------------------------------------------------+----------------+
2019-07-25 10:45:08.112 DEBUG 10544 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(3)-169.254.211.161: (port 64485) connection closed
2019-07-25 10:45:08.112 DEBUG 10544 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(3)-169.254.211.161: close connection

最佳答案

由于我是您在顶部引用的那个人,因此我将尝试回答您的问题。

首先我们需要谈谈“非阻塞”。什么是“非阻塞”?非阻塞是基于事件的。底层服务器 Netty 并不为每个请求分配一个线程,而是使用事件链和事件队列。

因此,当有人订阅时,netty 将设置一个底层事件队列(某种程度上),其基本工作原理如下:

x <- y <- z

要得到 x,我们需要解析 y,但要得到 y,我们需要解析 z。这就是人们通常所说的此类编程中的“功能”部分。

当人们开始响应式编程时,我看到的最常见的错误之一是他们不明白 subscriber是召唤client 。您的 Spring 应用程序是 publisher以及每个client调用您的服务是 subscriber .

您永远不应该在您的应用程序中订阅

为什么您的发布应用程序要订阅自身?当你按照人们通常理解的方式解释时。

所以让我们看一下您的示例,我将以相反的顺序进行说明:

示例 3:

public Mono<ServerResponse> add(ServerRequest request) {
    logger.debug("DemoPOJOHandler.add( ServerRequest )");

    request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()))
                                      .subscribe();
    return  ServerResponse.ok().build();
}

这里我们以命令式的方式进入方法,我们给它请求,ServerRequest是一个具体的对象,但是只要你这样做bodyToMono您将返回 Mono<DemoPOJO>这又是一个包裹的 CompletableFuture里面有一个计算(获取请求中的主体并将其放入您的 dto 中)

一旦计算完成,Mono将进入success状态并触发链中之后的内容,因此 doOnSuccess会被触发。当doOnSuccess完成后会返回 Mono<Void> .

这就是你的问题所在,当 doOnSuccess完了,你subscribe 。所以你在这里要做的是,一旦有人发布 ServerRequest Netty(服务器)将在您的应用程序中建立一个事件链,并且在该事件链中应用程序将订阅自身。

这里的链是由应用程序订阅自身来完成的。所以应用程序是它自己的客户端。

示例 2:

public Mono<ServerResponse> add(ServerRequest request) {
    logger.debug("DemoPOJOHandler.add( ServerRequest )");

    request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()));
    return  ServerResponse.ok().build();
}

这里我们的做法与示例3相同,只是事件链建立时,将请求映射到DTO,然后我们在doOnSuccess中做一些事情但随后链条就断了。 doOnSuccess表示它已完成,但之后没有任何监听。

所以这里事件链被破坏了,它是不完整的。在您订阅之前什么都不会发生,但由于链条被破坏,没有人可以 subscribe因此什么都不会发生。

示例 1:

public Mono<ServerResponse> add(ServerRequest request) {
    logger.debug("DemoPOJOHandler.add( ServerRequest )");

    return request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> service.add(demoPOJO))
                                             .then(ServerResponse.ok().build())
                                             .switchIfEmpty(ServerResponse.badRequest()
                                                                          .contentType(MediaType.APPLICATION_JSON)
                                                                          .build());
}

到这里,链条就完成了。某事正在向某事发出信号,即正在向某事发出信号,当某件事完成时,下一个事件将触发,然后是下一个,下一个,下一个。

调用客户端发布数据,服务器建立事件链,链完成以便客户端订阅客户端订阅,然后事件链启动并触发所有回调并返回数据。

Flux<T>Mono<T>都是围绕 monad CompletableFuture<T> 的包装类。 Optional<T>Stream<T>也是 monad,monad 来自函数世界,比如编程语言 Haskell。了解它们如何工作的一个好方法是了解有关 monad 的更多信息。

如果你想总体了解有关 monad 的更多信息,我将厚颜无耻地插入我自己的文章:

Write a Monad, in Java, seriously?

我很好读的也是Intro to reactive programming我建议仔细阅读他们的所有示例。

关于java - Spring Webflux "works"中的实现,但我试图理解 "why?",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57209054/

相关文章:

java - Android 实例化和重写模式

java - Android Java - view1.setAlpha 处出现 NullpointerException?

java - 无法在 Maven 项目中创建 servlet

c# - c# 中的选项类型替代方案

scala 链接尝试使用需要 finally/close() 的托管资源

spring - 真的有必要将 Hystrix 与响应式(Reactive) spring boot 2 应用程序一起使用吗?

java - webflux和reactor的正确使用方法是什么

java - 如何在 java 中使用构造函数基初始化来初始化 WebClient?

java - 根据 FieldNamingPolicy 重命名 GSON 中的映射键

c++ - 作为成员变量的 lambda 函数崩溃