我将遗留的 tcp 服务器代码迁移到 spring-boot 并添加了 spring-intergration(基于注释)依赖项来处理 tcp 套接字连接。
我的入站 channel 是 tcpIn() ,出站 channel 是 serviceChannel() 并且我创建了一个自定义 channel [exceptionEventChannel() ] 来保存异常事件消息。
我有一个自定义的序列化器/反序列化器方法(ByteArrayLengthPrefixSerializer() extends AbstractPooledBufferByteArraySerializer),以及一个 MessageHandler @ServiceActivator 方法来将响应发送回 tcp 客户端。
//SpringBoot 2.0.3.RELEASE, Spring Integration 5.0.6.RELEASE
package com.test.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.TcpDeserializationExceptionEvent;
import org.springframework.integration.router.ErrorMessageExceptionTypeRouter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import java.io.IOException;
@Configuration
@IntegrationComponentScan
public class TcpConfiguration {
@SuppressWarnings("unused")
@Value("${tcp.connection.port}")
private int tcpPort;
@Bean
TcpConnectionEventListener customerTcpListener() {
return new TcpConnectionEventListener();
}
@Bean
public MessageChannel tcpIn() {
return new DirectChannel();
}
@Bean
public MessageChannel serviceChannel() {
return new DirectChannel();
}
@ConditionalOnMissingBean(name = "errorChannel")
@Bean
public MessageChannel errorChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel exceptionEventChannel() {
return new DirectChannel();
}
@Bean
public ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer() {
ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer = new ByteArrayLengthPrefixSerializer();
byteArrayLengthPrefixSerializer.setMaxMessageSize(98304); //max allowed size set to 96kb
return byteArrayLengthPrefixSerializer;
}
@Bean
public AbstractServerConnectionFactory tcpNetServerConnectionFactory() {
TcpNetServerConnectionFactory tcpServerCf = new TcpNetServerConnectionFactory(tcpPort);
tcpServerCf.setSerializer(byteArrayLengthPrefixSerializer());
tcpServerCf.setDeserializer(byteArrayLengthPrefixSerializer());
return tcpServerCf;
}
@Bean
public TcpReceivingChannelAdapter tcpReceivingChannelAdapter() {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(tcpNetServerConnectionFactory());
adapter.setOutputChannel(tcpIn());
adapter.setErrorChannel(exceptionEventChannel());
return adapter;
}
@ServiceActivator(inputChannel = "exceptionEventChannel", outputChannel = "serviceChannel")
public String handle(Message<MessagingException> msg) {
//String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
System.out.println("-----------------EXCEPTION ==> " + msg);
return msg.toString();
}
@Transformer(inputChannel = "errorChannel", outputChannel = "serviceChannel")
public String transformer(String msg) {
//String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
System.out.println("-----------------ERROR ==> " + msg);
return msg.toString();
}
@ServiceActivator(inputChannel = "serviceChannel")
@Bean
public TcpSendingMessageHandler out(AbstractServerConnectionFactory cf) {
TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
tcpSendingMessageHandler.setConnectionFactory(cf);
return tcpSendingMessageHandler;
}
@Bean
public ApplicationListener<TcpDeserializationExceptionEvent> listener() {
return new ApplicationListener<TcpDeserializationExceptionEvent>() {
@Override
public void onApplicationEvent(TcpDeserializationExceptionEvent tcpDeserializationExceptionEvent) {
exceptionEventChannel().send(MessageBuilder.withPayload(tcpDeserializationExceptionEvent.getCause())
.build());
}
};
}
}
tcpIn() 中的消息被发送到单独的 @Component 类中的 @ServiceActivator 方法,其结构如下:
@Component
public class TcpServiceActivator {
@Autowired
public TcpServiceActivator() {
}
@ServiceActivator(inputChannel = "tcpIn", outputChannel = "serviceChannel")
public String service(byte[] byteMessage) {
// Business Logic returns String Ack Response
}
我没有遇到运行成功场景的问题。我的 Tcp TestClient 按预期收到 Ack 响应。
但是,当我尝试模拟异常时,比如 Deserializer Exception,异常消息不会作为对 Tcp 客户端的响应发回。 我可以看到我的应用程序监听器获取 TcpDeserializationExceptionEvent 并将消息发送到 exceptionEventChannel。 @ServiceActivator 方法 handle(Message msg) 也打印我的异常消息。但它永远不会到达 MessageHandler 方法 out(AbstractServerConnectionFactory cf) 内的断点(在 Debug模式下)。
我正在努力了解出了什么问题。提前感谢您的帮助。
更新:我注意到套接字在发送响应之前因异常而关闭。我正在想办法解决这个问题
解决方案更新(2019 年 3 月 12 日):
由 Gary 提供,我编辑了反序列化器以返回一条消息,该消息可以通过 @Router 方法进行跟踪并重定向到 errorChannel。监听 errorchannel 的 ServiceActivator 然后将所需的错误消息发送到 outputChannel 。该解决方案似乎有效。
我在 ByteArrayLengthPrefixSerializer 中的反序列化器方法返回 Gary 推荐的“特殊值”,而不是原始的 inputStream 消息。
public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
boolean isValidMessage = false;
try {
int messageLength = this.readPrefix(inputStream);
if (messageLength > 0 && fillUntilMaxDeterminedSize(inputStream, buffer, messageLength)) {
return this.copyToSizedArray(buffer, messageLength);
}
return EventType.MSG_INVALID.getName().getBytes();
} catch (SoftEndOfStreamException eose) {
return EventType.MSG_INVALID.getName().getBytes();
}
}
我还创建了一些新 channel 来容纳我的路由器,流程如下:
成功流程 tcpIn (@Router) -> serviceChannel(保存业务逻辑的@serviceActivator) -> outputChannel (@serviceActivator 向客户端发送响应)
异常流 tcpIn (@Router) -> errorChannel(@serviceActivator 准备错误响应消息) -> outputChannel (@serviceActivator 向客户端发送响应)
我的@Router 和'errorHandling' @serviceActivator -
@Router(inputChannel = "tcpIn", defaultOutputChannel = "errorChannel")
public String messageRouter(byte[] byteMessage) {
String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
System.out.println("------------------> "+unfilteredMessage);
if (Arrays.equals(EventType.MSG_INVALID.getName().getBytes(), byteMessage)) {
return "errorChannel";
}
return "serviceChannel";
}
@ServiceActivator(inputChannel = "errorChannel", outputChannel = "outputChannel")
public String errorHandler(byte[] byteMessage) {
return Message.ACK_RETRY;
}
最佳答案
错误 channel 用于处理处理消息时发生的异常。反序列化错误发生在创建消息之前(反序列化器解码消息的有效负载)。
反序列化异常是致命的,正如您所观察到的,套接字已关闭。
一个选择是在反序列化器中捕获异常并返回一个指示反序列化异常发生的“特殊”值,然后在您的主流程中检查该值。
关于spring-boot - Spring-Integration:异常时未发送 Tcp 服务器响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55093129/