I am trying to implement a TCP client/server application with Spring Integration in which I need to open one TCP client socket for each incoming TCP server connection.
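Basically, I have a bunch of IoT devices that communicate with a back-end server over raw TCP sockets. I need to add extra functionality to the system, but the software on both the devices and the server is closed source, so I cannot change either. My idea is to place middleware between the devices and the server that intercepts this client/server communication and provides the added functionality.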
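I am using a TcpNioServerConnectionFactory and a TcpNioClientConnectionFactory with inbound/outbound channel adapters to send messages to and receive messages from all parties. However, nothing in the message structure ties a message to a particular device, so every time a new connection from a new device arrives on the server socket, I have to open a new client socket to the back end. That client connection must be bound to the life cycle of that specific server socket. It must never be reused, and if the client socket (back end to middleware) dies for any reason, the server socket (middleware to device) must be closed as well. How can I do this?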
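Edit: My first thought was to subclass AbstractClientConnectionFactory, but it seems to do nothing except provide a client connection when asked. Should I look into subclassing the inbound/outbound channel adapters, or somewhere else? I should also mention that I am open to non-Spring Integration solutions such as Apache Camel, or even a custom solution with raw NIO sockets.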
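Edit 2: I got halfway there by switching to TcpNetServerConnectionFactory and wrapping the client factory with ThreadAffinityClientConnectionFactory; the devices now reach the back end fine. But when the back end sends something back, I get the error Unable to find outbound socket for GenericMessage and the client socket dies. I think this is because the message from the back end lacks the header needed to route it correctly. How can I capture this information? My configuration class is as follows: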
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class ServerConfiguration {
@Bean
public AbstractServerConnectionFactory serverFactory() {
AbstractServerConnectionFactory factory = new TcpNetServerConnectionFactory(8000);
factory.setSerializer(new MapJsonSerializer());
factory.setDeserializer(new MapJsonSerializer());
return factory;
}
@Bean
public AbstractClientConnectionFactory clientFactory() {
AbstractClientConnectionFactory factory = new TcpNioClientConnectionFactory("localhost", 3333);
factory.setSerializer(new MapJsonSerializer());
factory.setDeserializer(new MapJsonSerializer());
factory.setSingleUse(true);
return new ThreadAffinityClientConnectionFactory(factory);
}
@Bean
public TcpReceivingChannelAdapter inboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
inbound.setConnectionFactory(connectionFactory);
return inbound;
}
@Bean
public TcpSendingMessageHandler outboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
outbound.setConnectionFactory(connectionFactory);
return outbound;
}
@Bean
public TcpReceivingChannelAdapter inboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
inbound.setConnectionFactory(connectionFactory);
return inbound;
}
@Bean
public TcpSendingMessageHandler outboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
outbound.setConnectionFactory(connectionFactory);
return outbound;
}
@Bean
public IntegrationFlow backendIntegrationFlow() {
return IntegrationFlows.from(inboundBackendAdapter(clientFactory()))
.log(LoggingHandler.Level.INFO)
.handle(outboundDeviceAdapter(serverFactory()))
.get();
}
@Bean
public IntegrationFlow deviceIntegrationFlow() {
return IntegrationFlows.from(inboundDeviceAdapter(serverFactory()))
.log(LoggingHandler.Level.INFO)
.handle(outboundBackendAdapter(clientFactory()))
.get();
}
}
Best answer
It is not entirely clear what you are asking, so I am going to assume you want a Spring Integration proxy between your client and server. Something like:
iot-device -> spring server -> message-transformation -> spring client -> back-end-server
If that is the case, you can implement a ClientConnectionIdAware client connection factory that wraps a standard factory.
In the integration flow, bind the ip_connectionId header of the incoming message to the thread (in a ThreadLocal).
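Then, in the client connection factory, use the ThreadLocal value to look up the corresponding outgoing connection in a Map; if it is not found (or is closed), create a new one and store it in the map for future reuse.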
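Implement an ApplicationListener (or @EventListener) that listens for TcpConnectionCloseEvent events from the server connection factory and calls close() on the corresponding outbound connection.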
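The three steps above can be sketched in plain Java without any framework types. This is only an illustration under stated assumptions: OutboundConnectionRegistry and OutboundConnection are hypothetical names, not Spring Integration classes, and the "connection" is a stub that merely tracks whether it is open.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry pairing each inbound connection id with exactly one outbound connection.
class OutboundConnectionRegistry {
    private final ThreadLocal<String> currentInboundId = new ThreadLocal<>();
    private final Map<String, OutboundConnection> byInboundId = new ConcurrentHashMap<>();
    private final Function<String, OutboundConnection> connector;

    OutboundConnectionRegistry(Function<String, OutboundConnection> connector) {
        this.connector = connector;
    }

    // Step 1: bind the inbound id (the ip_connectionId header value) to the current thread.
    void bind(String inboundId) {
        currentInboundId.set(inboundId);
    }

    // Step 2: find the outbound connection for the bound id, or open a new one if missing or closed.
    OutboundConnection getConnection() {
        String inboundId = currentInboundId.get();
        if (inboundId == null) {
            throw new IllegalStateException("No inbound connection id bound to this thread");
        }
        return byInboundId.compute(inboundId, (id, existing) ->
                existing != null && existing.isOpen() ? existing : connector.apply(id));
    }

    // Step 3: when the inbound connection closes, close and forget its outbound partner.
    void onInboundClosed(String inboundId) {
        OutboundConnection outbound = byInboundId.remove(inboundId);
        if (outbound != null) {
            outbound.close();
        }
    }

    public static void main(String[] args) {
        OutboundConnectionRegistry registry = new OutboundConnectionRegistry(OutboundConnection::new);
        registry.bind("device-1");
        OutboundConnection first = registry.getConnection();
        System.out.println("reused: " + (first == registry.getConnection()));
        registry.onInboundClosed("device-1");
        System.out.println("still open: " + first.isOpen());
    }
}

// Hypothetical stand-in for an outbound TCP connection; it only tracks an open/closed flag.
class OutboundConnection {
    final String inboundId;
    private volatile boolean open = true;

    OutboundConnection(String inboundId) {
        this.inboundId = inboundId;
    }

    boolean isOpen() {
        return open;
    }

    void close() {
        open = false;
    }
}
```

In a real factory the connector function would open a socket to the back end, and onInboundClosed would be driven by the close event mentioned above.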
This sounds like a cool enhancement, so consider contributing it back to the framework.
EDIT
Version 5.0 added the ThreadAffinityClientConnectionFactory, which works out of the box with a TcpNetServerConnectionFactory because each connection gets its own thread.
With a TcpNioServerConnectionFactory, you would need extra logic to dynamically bind the connection to the thread for each request.
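Why the threading model matters can be shown with a small plain-Java simulation. It is a sketch, not framework code: ThreadAffinityDemo and the "client-N" strings are made up for illustration, and a single-thread executor stands in for a thread that serves several connections, as an NIO reader thread may.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadAffinityDemo {

    // Sends one message per device and reports which simulated client connection each one used.
    static List<String> clientsUsed(List<String> devices, boolean threadPerDevice) {
        AtomicInteger opened = new AtomicInteger();
        // One simulated client connection per thread, opened lazily on first use.
        ThreadLocal<String> clientForThread =
                ThreadLocal.withInitial(() -> "client-" + opened.incrementAndGet());
        Callable<String> sendMessage = clientForThread::get;

        ExecutorService shared = Executors.newSingleThreadExecutor();
        List<String> used = new ArrayList<>();
        try {
            for (String device : devices) {
                // Either a dedicated thread for this device or the one shared thread.
                ExecutorService executor =
                        threadPerDevice ? Executors.newSingleThreadExecutor() : shared;
                try {
                    used.add(device + " -> " + executor.submit(sendMessage).get());
                } finally {
                    if (threadPerDevice) {
                        executor.shutdown();
                    }
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            shared.shutdown();
        }
        return used;
    }

    public static void main(String[] args) {
        List<String> devices = List.of("device-1", "device-2");
        System.out.println("thread per device: " + clientsUsed(devices, true));
        System.out.println("shared thread:     " + clientsUsed(devices, false));
    }
}
```

With a dedicated thread per device, each device ends up with its own client connection; on the shared thread both devices land on the same one, which is the situation the extra per-request binding logic would have to prevent.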
EDIT2
@SpringBootApplication
public class So51200675Application {
public static void main(String[] args) {
SpringApplication.run(So51200675Application.class, args).close();
}
@Bean
public ApplicationRunner runner() {
return args -> {
Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
socket.getOutputStream().write("foo\r\n".getBytes());
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println(reader.readLine());
socket.close();
};
}
@Bean
public Map<String, String> fromToConnectionMappings() {
return new ConcurrentHashMap<>();
}
@Bean
public Map<String, String> toFromConnectionMappings() {
return new ConcurrentHashMap<>();
}
@Bean
public IntegrationFlow proxyInboundFlow() {
return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
.transform(Transformers.objectToString())
.<String, String>transform(s -> s.toUpperCase())
.handle((p, h) -> {
mapConnectionIds(h);
return p;
})
.handle(Tcp.outboundAdapter(threadConnectionFactory()))
.get();
}
@Bean
public IntegrationFlow proxyOutboundFlow() {
return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
.transform(Transformers.objectToString())
.<String, String>transform(s -> s.toUpperCase())
.enrichHeaders(e -> e
.headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
+ IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
.handle(Tcp.outboundAdapter(serverFactory()))
.get();
}
private void mapConnectionIds(Map<String, Object> h) {
try {
TcpConnection connection = threadConnectionFactory().getConnection();
String mapping = toFromConnectionMappings().get(connection.getConnectionId());
String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
if (mapping == null || !(mapping.equals(incomingCID))) {
System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
}
}
catch (Exception e) {
e.printStackTrace();
}
}
@Bean
public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
return new ThreadAffinityClientConnectionFactory(clientFactory()) {
@Override
public boolean isSingleUse() {
return false;
}
};
}
@Bean
public AbstractServerConnectionFactory serverFactory() {
return Tcp.netServer(1234).get();
}
@Bean
public AbstractClientConnectionFactory clientFactory() {
AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235).get();
clientFactory.setSingleUse(true);
return clientFactory;
}
@Bean
public IntegrationFlow serverFlow() {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)))
.transform(Transformers.objectToString())
.<String, String>transform(p -> p + p)
.get();
}
@Bean
public ApplicationListener<TcpConnectionCloseEvent> closer() {
return e -> {
if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
String key = fromToConnectionMappings().remove(e.getConnectionId());
toFromConnectionMappings().remove(key);
System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
threadConnectionFactory().releaseConnection();
}
};
}
}
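The part of the listing above that deals with the "Unable to find outbound socket" error is the pair of maps: a reply from the back end carries the ip_connectionId of the client connection, so that header has to be overwritten with the id of the device connection before the message reaches the server-side outbound adapter. A framework-free sketch of just that bookkeeping follows; ConnectionIdMappings and its method names are hypothetical, and plain maps stand in for message headers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper mirroring the two maps in the listing; not a framework class.
class ConnectionIdMappings {
    static final String CONNECTION_ID = "ip_connectionId";

    private final Map<String, String> deviceToBackend = new ConcurrentHashMap<>();
    private final Map<String, String> backendToDevice = new ConcurrentHashMap<>();

    // Record which back-end connection serves which device connection.
    void link(String deviceId, String backendId) {
        deviceToBackend.put(deviceId, backendId);
        backendToDevice.put(backendId, deviceId);
    }

    // Return a copy of the reply headers whose connection id points at the device connection.
    Map<String, Object> rerouteReply(Map<String, Object> headers) {
        Object backendId = headers.get(CONNECTION_ID);
        String deviceId = backendId == null ? null : backendToDevice.get(backendId.toString());
        if (deviceId == null) {
            throw new IllegalStateException("No device connection mapped for " + backendId);
        }
        Map<String, Object> rerouted = new HashMap<>(headers);
        rerouted.put(CONNECTION_ID, deviceId);
        return rerouted;
    }

    // Drop both directions when the device connection closes; returns the back-end id, if any.
    String unlinkDevice(String deviceId) {
        String backendId = deviceToBackend.remove(deviceId);
        if (backendId != null) {
            backendToDevice.remove(backendId);
        }
        return backendId;
    }

    public static void main(String[] args) {
        ConnectionIdMappings mappings = new ConnectionIdMappings();
        mappings.link("device-cid", "backend-cid");
        Map<String, Object> reply = new HashMap<>();
        reply.put(CONNECTION_ID, "backend-cid");
        System.out.println(mappings.rerouteReply(reply).get(CONNECTION_ID));
    }
}
```

Keeping both directions makes the reply lookup and the close-time cleanup single map operations each, at the cost of having to update the two maps together.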
EDIT3
Using MapJsonSerializer works fine for me.
@SpringBootApplication
public class So51200675Application {
public static void main(String[] args) {
SpringApplication.run(So51200675Application.class, args).close();
}
@Bean
public ApplicationRunner runner() {
return args -> {
Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
socket.getOutputStream().write("{\"foo\":\"bar\"}\n".getBytes());
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println(reader.readLine());
socket.close();
};
}
@Bean
public Map<String, String> fromToConnectionMappings() {
return new ConcurrentHashMap<>();
}
@Bean
public Map<String, String> toFromConnectionMappings() {
return new ConcurrentHashMap<>();
}
@Bean
public MapJsonSerializer serializer() {
return new MapJsonSerializer();
}
@Bean
public IntegrationFlow proxyRequestFlow() {
return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
.<Map<String, String>, Map<String, String>>transform(m -> {
m.put("foo", m.get("foo").toUpperCase());
return m;
})
.handle((p, h) -> {
mapConnectionIds(h);
return p;
})
.handle(Tcp.outboundAdapter(threadConnectionFactory()))
.get();
}
@Bean
public IntegrationFlow proxyReplyFlow() {
return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
.<Map<String, String>, Map<String, String>>transform(m -> {
m.put("foo", m.get("foo").toLowerCase() + m.get("foo"));
return m;
})
.enrichHeaders(e -> e
.headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
+ IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
.handle(Tcp.outboundAdapter(serverFactory()))
.get();
}
private void mapConnectionIds(Map<String, Object> h) {
try {
TcpConnection connection = threadConnectionFactory().getConnection();
String mapping = toFromConnectionMappings().get(connection.getConnectionId());
String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
if (mapping == null || !(mapping.equals(incomingCID))) {
System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
}
}
catch (Exception e) {
e.printStackTrace();
}
}
@Bean
public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
return new ThreadAffinityClientConnectionFactory(clientFactory()) {
@Override
public boolean isSingleUse() {
return false;
}
};
}
@Bean
public AbstractServerConnectionFactory serverFactory() {
return Tcp.netServer(1234)
.serializer(serializer())
.deserializer(serializer())
.get();
}
@Bean
public AbstractClientConnectionFactory clientFactory() {
AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235)
.serializer(serializer())
.deserializer(serializer())
.get();
clientFactory.setSingleUse(true);
return clientFactory;
}
@Bean
public IntegrationFlow backEndEmulatorFlow() {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)
.serializer(serializer())
.deserializer(serializer())))
.<Map<String, String>, Map<String, String>>transform(m -> {
m.put("foo", m.get("foo") + m.get("foo"));
return m;
})
.get();
}
@Bean
public ApplicationListener<TcpConnectionCloseEvent> closer() {
return e -> {
if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
String key = fromToConnectionMappings().remove(e.getConnectionId());
toFromConnectionMappings().remove(key);
System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
threadConnectionFactory().releaseConnection();
}
};
}
}
and
Adding new mapping localhost:56998:1234:55c822a4-4252-45e6-9ef2-79263391f4be to localhost:1235:56999:3d520ca9-2f3a-44c3-b05f-e59695b8c1b0
{"foo":"barbarBARBAR"}
Removed mapping localhost:56998:1234:55c822a4-4252-45e6-9ef2-79263391f4be to localhost:1235:56999:3d520ca9-2f3a-44c3-b05f-e59695b8c1b0
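The payload in that output can be traced by hand through the three transformers in the EDIT3 listing. The sketch below replays them in plain Java with no TCP or Spring involved; the class and method names are made up for the trace, while the lambda bodies are copied from the listing.

```java
import java.util.HashMap;
import java.util.Map;

class PayloadTrace {

    // proxyRequestFlow: upper-case the value on its way to the back end.
    static Map<String, String> proxyRequest(Map<String, String> m) {
        m.put("foo", m.get("foo").toUpperCase());
        return m;
    }

    // backEndEmulatorFlow: the emulated back end doubles the value.
    static Map<String, String> backEnd(Map<String, String> m) {
        m.put("foo", m.get("foo") + m.get("foo"));
        return m;
    }

    // proxyReplyFlow: prepend a lower-case copy on the way back to the device.
    static Map<String, String> proxyReply(Map<String, String> m) {
        m.put("foo", m.get("foo").toLowerCase() + m.get("foo"));
        return m;
    }

    static String roundTrip(String value) {
        Map<String, String> message = new HashMap<>();
        message.put("foo", value);
        return proxyReply(backEnd(proxyRequest(message))).get("foo");
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("bar")); // prints barbarBARBAR
    }
}
```

So "bar" becomes "BAR" in the proxy, "BARBAR" in the back end, and "barbarBARBAR" on the reply path, matching the JSON line in the output.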
Source: Stack Overflow, "java - Spring Integration: TCP Client/Server opening one client connection per server connection": https://stackoverflow.com/questions/51200675/