java - 如何正确管理 Reactor 中的可关闭资源

标签 java project-reactor

我有一个 http 客户端和执行器,当所有工作完成后应该将其关闭。

我正在尝试按照此处针对 RxJava 1.x 描述的方式使用 Flux.using 方法: https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter08/ResourceManagement.java

我的资源创建方法:

public static Flux<GithubClient> createResource(String token,
                                                int connectionCount) {

    return Flux.using(
            () -> {
                logger.info(Thread.currentThread().getName() + " : Created and started the client.");
                return new GithubClient(token, connectionCount);
            },
            client -> {
                logger.info(Thread.currentThread().getName() + " : About to create Observable.");
                return Flux.just(client);
            },
            client -> {
                logger.info(Thread.currentThread().getName() + " : Closing the client.");
                client.close();
            },
            false
    ).doOnSubscribe(subscription -> logger.info("subscribed"));
}

然后我使用:

Flux<StateMutator> dataMutators = GithubClient.createResource(
            config.getAccessToken(),
            config.getConnectionCount())
            .flatMap(client -> client.loadRepository(organization, repository)

问题是客户端连接在发出第一个请求之前就已关闭。

[main] INFO com.sapho.services.githubpublic.client.GithubClient - main : Created and started the client.
[main] INFO com.sapho.services.githubpublic.client.GithubClient - main : About to create Observable.
[main] INFO com.sapho.services.githubpublic.client.GithubClient - subscribed
[main] INFO com.sapho.services.githubpublic.client.GithubClient - main : Closing the client.

java.lang.IllegalStateException: Client instance has been closed.

at jersey.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
at org.glassfish.jersey.client.JerseyClient.checkNotClosed(JerseyClient.java:273)

没有找到任何 Reactor 的示例。

谢谢

最佳答案

我再次阅读了使用文档,发现了我的错误。使用 return Flux.just(client); 返回客户端没有意义,因为 Flux 会立即终止,从而触发客户端关闭。

我最终实现了:

public static Flux<StateMutator> createAndExecute(GithubPublicConfiguration config,
                                                  Function<GithubClient, Flux<StateMutator>> toExecute) {

    return Flux.using(
            () -> {
                logger.debug(Thread.currentThread().getName() + " : Created and started the client.");
                return new GithubClient(entityModelHandler, config.getAccessToken(), config.getConnectionCount());
            },
            client -> toExecute.apply(client),
            client -> {
                logger.debug(Thread.currentThread().getName() + " : Closing the client.");
                client.close();
            },
            false
    );
}

然后我调用它:

GithubClient.createAndExecute(config,
            client -> client.loadRepository(organization, repository))

现在所有操作都按适当的顺序进行。

关于java - 如何正确管理 Reactor 中的可关闭资源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42037552/

相关文章:

java - FlatMap a Flux 未执行

java - 无法使用 ClassLoader 加载枚举类

java - 以编程方式下载 JRE?

java - 使用 EasyMock 进行单元测试时出现意外的方法调用

spring-webflux - 我什么时候使用 Mono.flatMapIterable 和 Mono.flapMapMany?

project-reactor - 如何并行处理 Flux 事件?

java - 来自相同包和不同包的 protected 方法调用之间的区别

java - spring form taglib disabled 属性真的必须解析为字符串吗?

java - 如何在 Spring Web MVC Controller 中利用非阻塞请求

spring-boot - 如何在 Spring WebFlux 中记录请求和响应主体