spring - PayloadTypeRouter 直接发送到 Transformer 中,中间没有 channel ?

标签 spring spring-boot spring-integration messaging enterprise-integration

我需要使用 PayloadTypeRouter 并希望将路由消息直接发送到 TransformerFilterServiceActivator.所有内容均应使用 Kotlin DSL(或 Java DSL)进行配置。

目前,代码的一个漂亮部分如下所示:

    @Bean
    fun routeAzureC2DMessage() = integrationFlow {
        channel(IoTHubChannelNames.IOTHUB_TO_DEVICE_CHANNEL)
        route<IotHubC2DRequestMessage<IotHubC2DRequest>> {
            when (it.payload) {
                is IotHubDesiredPropertyUpdate -> IoTHubChannelNames.DESIRED_PROPERTY_UPDATE
                is IotHubMessageToDevice -> IoTHubChannelNames.MESSAGE_TO_DEVICE
            }
        }
    }

然后继续(路线的一侧)

    @Bean
    fun processMessageToDevice() = integrationFlow {
        channel(IoTHubChannelNames.MESSAGE_TO_DEVICE)
        filter(StructureFilter())
        transform(MessageTransformer())
        channel(SharedChannelNames.CLOUD2DEVICE)
    }

我想删除不需要的 channel IoTHubChannelNames.MESSAGE_TO_DEVICE。我尝试了几种方法,在项目的另一部分我发现了类似的东西(Java DSL)

IntegrationFlows
            .from(channelName)
            .route({ message: IotHubMessage -> message.javaClass }) { router: RouterSpec<Class<*>?, MethodInvokingRouter?> ->
                router
                    .subFlowMapping(DeviceToCloudMessage::class.java) {
                        it.handle(gateway, "handleD2CMessage")
                    }
                    .subFlowMapping(DeviceTwinUpdateMessage::class.java) {
                        it.handle(gateway, "handleDeviceTwinReportedProperty")
                    }
            }
            .get()

subFlowMapping 是摆脱之间 channel 的唯一方法吗?我想要一个解决方案,我仍然可以使用 when (it.payload) ,然后可以返回新的 integrationFlow 或某种其他形式的 Flow,而不是 channel / channel 名称定义。

最佳答案

目前唯一的解决方案实际上是通过 API:

inline fun <reified P, T> route(
        crossinline function: (P) -> T,
        crossinline configurer: KotlinRouterSpec<T, MethodInvokingRouter>.() -> Unit) {

目前不支持您使用 when(...) is 语法询问的内容。

请随意就此事提出 GH 问题,并尽可能多地分享从 Kotlin 角度来看它是什么以及如何在 Spring Integration DSL 中使用它的详细信息。

更新

另一方面,当前的 Kotlin 支持还不错:

            route<Int, Boolean>({ it % 2 == 0 }) {
                subFlowMapping(true) { handle<Int> { p, _ -> p * 2 } }
                subFlowMapping(false) { handle<Int> { p, _ -> p * 3 } }
            }

因此,route()方法的参数是when(),而subFlowMapping()is code> 与 -> 输出作为 integrationFlow 构建器结果。因此,我们可能不会追求 Kotlin when() ,它不会给我们带来太多 yield ,除非我们失去 subFlowMapping 而支持 -> 运算符...

更新2

经过更多思考并寻找可能的解决方案后,我必须撤回对这个 when() 类功能请求的请求。

主要问题是 IntegrationFlow 及其所有配置必须在使用前预先在应用程序上下文中注册。您在原始路由器函数中使用 when() 询问的内容并不是框架可以为您检测和处理的内容。该函数不是框架的一部分,不对产生的结果负责。

好吧,我们可以检查 IntegrationFlow 返回来决定如何调用,但不能保证您将从该函数返回的流是已注册的 bean。当我们真正注册它并尝试在这样的函数中使用时,这与我们迄今为止从该函数返回的 channel 及其在某个流 bean 中的映射没有什么区别。

我们可以在一些为此指令设计的例如 subFlowMapping() 中自动将 IntegrationFlow 注册为 bean。无论如何,它只在配置阶段完成一次。但当最终用户代码在运行时返回流时,这样做就不太好了。最好返回 channel 或其他一些我们有现有流映射的键。

我个人更喜欢不要忽略 MessageChannel 抽象,并在需要在不同流之间分配逻辑时使用它。当单个流程中的代码是线性的并且代表单个逻辑工作单元时,它看起来更清晰。不过,其他流程可能会在其他逻辑中重用。我只需要指向来自其他地方的那些流输入 channel !

尽管如此,我的主要观点是:在向其发送消息之前,我们必须注册和 IntegrationFlow

关于spring - PayloadTypeRouter 直接发送到 Transformer 中,中间没有 channel ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66123354/

相关文章:

spring-boot - Date 的默认序列化格式是否随着最近的 Spring Boot 版本/Jackson 版本而改变?

java - 确定正确的 Spring Boot 启动器

java - Spring 启动应用程序 : Negative value of http response time

java - Spring Integration LoggingHandler 中的 NPE

spring - http出站网关的拦截器

java - Spring security REST api 自定义HTTP基本身份验证

java - @ExceptionHandler 没有捕获 HttpMessageNotReadableException

java - 使用Spring的JMS命名空间时监听器容器的id是多少?

java - POST 时传递到持久化的分离实体

Spring 5 Web 响应式(Reactive)编程 - 从流数据的 Spring 响应式(Reactive) Controller 解码 JSON 时出现 WebClient ClassCastException