我有一个 Python TCP Socket 服务器服务:
- 一次仅允许一个客户端连接;
- 其输入流/输出流独立运行。
另一方面,我有一个使用 Spring Integration 的 Java Spring Boot 客户端应用程序。我实际的 TCP 套接字配置器 实现用途:
@MessagingGateway(defaultRequestChannel = REQUEST_CHANNEL, errorChannel = ERROR_CHANNEL)
public interface ClientGtw {
Future<Response> send(Request request);
}
@Bean
@ServiceActivator(inputChannel = REQUEST_CHANNEL)
public MessageHandler outboundGateway(TcpNioClientConnectionFactory connectionFactory) {
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(connectionFactory);
gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
gateway.setRemoteTimeout(TimeUnit.SECONDS.toMillis(timeout));
return gateway;
}
@Bean
public TcpNioClientConnectionFactory clientConnectionFactory(AppConfig config) {
Host host = getHost(config);
TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(host.name, host.port);
factory.setSingleUse(false);
factory.setSoTimeout((int) TimeUnit.SECONDS.toMillis(timeout));
SerializerDeserializer sd = new SerializerDeserializer();
factory.setDeserializer(sd);
factory.setSerializer(sd);
return factory;
}
这种实际方法工作正常,但是,当发送请求时,它会挂起连接,直到收到响应。这是一个问题,因为有时一个请求可能需要太多时间来接收响应,并且系统有其他请求传入,它们的响应可以更快地实现。我想独立地发送和接收尽可能多的请求和响应(它们之间解耦)。传输的对象(序列化和反序列化)包含可以进行正确关联的 key 对。
TL;DR:如何通过同一连接实现异步请求/响应?
Spring TcpOutboundGateway javadoc 提到:针对该用例使用一对出站/入站适配器。
因此,除了上面的声明之外:
第一次尝试
@Bean
public TcpInboundGateway inboundGateway(AbstractServerConnectionFactory connectionFactory) {
TcpInboundGateway gateway = new TcpInboundGateway();
gateway.setConnectionFactory(connectionFactory);
gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
return gateway;
}
@Bean
public AbstractServerConnectionFactory serverFactory(AppConfig config) {
Host host = getHost(config);
AbstractServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(host.port);
connectionFactory.setSingleUse(true);
connectionFactory.setSoTimeout(timeout);
return connectionFactory;
}
The requests are blocked until a response is delivered as before.
第二次尝试
@Bean
public TcpInboundGateway inboundGateway(TcpNioClientConnectionFactory connectionFactory) {
TcpInboundGateway gateway = new TcpInboundGateway();
gateway.setConnectionFactory(connectionFactory);
gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
gateway.setClientMode(true);
return gateway;
}
org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory may only be used by one inbound adapter
有什么线索吗?
最佳答案
使用一对 channel 适配器而不是出站网关。您可以在应用程序中自行进行关联,而不是使用 MessagingGateway
,也可以使用 tcp-client-server-multiplex sample app 中使用的相同技术。 。它使用聚合器将出站消息的副本与入站消息聚合起来,回复网关。
它很旧,并且使用 XML 配置,但应用相同的技术。
<publish-subscribe-channel id="input" />
<ip:tcp-outbound-channel-adapter id="outAdapter.client"
order="2"
channel="input"
connection-factory="client" /> <!-- Collaborator -->
<!-- Also send a copy to the custom aggregator for correlation and
so this message's replyChannel will be transferred to the
aggregated message.
The order ensures this gets to the aggregator first -->
<bridge input-channel="input" output-channel="toAggregator.client"
order="1"/>
<!-- Asynch receive reply -->
<ip:tcp-inbound-channel-adapter id="inAdapter.client"
channel="toAggregator.client"
connection-factory="client" /> <!-- Collaborator -->
<!-- dataType attribute invokes the conversion service, if necessary -->
<channel id="toAggregator.client" datatype="java.lang.String" />
<aggregator input-channel="toAggregator.client"
output-channel="toTransformer.client"
expire-groups-upon-completion="true"
expire-groups-upon-timeout="true"
discard-channel="noResponseChannel"
group-timeout="1000"
correlation-strategy-expression="payload.substring(0,3)"
release-strategy-expression="size() == 2" />
<channel id="noResponseChannel" />
<service-activator input-channel="noResponseChannel" ref="echoService" method="noResponse" />
<transformer input-channel="toTransformer.client"
expression="payload.get(1)"/> <!-- The response is always second -->
(这个简单的示例与前 3 个字节相关)。
关于java - Spring Integration - 如何通过同一连接实现异步 TCP 套接字请求/响应?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44887847/