java - Spring Integration - 如何通过同一连接实现异步 TCP 套接字请求/响应?

标签 java spring sockets tcp spring-integration

我有一个 Python TCP Socket 服务器服务:

  • 一次仅允许一个客户端连接;
  • 其输入流/输出流独立运行。

另一方面,我有一个使用 Spring Integration 的 Java Spring Boot 客户端应用程序。我实际的 TCP 套接字配置器 实现用途:

@MessagingGateway(defaultRequestChannel = REQUEST_CHANNEL, errorChannel = ERROR_CHANNEL)
public interface ClientGtw {
    Future<Response> send(Request request);
}

@Bean
@ServiceActivator(inputChannel = REQUEST_CHANNEL)
public MessageHandler outboundGateway(TcpNioClientConnectionFactory connectionFactory) {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    gateway.setRemoteTimeout(TimeUnit.SECONDS.toMillis(timeout));
    return gateway;
}

@Bean
public TcpNioClientConnectionFactory clientConnectionFactory(AppConfig config) {    
    Host host = getHost(config);

    TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(host.name, host.port);
    factory.setSingleUse(false);
    factory.setSoTimeout((int) TimeUnit.SECONDS.toMillis(timeout));

    SerializerDeserializer sd = new SerializerDeserializer();
    factory.setDeserializer(sd);
    factory.setSerializer(sd);
    return factory;
}

这种实际方法工作正常,但是,当发送请求时,它会挂起连接,直到收到响应。这是一个问题,因为有时一个请求可能需要太多时间来接收响应,并且系统有其他请求传入,它们的响应可以更快地实现。我想独立地发送和接收尽可能多的请求和响应(它们之间解耦)。传输的对象(序列化和反序列化)包含可以进行正确关联的 key 对。

TL;DR:如何通过同一连接实现异步请求/响应?

Spring TcpOutboundGateway javadoc 提到:针对该用例使用一对出站/入站适配器。

因此,除了上面的声明之外:

第一次尝试

@Bean
public TcpInboundGateway inboundGateway(AbstractServerConnectionFactory connectionFactory) {
    TcpInboundGateway gateway = new TcpInboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    return gateway;
}

@Bean
public AbstractServerConnectionFactory serverFactory(AppConfig config) {
    Host host = getHost(config);
    AbstractServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(host.port);
    connectionFactory.setSingleUse(true);
    connectionFactory.setSoTimeout(timeout);
    return connectionFactory;
}

The requests are blocked until a response is delivered as before.

第二次尝试

@Bean
public TcpInboundGateway inboundGateway(TcpNioClientConnectionFactory connectionFactory) {
    TcpInboundGateway gateway = new TcpInboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    gateway.setClientMode(true);
    return gateway;
}

org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory may only be used by one inbound adapter

有什么线索吗?

最佳答案

使用一对 channel 适配器而不是出站网关。您可以在应用程序中自行进行关联,而不是使用 MessagingGateway,也可以使用 tcp-client-server-multiplex sample app 中使用的相同技术。 。它使用聚合器将出站消息的副本与入站消息聚合起来,回复网关。

它很旧,并且使用 XML 配置,但应用相同的技术。

<publish-subscribe-channel id="input" />

<ip:tcp-outbound-channel-adapter id="outAdapter.client"
    order="2"
    channel="input"
    connection-factory="client" /> <!-- Collaborator -->

<!-- Also send a copy to the custom aggregator for correlation and
     so this message's replyChannel will be transferred to the
     aggregated message.
     The order ensures this gets to the aggregator first -->
<bridge input-channel="input" output-channel="toAggregator.client"
        order="1"/>

<!-- Asynch receive reply -->
<ip:tcp-inbound-channel-adapter id="inAdapter.client"
    channel="toAggregator.client"
    connection-factory="client" /> <!-- Collaborator -->

<!-- dataType attribute invokes the conversion service, if necessary -->
<channel id="toAggregator.client" datatype="java.lang.String" />

<aggregator input-channel="toAggregator.client"
    output-channel="toTransformer.client"
    expire-groups-upon-completion="true"
    expire-groups-upon-timeout="true"
    discard-channel="noResponseChannel"
    group-timeout="1000"
    correlation-strategy-expression="payload.substring(0,3)"
    release-strategy-expression="size() == 2" />

<channel id="noResponseChannel" />

<service-activator input-channel="noResponseChannel" ref="echoService" method="noResponse" />

<transformer input-channel="toTransformer.client"
    expression="payload.get(1)"/> <!-- The response is always second -->

(这个简单的示例与前 3 个字节相关)。

关于java - Spring Integration - 如何通过同一连接实现异步 TCP 套接字请求/响应?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44887847/

相关文章:

java - 尽管未使用复合键,但仍收到错误 "Foreign key must have same number of columns as the referenced primary key"

spring - 如何为 Spring Boot/MVC 资源设置可识别的内容/MIME 类型?

java - 阻止 Spring Boot 测试命中 SpringBootApplication 类的 @PostContruct

sockets - 生成后,Tcp 套接字无法在 erlang 中的远程节点上工作,{错误,关闭}?

c++ - CZMQ : Getting started on Raspberry-Pi

Python 套接字 - 以 10 字节为单位发送字符串

java - 转发请求到非@RequestMapping

java - 当对象是数组的一部分时直接访问对象的属性

java - 常数时间等于

java - 为什么这条线交点代码不起作用?