java - 如何使用 QueueChannel 和 ServiceActivator 正确配置 TCP inboundAdapter

标签 java spring spring-integration spring-integration-dsl

我正在尝试配置一个 TCP 套接字,以在不同的消息中接收格式为 name,value 的数据。这些消息平均每秒到达,有时更快,有时更慢。

我能够设置一个工作配置,但我对 Spring Integration 中实际发生的情况缺乏基本的了解。

我的配置文件如下所示:

@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
    @Bean
    public IntegrationFlow server(
        final CSVProcessingService csvProcessingService,
        @Value("${tcp.socket.server.port}") final int port
    )
    {
        return IntegrationFlows.from(
            Tcp.inboundAdapter(
                Tcp.nioServer(port)
                   .deserializer(serializer())
                   .leaveOpen(true)
            )
               .autoStartup(true)
               .outputChannel(queueChannel())
        ).transform(new ObjectToStringTransformer())
         .handle(csvProcessingService)
         .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller()
    {
        return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
    }

    @Bean
    public MessageChannel queueChannel()
    {
        return MessageChannels.queue("queue", 50).get();
    }

    @Bean
    public ByteArrayLfSerializer serializer()
    {
        final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();

        serializer.setMaxMessageSize(10240);

        return serializer;
    }
}

CSVProcessingService 看起来像这样(缩写):

@Slf4j
@Service
public class CSVProcessingService
{
    @ServiceActivator
    public void process(final String message)
    {
        log.debug("DATA RECEIVED: \n" + message);
        final CsvMapper csvMapper = new CsvMapper();
        final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);

        if (StringUtils.contains(message, StringUtils.LF))
        {
            processMultiLineInput(message, csvMapper, csvSchema);
        }
        else
        {
            processSingleLineInput(message, csvMapper, csvSchema);
        }
    }
}

我对此配置的目标如下:

  • 在配置的端口上接收消息
  • 承受更高的负载而不丢失消息
  • 反序列化消息
  • 将它们放入队列 channel
  • (最好还记录错误)
  • 每 50 毫秒轮询一次队列 channel ,并将消息从队列 channel 传递到 ObjectToStringTransformer
  • 在转换器之后,转换后的消息将传递到 CSVProcessingService 进行进一步处理

我是否正确实现了所有这些目标,还是因为误解了 Spring Integration 而犯了错误?是否可以以某种方式将 Poller 和 @ServiceActivator 结合起来?

此外,我在可视化配置的 IntegrationFlow 实际上如何“流动”时遇到问题,也许有人可以帮助我更好地理解这一点。

编辑:

在 Artems 发表评论后,我重新设计了我的配置。现在看起来像这样:

@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
    @Value("${tcp.socket.server.port}") int port;

    @Bean
    public IntegrationFlow server(
        final CSVProcessingService csvProcessingService
    )
    {
        return IntegrationFlows.from(
            Tcp.inboundAdapter(
                tcpNioServer()
            )
               .autoStartup(true)
               .errorChannel(errorChannel())
        )
         .transform(new ObjectToStringTransformer())
         .handle(csvProcessingService)
         .get();
    }

    @Bean
    public AbstractServerConnectionFactory tcpNioServer()
    {
        return Tcp.nioServer(port)
                  .deserializer(serializer())
                  .leaveOpen(true)
                  .taskExecutor(
                      new ThreadPoolExecutor(0, 20,
                                             30L, TimeUnit.SECONDS,
                                             new SynchronousQueue<>(),
                                             new DefaultThreadFactory("TCP-POOL"))
                  ).get();
    }

    @Bean
    public MessageChannel errorChannel()
    {
        return MessageChannels.direct("errors").get();
    }

    @Bean
    public IntegrationFlow errorHandling()
    {
        return IntegrationFlows.from(errorChannel()).log(LoggingHandler.Level.DEBUG).get();
    }

    @Bean
    public ByteArrayLfSerializer serializer()
    {
        final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();

        serializer.setMaxMessageSize(10240);

        return serializer;
    }
}

我还从 CSVProcessingService#process 方法中删除了 @ServiceActivator 注释。

最佳答案

不确定是什么让您感到困惑,但您的配置和逻辑看起来不错。

您可能会忽略这样一个事实:中间不需要 QueueChannel,因为 AbstractConnectionFactory.processNioSelections() 已经是多线程的并且它会调度任务从套接字读取消息。因此,您只需为Tcp.nioServer()配置合适的Executor即可。尽管默认情况下它是一个 Executors.newCachedThreadPool() 。

另一方面,使用内存中的QueueChannel,您确实可能会丢失消息,因为它们已经从网络中读取。

当您执行 Java DSL 时,您应该考虑在端点上使用 poller() 选项。如果您有 inputChannel 属性,则 @Poller 将在 @ServiceActivator 上工作,但在 handle() 中使用相同的属性 将覆盖该 inputChannel,因此您的 @Poller 将不会被应用。不要因混合 Java DSL 和注释配置而感到困惑!

您的配置中的其他一切都很好。

关于java - 如何使用 QueueChannel 和 ServiceActivator 正确配置 TCP inboundAdapter,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60721081/

相关文章:

java - 方法同步,但由于非序列化线程行为,代码产生随机结果

java - 使用 Spring MVC 通过 URL 访问静态方法

java - 在 Spring 4 中导入 Groovy Bean 定义?

java - 如何运行 Spring 与多线程集成

java - Spring Integration Mail - 将 XML 转换为 Java Config

java - 即使计算机进入休眠模式,应用程序也必须运行

java - 这种一对类的情况是否有设计模式?

java - 部署到 Tomcat 的 Spring boot WAR 和缺少静态资源的上下文

java - Spring 集成;在复制完成之前复制服务拾取的文件

java - 来电期间 Activity 生命周期的异常行为 : delayed onStop()