我正在设计一个将使用 Spring MVC Web 应用程序运行的系统。它将用于向现有(非 Spring)应用程序发送和接收 TCP 命令,该应用程序用于控制某些网络数据过滤器。我只是在玩 Spring Integration TCP 的东西(我是 SI 和 Spring 的新手)来尝试理解它,但我正在努力让一个基本的例子起作用。
我需要异步通信,因为服务器端和客户端可以随时发送数据,它可能需要也可能不需要回复。所以我相信我需要使用的是协作 channel 适配器而不是网关。
我的演示程序应该等待客户端连接,然后接收一系列 String 消息,它会回应回复。用户还可以键入要从服务器端发送的内容。
它基于示例中的 tcp-client-server 示例。我想通过 Java 配置而不是 XML 来完成这一切。
我希望下面的演示将传入的数据回显给客户端。
这是服务器配置类:
@Configuration()
@EnableIntegration
@IntegrationComponentScan
public class ServerConfiguration implements ApplicationListener<TcpConnectionEvent> {
private final int port = SocketUtils.findAvailableServerSocket(5000);
@MessagingGateway(defaultRequestChannel="toTcp")
public interface Gateway {
String send(String in);
}
@Bean
public AbstractServerConnectionFactory serverFactory() {
System.out.println("serverFactory");
AbstractServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(port);
return connectionFactory;
}
@Bean MessageChannel toTcp() {
System.out.println("creating toTcp DirectChannel");
DirectChannel dc = new DirectChannel();
dc.setBeanName("toTcp");
return dc;
}
@Bean
public MessageChannel fromTcp() {
System.out.println("creating fromTcp DirectChannel");
DirectChannel dc = new DirectChannel();
dc.setBeanName("fromTcp");
return dc;
}
// Inbound channel adapter. This receives the data from the client
@Bean
public TcpReceivingChannelAdapter inboundAdapter(AbstractServerConnectionFactory connectionFactory) {
System.out.println("Creating inbound adapter");
TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
inbound.setConnectionFactory(connectionFactory);
inbound.setOutputChannel("fromTcp");
return inbound;
}
// Outbound channel adapter. This sends the data to the client
@Bean
@ServiceActivator(inputChannel="toTcp")
public TcpSendingMessageHandler outboundAdapter(AbstractServerConnectionFactory connectionFactory) {
System.out.println("Creating outbound adapter");
TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
outbound.setConnectionFactory(connectionFactory);
return outbound;
}
// Endpoint example
@MessageEndpoint
public static class Echo {
// Server
@Transformer(inputChannel="fromTcp", outputChannel="toEcho")
public String convert(byte[] bytes) {
System.out.println("convert: " + new String(bytes));
return new String(bytes);
}
// Server
@ServiceActivator(inputChannel="toEcho", outputChannel="toTcp")
public String upCase(String in) {
System.out.println("upCase: " + in.toUpperCase());
return in.toUpperCase();
}
}
@Override
public void onApplicationEvent(TcpConnectionEvent event) {
System.out.println("Got TcpConnectionEvent: source=" + event.getSource() +
", id=" + event.getConnectionId());
}
}
这是主类:
@SpringBootApplication
@IntegrationComponentScan
@EnableMessageHistory
public class SpringIntegrationTcpTest {
@Autowired
private ServerConfiguration.Gateway gateway;
public String send(String data) {
return gateway.send(data);
}
public static void main(String[] args) throws IOException {
ConfigurableApplicationContext context = SpringApplication.run(SpringIntegrationTcpTest.class, args);
SpringIntegrationTcpTest si = context.getBean(SpringIntegrationTcpTest.class);
final AbstractServerConnectionFactory crLfServer = context.getBean(AbstractServerConnectionFactory.class);
final Scanner scanner = new Scanner(System.in);
System.out.print("Waiting for server to accept connections on port " + crLfServer.getPort());
TestingUtilities.waitListening(crLfServer, 100000L);
System.out.println("running.\n\n");
System.out.println("Please enter some text and press <enter>: ");
System.out.println("\tNote:");
System.out.println("\t- Entering FAIL will create an exception");
System.out.println("\t- Entering q will quit the application");
System.out.print("\n");
while (true) {
final String input = scanner.nextLine();
if("q".equals(input.trim())) {
break;
}
else {
final String result = si.send(input);
System.out.println(result);
}
}
scanner.close();
context.close();
}
}
这是虚拟客户端类:
public class TcpClient {
public TcpClient() {
}
private void connect(String host, int port) throws InterruptedException {
Socket socket = null;
Writer out = null;
BufferedReader in = null;
try {
System.out.print("Connecting to " + host + " on port " + port + " ... ");
socket = new Socket(host, port);
System.out.println("connected.");
System.out.println("sending 100 messages");
out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
for (int i = 1; i < 100; ++i) {
String msg = "hello" + i;
out.write(msg+"\r\n");
out.flush();
//System.out.print(msg+"\r\n");
System.out.println("Waiting for message ...");
StringBuffer str = new StringBuffer();
int c;
while ((c = in.read()) != -1) {
str.append((char) c);
}
String response = str.toString();
System.out.println("got message: " + response);
Thread.sleep(1000);
}
} catch (IOException e) {
System.err.println("Test ended with an exception: " + port + ", " + e.getMessage());
} finally {
try {
socket.close();
out.close();
//in.close();
} catch (Exception e) {
// swallow exception
}
}
}
public static void main(String[] args) throws InterruptedException {
String host = args[0];
int port = Integer.parseInt(args[1]);
new TcpClient().connect(host, port);
}
}
我花了很多时间研究网关等,并让它与 telnet 一起工作,并使用网关从客户端接收消息。我不能做的是让它与 channel 适配器一起正常工作。
当客户端启动时,它会发送由服务器接收并打印到控制台的字符串。似乎没有任何东西被发回,因为客户端只是坐在“等待消息...”上。从服务器端发送内容时,出现以下异常:
Please enter some text and press <enter>:
Note:
- Entering FAIL will create an exception
- Entering q will quit the application
Got TcpConnectionEvent: source=org.springframework.integration.ip.tcp.connection.TcpNetConnection@67162888, id=127.0.0.1:50940:5000:052bf55b-526a-4ea9-bfe3-8ecc573239a3
convert: hello1
upCase: HELLO1
qwe
2017-01-10 12:09:13.995 ERROR 7296 --- [ main] o.s.i.ip.tcp.TcpSendingMessageHandler : Unable to find outbound socket for GenericMessage [payload=qwe, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6b5894c8, history=serverConfiguration$Gateway,toTcp,serverConfiguration.outboundAdapter.serviceActivator.handler,outboundAdapter, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6b5894c8, id=a4ea72f2-6b12-379b-1b15-f75b821f0b7f, timestamp=1484050153995}]
Exception in thread "main" org.springframework.messaging.MessageHandlingException: Unable to find outbound socket
at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
所以问题是没有出站套接字。那么outbound socket是在哪里定义的呢?我还做错了什么?
最佳答案
您不需要在 channel 上调用
setBeanName
- 框架会自动为您完成。您的网关正在等待回复,并且
toTcp
channel 已连接到不返回回复的 channel 适配器 - 在这种情况下使用void
返回类型
o.s.i.ip.tcp.TcpSendingMessageHandler : Unable to find outbound socket
要向已连接的客户端发送任意消息,您需要通过设置 ip_connectionId
header (有一个常量 IpHeaders.CONNECTION_ID
)告诉适配器将其发送到哪个客户端.
您需要设置该 header - 您可以通过 TcpConnectionOpenEvent
捕获它并通过网关将其添加到 header 中...
void send(@Payload String data, @Header(IpHeaders.CONNECTION_ID) String connectionId);
关于java - 具有非 Spring 客户端的 Spring Integration TCP 入站/出站适配器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41568806/