java - Spring Integration 有状态 TCP 连接

标签 java spring sockets tcp spring-integration

我正在尝试使用 org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory 创建一个 TCP 客户端。当工厂创建 TCP 连接时,我需要先向服务器发送一些数据(例如授权信息)。服务器会返回一些数据作为响应(例如 salt),而下一个请求必须把这个 salt 发回服务器。连接池(Pool)中每个连接的 salt 必须各不相同,因此我认为每个 Connection 必须自行保存自己的 salt。

所以,我的实现是...
Spring Beans XML:

<!-- Client-side TCP connection factory for localhost:12345. Every connection it creates
     is passed through the "customInterceptorFactory" chain and uses the "mapper" bean. -->
<int-ip:tcp-connection-factory
        id="client"
        type="client"
        host="localhost"
        port="12345"
        interceptor-factory-chain="customInterceptorFactory"
        mapper="mapper"/>

<!-- Interceptor chain containing a single interceptor factory. -->
<bean id="customInterceptorFactory"
      class="org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
    <property name="interceptors">
        <bean class="ru.example.gateway.StatefulTcpConnectionFactory"/>
    </property>
</bean>

<!-- Caching wrapper around the "client" factory; the second constructor argument (5)
     is the pool size per the CachingClientConnectionFactory constructor. -->
<bean id="cachedClient"
      class="org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory">
    <constructor-arg ref="client"/>
    <constructor-arg value="5"/>
</bean>

<!-- Channel on which requests for the outbound gateway arrive. -->
<int:channel id="clientRequestChannel"/>

<!-- Outbound gateway that sends requests from "clientRequestChannel" over a cached connection. -->
<int-ip:tcp-outbound-gateway
        id="clientCrLf"
        connection-factory="cachedClient"
        request-channel="clientRequestChannel"/>

<!-- byte[] to String conversion; uses the same charset as the mapper below. -->
<int:converter ref="byteArrayToStringConverter"/>
<bean id="byteArrayToStringConverter" class="ru.example.gateway.ByteArrayToStringConverter">
    <property name="charset" value="windows-1251"/>
</bean>

<!-- Message mapper referenced by the "client" connection factory. -->
<bean id="mapper" class="org.springframework.integration.ip.tcp.connection.TcpMessageMapper">
    <property name="charset" value="windows-1251"/>
</bean>

TCP连接拦截器:

package ru.example.gateway;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.serializer.Serializer;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Connection interceptor that keeps per-connection state (a server-supplied salt).
 *
 * <p>When the serializer is set on the connection, a {@code "Login"} handshake message is sent
 * and the calling thread waits for the first inbound message. Every inbound message is scanned
 * for a {@code ;Salt=<value>;} fragment; the most recent value is appended to each outgoing
 * {@code String} payload.
 *
 * <p>The handshake reply is consumed by this interceptor and is <em>not</em> forwarded to the
 * listener: an outbound gateway has no pending request for it and would otherwise log
 * "Cannot correlate response - no pending reply".
 */
@Slf4j
@Getter
public class StatefulTcpConnection extends TcpConnectionInterceptorSupport {

    /** Marker that precedes the salt value in both inbound and outbound payloads. */
    private static final String SALT_MARKER = ";Salt=";

    /** Maximum time to wait for the handshake reply. */
    private static final long HANDSHAKE_TIMEOUT_SECONDS = 10;

    /**
     * Charset used to decode raw inbound payloads. Matches the charset configured on the
     * message mapper / converter used with this connection, instead of the platform default.
     */
    private static final java.nio.charset.Charset PAYLOAD_CHARSET =
            java.nio.charset.Charset.forName("windows-1251");

    private volatile String nextSalt = "";
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final CountDownLatch latch = new CountDownLatch(1);

    public StatefulTcpConnection() {
    }

    public StatefulTcpConnection(ApplicationEventPublisher applicationEventPublisher) {
        super(applicationEventPublisher);
    }

    /**
     * Sends the message, appending {@code ;Salt=<current salt>;} to {@code String} payloads.
     * Non-String payloads are sent unchanged.
     *
     * @param message the message to send
     * @throws IllegalStateException if the handshake has not completed yet
     * @throws Exception if the underlying connection fails to send
     */
    @Override
    public void send(Message<?> message) throws Exception {
        if (!initialized.get()) {
            throw new IllegalStateException("Connection not initialized");
        }
        Object payload = message.getPayload();
        log.debug("Send Payload({})", payload.getClass());
        Message<?> outbound = message;
        if (payload instanceof String) {
            // The salted copy must actually replace the message that gets sent.
            outbound = new GenericMessage<String>(
                    (String) payload + SALT_MARKER + nextSalt + ";", message.getHeaders());
        }
        super.send(outbound);
    }

    /**
     * Invoke initialize after connection wrapped in ConnectionFactory4Interceptors
     * @see org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory#initializeConnection(TcpConnectionSupport, Socket)
     * @param serializer the serializer to set on the connection
     * @throws RuntimeException if the handshake fails, times out or is interrupted
     */
    @Override
    public void setSerializer(Serializer<?> serializer) {
        super.setSerializer(serializer);
        try {
            initialize();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Connection couldn't initialize", e);
        } catch (Exception e) {
            throw new RuntimeException("Connection couldn't initialize", e);
        }
    }

