我正在使用 webflux 进行小型概念验证。在我的应用程序的一部分中,我想与一个阻塞的数据库(通过 JDBC)进行通信,该数据库不适合 react 器。尽管如此,对于这个概念验证,我正在考虑以下技巧:
- 定义一个专用线程池(我们称它为
DBThreadPool
)为ExecutorService
具有等于 JDBC 连接池大小的固定线程数。将该池包装在 Reactor Scheduler 中(名为dbScheduller
) 包装阻塞调用(如 Reactor 文档中所述)并将其安排在
dbScheduller
上:public Flux<DbUser> allUsers() { return Mono.fromCallable(() -> <jdbcQueryHere>) .flatMapIterable(Function.identity()) .log("DB-OPER").subscribeOn(dbScheduller); }
查询完成后,我想立即处理返回的
Flux<DbUser>
使用一些 react 性运算符,但在容器线程 中执行它们,这样我就可以释放数据库线程。根据 Reactor 文档,可以通过publishOn
完成方法:public Mono<ServerResponse> allUsers(ServerRequest request) { return ServerResponse.ok() .contentType(APPLICATION_STREAM_JSON) .body( usersDao.allUsers() .publishOn(<netty Thread pool as scheduller>) .map(<some function>) .filter(<some predicate>), DbUser.class); }
您认为这是一种可行的方法吗(在我们获得数据库的非阻塞驱动程序之前)?
如何访问容器 (netty) 线程池,以便可以利用其中一个线程来完成(来自数据库的后处理数据并写入响应)HTTP 请求?
我知道我可以单独使用数据库线程(通过省略 publishOn
)来完成 HTTP 请求,但我想尽快释放它(这样它可以被另一个需要访问的请求重用DB) 并将其余工作(这可能很耗时)留给 netty 管理的线程(它可以是最初执行我的 handler method
的线程)。
最佳答案
Do you think this is a viable approach (until we get non-blocking drivers to databases)?
我个人确实认为这是一种有效的方法。 但是许多其他人(他们中的大多数人比我更了解响应式编程)怀疑这是一个好主意。
您的选择:
为您的阻塞数据库调用编写一个单独的服务并使用
WebClient
调用它Pro:阻塞和非阻塞代码的干净分离。
缺点:多一个服务边界,包括网络。
为您的阻塞数据库调用编写一个单独的服务,包括它自己的用户界面。
优点:没有服务间通信。
缺点:降低了实际响应式应用程序的百分比。
如果您想在此服务中保留阻塞代码,这是执行此操作的正确方法。
How to get access to container (netty) thread pool?
如果您只是呈现您的响应,则不需要,因为它会自动发生在 netty 线程上。
如果您有实际的转换要做,没有现成的东西可以做到这一点。我想可以使用一些 Netty API 并基于它实现一个 Scheduler
。但我对 Netty 的了解还不够,无法帮助解决这个问题。
为了回答我问自己的后续问题:
But isn't that a gigantic omission?
publishOn
方法并非真正用于此目的。他们的用例更多地用于例如。 Swing 应用程序,您在管道的末端需要使用特定线程或线程池。
我个人的感觉是 Reactor 应该有这样的东西,但开发人员目前并不认为这是一个好主意。所以我建议如果你认为它对你的应用程序真的有意义:open a ticket并准备好争论你的观点。要么你成功,要么我们都知道为什么这是个坏主意 :-)
Anything else?
很高兴您问到您是否在 react 性管道中的线程池上做阻塞的事情您应该考虑如何处理线程池(或相应资源)过载时的情况,即它处理事件的速度比它们得到的慢生产的。
关于java - 带有 Webflux 的 JDBC - 如何分派(dispatch)到容器线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46918303/