java - 如何在项目reactor中设置阻塞异步请求/响应?

标签 java project-reactor

我正在与 ANT+ USB 棒连接,并用项目 react 器替换我自己天真的“MessageBus”,因为它看起来非常合适。
USB接口(interface)本质上是异步的(单独的输入/输出管道),我想以阻塞方式处理一组请求/响应消息。

我已经设置了一个单独的线程,它不断地从 USB 管道读取消息并将它们写入接收器,该接收器提供任何人都可以订阅的共享 Flux。这似乎工作正常。

目前,我向 USB 管道发送一条消息,然后在共享通量上使用 .filter() 和 .blockFirst():(人为代码)

    /**
     * Puts a message on the Usb  out Pipe and waits for the relevant asynchronous response on the {@link AntUsbReader#antMessages()} {@link Flux}
     *
     * @param message Message to send.
     * @return related response message.
     */

    public AntMessage sendBlocking(AntBlockingMessage message) {
        send(message); // in essence, calls usbOutPipe.syncSubmit(message.getBytes()), returns void
        // bug: ant dongle can reply to message even before following Flux is activated, meaning .blockFirst() goes in timeout.
        return this.antUsbMessageReader.antMessages() // .antMessages() is an (infinite)  Flux<AntMessage>
                .filter(antMessage -> antMessage.getMessageId() == message.getMessageId())
                .blockFirst(Duration.ofSeconds(10));
    }

问题在于,即使在激活 Flux 之前,U 盘也可以响应,从而导致 TimeoutException。
向 USB 读卡器添加 Thread.sleep(10) 可以“解决”该问题,但是实现这种阻塞行为的正确方法是什么?

  • 设置订阅(使用 .take(1))、发送消息然后阻止订阅?
  • 设置一个 Flux 来完成发送和等待正确响应?

我想不通……

最佳答案

我找到了一个可行的解决方案,但我不确定它是否是最好的:

我设置了一个 Mono 来发送异步消息,并将其与一个过滤匹配消息的 Flux 合并。 看到 Mono 从不发出值,我知道合并中的第一个对象是来自 Flux 的响应消息,因此我可以将其转换为正确的类型。

这仍然感觉有点脏,但话又说回来,尝试使用用于异步工作的框架来获得阻塞行为总是会感觉有点脏......

    public AntMessage sendBlocking(AntBlockingMessage requestMessage) {
        Flux<AntMessage> response = this.antUsbReader.antMessages()
                .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))
                .take(1);

        Mono<Void> messageSender = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage));
        return (AntMessage) Flux.merge(response, messageSender).blockFirst(Duration.ofSeconds(1));
    }

    private boolean isMatchingResponse(AntBlockingMessage message, AntMessage response) {
        if (message instanceof RequestMessage) {
            return response.getMessageId() == ((RequestMessage) message).getMsgIdRequested();
        }
        return response.getMessageId() == message.getMessageId();
    }

关于java - 如何在项目reactor中设置阻塞异步请求/响应?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57294242/

相关文章:

java - 如何使用 JavaFX MediaPlayer 多次播放同一音频剪辑

java - 项目 react 堆通量的并发处理

java - android json数据到spring mvc4

Java 8 streams/maps/filters 动态修改或删除列表元素

java - 字符串文字的引用相等性

java - Apache Flink 测试中是否有像 Reactor 和 RxJava 中那样的虚拟时间概念

java - Spring Reactive - 重用Mono值

apache-kafka - 如何在 Webflux 应用程序中制作 Spring Cloud Stream 消费者?

spring - 为什么默认配置的spring webflux中没有异常堆栈跟踪?

java - 没有为集成测试运行 testcontainers 的 Localstack 模块