java - 有没有办法在 Spring RSocket 中查看 REQUEST_N 交换

标签 java rsocket spring-rsocket

我正在使用 Spring 对 RSocket 的支持,特别是请求流模型。 IE。:

@MessageMapping("stream")
Flux<SubscriptionMessage> stream(final SubscriptionMessage request, @AuthenticationPrincipal UserDetails user) {
    //....
}
如果我正确理解 RSocket,Flux 响应将作为一系列返回给客户端
根据 request(n) 约定加载消息。例如。一次有 n 条有效载荷消息。在每一系列消息之后,客户端使用 REQUEST_N 消息向服务器发送一个附加集,这为背压缓解提供了基础。
在Java库的API(org.springframework.messaging.rsocket,它基于io.rsocket.RSocket)中,有没有办法在REQUEST_N消息到达时处理/访问它们,或者显式设置N的值通过策略(并查看请求者传递给服务器的值)?
原因/原因:我正在 Kafka 上实现一个 rsocket 外观,并尝试为订阅提供一个请求流机制,该机制将在消息被消费时支持自动偏移提交。
对于请求流约定,我认为请求者偶尔的 REQUEST_N 交互
成为可以推进主题提交偏移量的理想点,因为它是通过
requestor 表示已经收到响应者前面发送的 Payload 消息。
我见过的唯一其他选择是使用请求 channel 模型,以便请求者可以发送
和初始订阅请求,并开始接收数据,也发送
周期消息来专门控制同一 channel 上的提交偏移量。无论如何,我正在考虑提供那个,
但想知道是否有办法将逻辑注入(inject)流的周期性请求(n)周期。

最佳答案

react 器的默认值曾经是无限的或取消。您可以使用 take 或 limitRate 等内置运算符来自定义请求 n 行为。
https://github.com/making/rsc/blob/50d0c3dc43c60d3b3fe42e5586df49adb016a9cb/src/main/java/am/ik/rsocket/InteractionModel.java#L41-L52
或者实现自己的操作符,或者使用自定义订阅/订阅逻辑来精确控制。这涉及更多,因此请先尝试一下并展示您正在尝试做的事情的示例。
请参阅 https://projectreactor.io/docs/core/release/reference/#_on_backpressure_and_ways_to_reshape_requests 等文档

关于java - 有没有办法在 Spring RSocket 中查看 REQUEST_N 交换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63838053/

相关文章:

java - 我无法连接到 IBM MQ 的 QM ERROR 2035 MQRC_NOT_AUTHORIZED

spring-boot - rsocket - 如何平衡负载

spring-boot - 使用 RSocket-Java for Spring Rsocket Server 的 rsocket 路由元数据

java.lang.ClassNotFoundException : com/microsoft/sqlserver/jdbc/SQLServerDriver

java - 方法中的异常错误未使用异常

Java输入输出二进制文件字节大小转换后不匹配

rsocket - RSocket Fire and Forget 情况下服务器未接收数据

spring - RSocket 和 Spring 不处理多个请求

Java RSocket 客户端与 Spring RSocket channel 连接