Spring 集成 TCP 服务器未接收消息

标签 spring spring-boot tcp spring-integration

我正在尝试创建一个 TCP 服务器,它在端口 5002 上接受来自外部程序的消息。但是,它没有从外部程序接收消息。

@Bean
public TcpReceivingChannelAdapter inbound(AbstractServerConnectionFactory cf) {
   TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
   adapter.setConnectionFactory(cf);
   adapter.setOutputChannel(tcpIn());
   return adapter;
 }

@Bean
public MessageChannel tcpIn() {
    return new DirectChannel();
}

@Bean
@Transformer(inputChannel = "tcpIn", outputChannel = "serviceChannel")
public ObjectToStringTransformer transformer() {
    return new ObjectToStringTransformer();
}

@ServiceActivator(inputChannel = "serviceChannel")
public void messageToService(String in) {
    // Message received
}

@Bean
public AbstractServerConnectionFactory serverConnectionFactory() {
    TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(5002);
    tcpNetServerConnectionFactory.setSoTimeout(5000);
    tcpNetServerConnectionFactory.setMapper(new TimeoutMapper());
    return tcpNetServerConnectionFactory;
}

为了验证我的 TCP 服务器是否正常工作,我像这样使用了 telnet,程序确实收到了文本“hello”。

telnet 192.168.1.2 5002
Trying 192.168.1.2...
Connected to 192.168.1.2.
Escape character is '^]'.
hello

设置 wireshark 我可以看到计算机正在端口 5002 上接收来自外部程序(我期望的)的消息。为什么我的程序无法接收这些消息?

enter image description here

最终解决方案更新:

由于有效载荷没有停止线,我必须按照@Artem Bilan 的描述实现我自己的反序列化器。我使用“~”字符来表示来自客户端的行结束。

@Bean
public AbstractServerConnectionFactory serverConnectionFactory() {
    TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(tcpPort);
    tcpNetServerConnectionFactory.setSoTimeout(0);
  tcpNetServerConnectionFactory.setDeserializer(endOfLineSerializer());
    tcpNetServerConnectionFactory.setSerializer(endOfLineSerializer());
    tcpNetServerConnectionFactory.setMapper(new TimeoutMapper());
    return tcpNetServerConnectionFactory;
}

我实现的示例序列化程序:

public class EndOfLineSerializer extends AbstractPooledBufferByteArraySerializer {

private static final char MANUAL_STOP_LINE = '~';
private static final char AUTO_STOP_LINE = '\t';
private static final byte[] CRLF = "\r\n".getBytes();

/**
 * Reads the data in the inputStream to a byte[]. Data must be terminated
 * by a single byte. Throws a {@link SoftEndOfStreamException} if the stream
 * is closed immediately after the terminator (i.e. no data is in the process of
 * being read).
 */
@Override
protected byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
    int n = 0;
    int bite;

    try {
        while (true) {

            try {
                bite = inputStream.read();
            } catch (SocketTimeoutException e) {
                bite = -1;
            }

            if (bite < 0) {
                // Payload complete
                break;
            }

            if ((n > 0 && bite == '\n' && buffer[n - 1] == '\r') || bite == this.MANUAL_STOP_LINE || bite == this.AUTO_STOP_LINE) {
                break;
            }

            buffer[n++] = (byte) bite;
            if (n >= this.maxMessageSize) {
                throw new IOException("Terminator not found before max message length: " + this.maxMessageSize);
            }
        }
        return copyToSizedArray(buffer, n);
    } catch (IOException e) {
        publishEvent(e, buffer, n);
        throw e;
    } catch (RuntimeException e) {
        publishEvent(e, buffer, n);
        throw e;
    }
}

/**
 * Writes the byte[] to the stream and appends the CRLF.
 */
@Override
public void serialize(byte[] bytes, OutputStream outputStream) throws IOException {
    outputStream.write(bytes);
    outputStream.write(this.CRLF);
    }
}

最佳答案

默认情况下,TcpNetServerConnectionFactory 使用 ByteArrayCrLfSerializer,这是消息定界符:

private static final byte[] CRLF = "\r\n".getBytes();

因此,您应该确保您的客户端发送的消息最后带有正确的符号。

有一堆开箱即用的序列化程序供您选择:

https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/ip.html#tcp-connection-factories

或者您可以实现自己的Deserializer 并注入(inject)到serverConnectionFactory bean 定义中。

关于Spring 集成 TCP 服务器未接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49040636/

相关文章:

java - Vaadin View 中的 Autowiring 服务和组件不工作

java - 如何批量读取重新投递队列?

spring - JPA Hibernate 无法将 java.lang.String 字段设置为 java.lang.String 异常

spring-boot - Spring Boot "application.properties"文件的语法是什么?

c# - 无法使用 C# TCP 服务器和 C++ TCP 客户端接收消息

c - 拆分 TCP 流上的 Winsock2 错误 10014

c++ - 为传入连接编写过滤器

spring - 如何使用 Spring 重试 PostgreSQL 可序列化事务?

java - `AnnotationConfigNonEmbeddedWebApplicationContext` 还没有刷新

java - 内存数据库中的 H2 在列名和我想要使用的值之间变得困惑