我正在尝试配置一个 TCP 套接字,以在不同的消息中接收格式为 name,value
的数据。这些消息平均每秒到达,有时更快,有时更慢。
我能够设置一个工作配置,但我对 Spring Integration 中实际发生的情况缺乏基本的了解。
我的配置文件如下所示:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService,
@Value("${tcp.socket.server.port}") final int port
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
)
.autoStartup(true)
.outputChannel(queueChannel())
).transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller()
{
return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
}
@Bean
public MessageChannel queueChannel()
{
return MessageChannels.queue("queue", 50).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
CSVProcessingService
看起来像这样(缩写):
@Slf4j
@Service
public class CSVProcessingService
{
@ServiceActivator
public void process(final String message)
{
log.debug("DATA RECEIVED: \n" + message);
final CsvMapper csvMapper = new CsvMapper();
final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);
if (StringUtils.contains(message, StringUtils.LF))
{
processMultiLineInput(message, csvMapper, csvSchema);
}
else
{
processSingleLineInput(message, csvMapper, csvSchema);
}
}
}
我对此配置的目标如下:
- 在配置的端口上接收消息
- 承受更高的负载而不丢失消息
- 反序列化消息
- 将它们放入队列 channel
- (最好还记录错误)
- 每 50 毫秒轮询一次队列 channel ,并将消息从队列 channel 传递到
ObjectToStringTransformer
- 在转换器之后,转换后的消息将传递到
CSVProcessingService
进行进一步处理
我是否正确实现了所有这些目标,还是因为误解了 Spring Integration 而犯了错误?是否可以以某种方式将 Poller 和 @ServiceActivator 结合起来?
此外,我在可视化配置的 IntegrationFlow 实际上如何“流动”时遇到问题,也许有人可以帮助我更好地理解这一点。
编辑:
在 Artems 发表评论后,我重新设计了我的配置。现在看起来像这样:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Value("${tcp.socket.server.port}") int port;
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
tcpNioServer()
)
.autoStartup(true)
.errorChannel(errorChannel())
)
.transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean
public AbstractServerConnectionFactory tcpNioServer()
{
return Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
.taskExecutor(
new ThreadPoolExecutor(0, 20,
30L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new DefaultThreadFactory("TCP-POOL"))
).get();
}
@Bean
public MessageChannel errorChannel()
{
return MessageChannels.direct("errors").get();
}
@Bean
public IntegrationFlow errorHandling()
{
return IntegrationFlows.from(errorChannel()).log(LoggingHandler.Level.DEBUG).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
我还从 CSVProcessingService#process
方法中删除了 @ServiceActivator
注释。
最佳答案
不确定是什么让您感到困惑,但您的配置和逻辑看起来不错。
您可能会忽略这样一个事实:中间不需要 QueueChannel
,因为 AbstractConnectionFactory.processNioSelections()
已经是多线程的并且它会调度任务从套接字读取消息。因此,您只需为Tcp.nioServer()
配置合适的Executor
即可。尽管默认情况下它是一个 Executors.newCachedThreadPool() 。
另一方面,使用内存中的QueueChannel
,您确实可能会丢失消息,因为它们已经从网络中读取。
当您执行 Java DSL 时,您应该考虑在端点上使用 poller()
选项。如果您有 inputChannel
属性,则 @Poller
将在 @ServiceActivator
上工作,但在 handle() 中使用相同的属性
将覆盖该 inputChannel
,因此您的 @Poller
将不会被应用。不要因混合 Java DSL 和注释配置而感到困惑!
您的配置中的其他一切都很好。
关于java - 如何使用 QueueChannel 和 ServiceActivator 正确配置 TCP inboundAdapter,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60721081/