    /**
     * Sends the login message and blocks until the first inbound message arrives.
     *
     * @throws IllegalStateException if no reply arrives within the handshake timeout
     * @throws Exception if sending fails or the wait is interrupted
     */
    private void initialize() throws Exception {
        GenericMessage<String> loginMessage = new GenericMessage<>("Login");
        // Bypass this class's send(): the connection is not initialized yet and no salt exists.
        super.send(loginMessage);
        log.debug("Waiting initializing response");
        if (!latch.await(HANDSHAKE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            throw new IllegalStateException(
                    "No handshake response within " + HANDSHAKE_TIMEOUT_SECONDS + " seconds");
        }
    }

    /**
     * Records the salt carried by the inbound message (if any). The first textual message is
     * treated as the handshake reply: it completes initialization and is consumed here.
     * All later messages are forwarded to the listener.
     *
     * @param message the inbound message
     * @return {@code true} when the handshake reply was consumed, otherwise the result of the
     *         delegate listener
     */
    @Override
    public boolean onMessage(Message<?> message) {
        Object rawPayload = message.getPayload();
        log.debug("onMessage Payload({})", rawPayload.getClass());
        String text = payloadAsText(rawPayload);
        if (text == null) {
            // Payload type not understood by this interceptor: pass it on untouched.
            return super.onMessage(message);
        }
        String salt = extractSalt(text);
        if (salt != null) {
            nextSalt = salt;
        }
        if (initialized.compareAndSet(false, true)) {
            latch.countDown();
            log.debug("Initializing complete.");
            // Swallow the handshake reply; nobody upstream is waiting for it.
            return true;
        }
        return super.onMessage(message);
    }

    /** Converts a {@code byte[]} or {@code String} payload to text; returns null for other types. */
    private static String payloadAsText(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, PAYLOAD_CHARSET);
        }
        if (payload instanceof String) {
            return (String) payload;
        }
        return null;
    }

    /** Returns the value following {@code ;Salt=} up to the next ';' (or end of text), or null. */
    private static String extractSalt(String text) {
        int markerStart = text.indexOf(SALT_MARKER);
        if (markerStart < 0) {
            return null;
        }
        int valueStart = markerStart + SALT_MARKER.length();
        int valueEnd = text.indexOf(';', valueStart);
        return valueEnd >= 0 ? text.substring(valueStart, valueEnd) : text.substring(valueStart);
    }

    @Override
    public String getConnectionId() {
        return "Stateful:" + super.getConnectionId();
    }

    @Override
    public String toString() {
        return getConnectionId();
    }

}

网关:

package ru.example.gateway;

import org.springframework.integration.annotation.MessagingGateway;

/**
 * Messaging gateway named {@code clientOutGateway} whose requests are sent to the
 * {@code clientRequestChannel} channel by default.
 */
@MessagingGateway(defaultRequestChannel = "clientRequestChannel", name = "clientOutGateway")
public interface StringGateway {

    /**
     * Sends the given text through the gateway.
     *
     * @param message the request text
     * @return the reply as a String
     */
    String send(String message);

}

和 Spring MVC 的服务:

package ru.example.service.impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import ru.example.gateway.StringGateway;
import ru.example.service.MessageService;

/**
 * {@link MessageService} implementation that delegates to the injected {@link StringGateway}.
 */
@Service
public class MessageServiceImpl implements MessageService {

    /** Gateway used to exchange messages; injected by Spring. */
    @Autowired
    private StringGateway clientGateway;

    /**
     * Sends {@code code} through the gateway and returns whatever the gateway replies.
     *
     * @param code the request text to send
     * @return the gateway's reply
     */
    public String getMessage(String code) {
        final String reply = clientGateway.send(code);
        return reply;
    }

}

我遇到了一些错误。在 MessageServiceImpl 发出的第一个请求中,我得到的是对“身份验证请求”的响应,而真正需要的响应要么丢失,要么抛出异常,要么在日志中出现如下错误消息:

ERROR org.springframework.integration.ip.tcp.TcpOutboundGateway - Cannot correlate response - no pending reply for Cached:Stateful:localhost:12345:56060:6993fc83-7e69-4f18-9300-8553e6d74a4f

有人有状态连接的解决方案吗?

谢谢! p.s.:对不起我的英语

最佳答案

框架目前没有提供有状态连接的管理。您可以为连接工厂编写一个包装器,将连接存储在 ThreadLocal 中,但是当您用完连接时,需要某种机制来清理并释放它。

如果只是简单的握手,您可以使用连接拦截器;有一个测试用例(参见 HelloWorldInterceptor )但它有点复杂。

我认为自定义连接工厂包装器会更简单。

关于java - Spring Integration 有状态 TCP 连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35794457/

相关文章:

java - Servlet页面显示404错误,不出现

Spring ApplicationListener在webapp上被触发了两次

c++ - 文件传输 - 数据包回合

java - Spring Boot firebase admin sdk 设置 - 无法解析符号

java - 我的转换方法全错了,我不知道如何解决

spring - 通过 Flux<DataBuffer> 和 DataBufferUtils.write 通过输出流将非常大的文件下载到 Azure Blob 存储时出现问题(

php - 无法在PHP中与fsockopen连接(连接超时)

c# - C#中如何通过套接字发送文件

java - For循环不执行文件写入命令

java - 如何通过带有 spring data rest 的备用键公开实体