java - 具有非 Spring 客户端的 Spring Integration TCP 入站/出站适配器

标签 java spring tcp spring-integration

我正在设计一个将使用 Spring MVC Web 应用程序运行的系统。它将用于向现有(非 Spring)应用程序发送和接收 TCP 命令,该应用程序用于控制某些网络数据过滤器。我只是在玩 Spring Integration TCP 的东西(我是 SI 和 Spring 的新手)来尝试理解它,但我正在努力让一个基本的例子起作用。

我需要异步通信,因为服务器端和客户端可以随时发送数据,它可能需要也可能不需要回复。所以我相信我需要使用的是协作 channel 适配器而不是网关。

我的演示程序应该等待客户端连接,然后接收一系列 String 消息,它会回应回复。用户还可以键入要从服务器端发送的内容。

它基于示例中的 tcp-client-server 示例。我想通过 Java 配置而不是 XML 来完成这一切。

我希望下面的演示将传入的数据回显给客户端。

这是服务器配置类:

@Configuration()
@EnableIntegration
@IntegrationComponentScan
public class ServerConfiguration implements ApplicationListener<TcpConnectionEvent> {

private final int port = SocketUtils.findAvailableServerSocket(5000);

@MessagingGateway(defaultRequestChannel="toTcp")
public interface Gateway {
    String send(String in);
}

@Bean
public AbstractServerConnectionFactory serverFactory() {
    System.out.println("serverFactory");
    AbstractServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(port);
    return connectionFactory;
}

@Bean MessageChannel toTcp() {
    System.out.println("creating toTcp DirectChannel");
    DirectChannel dc = new DirectChannel();
    dc.setBeanName("toTcp");

    return dc;
}

@Bean
public MessageChannel fromTcp() {
    System.out.println("creating fromTcp DirectChannel");
    DirectChannel dc = new DirectChannel();
    dc.setBeanName("fromTcp");

    return dc;
}

// Inbound channel adapter. This receives the data from the client
@Bean
public TcpReceivingChannelAdapter inboundAdapter(AbstractServerConnectionFactory connectionFactory) {
    System.out.println("Creating inbound adapter");
    TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();

    inbound.setConnectionFactory(connectionFactory);
    inbound.setOutputChannel("fromTcp");

    return inbound;
}

// Outbound channel adapter. This sends the data to the client
@Bean
@ServiceActivator(inputChannel="toTcp")
public TcpSendingMessageHandler outboundAdapter(AbstractServerConnectionFactory connectionFactory) {
    System.out.println("Creating outbound adapter");
    TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
    outbound.setConnectionFactory(connectionFactory);
    return outbound;
}

// Endpoint example 
@MessageEndpoint
public static class Echo {

    // Server
    @Transformer(inputChannel="fromTcp", outputChannel="toEcho")
    public String convert(byte[] bytes) {
        System.out.println("convert: " + new String(bytes));
        return new String(bytes);
    }

    // Server
    @ServiceActivator(inputChannel="toEcho", outputChannel="toTcp")
    public String upCase(String in) {
        System.out.println("upCase: " + in.toUpperCase());
        return in.toUpperCase();
    }
}

@Override
public void onApplicationEvent(TcpConnectionEvent event) {
    System.out.println("Got TcpConnectionEvent: source=" + event.getSource() + 
            ", id=" + event.getConnectionId()); 
}   
}

这是主类:

@SpringBootApplication
@IntegrationComponentScan
@EnableMessageHistory
public class SpringIntegrationTcpTest {

    @Autowired
    private ServerConfiguration.Gateway gateway;

    public String send(String data) {
        return gateway.send(data);
    }


public static void main(String[] args) throws IOException {

    ConfigurableApplicationContext context = SpringApplication.run(SpringIntegrationTcpTest.class, args);

    SpringIntegrationTcpTest si = context.getBean(SpringIntegrationTcpTest.class);

    final AbstractServerConnectionFactory crLfServer = context.getBean(AbstractServerConnectionFactory.class);

    final Scanner scanner = new Scanner(System.in);
    System.out.print("Waiting for server to accept connections on port " + crLfServer.getPort());
    TestingUtilities.waitListening(crLfServer, 100000L);
    System.out.println("running.\n\n");

    System.out.println("Please enter some text and press <enter>: ");
    System.out.println("\tNote:");
    System.out.println("\t- Entering FAIL will create an exception");
    System.out.println("\t- Entering q will quit the application");
    System.out.print("\n");

    while (true) {

        final String input = scanner.nextLine();

        if("q".equals(input.trim())) {
            break;
        }
        else {
            final String result = si.send(input);
            System.out.println(result);
        }
    }

    scanner.close();
    context.close();
}
}

这是虚拟客户端类:

public class TcpClient {

    public TcpClient() {
    }

    private void connect(String host, int port) throws InterruptedException {
        Socket socket = null;
        Writer out = null;
        BufferedReader in = null;

        try {
            System.out.print("Connecting to " + host + " on port " + port + " ... ");
            socket = new Socket(host, port);
            System.out.println("connected.");

            System.out.println("sending 100 messages");

            out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            for (int i = 1; i < 100; ++i) {
                String msg =  "hello" + i;

                out.write(msg+"\r\n");
                out.flush();
                //System.out.print(msg+"\r\n");

                System.out.println("Waiting for message ...");

                StringBuffer str = new StringBuffer();
                int c;
                while ((c = in.read()) != -1) {
                    str.append((char) c);
                }       

                String response = str.toString();
                System.out.println("got message: " + response);

                Thread.sleep(1000);
            }


        } catch (IOException e) {

            System.err.println("Test ended with an exception: " + port + ", " + e.getMessage());

        } finally {
            try {
                socket.close();
                out.close();
                //in.close();

            } catch (Exception e) {
                // swallow exception
            }

        }       

    }

    public static void main(String[] args) throws InterruptedException {

        String host = args[0];
        int port = Integer.parseInt(args[1]);
        new TcpClient().connect(host, port);
    }

}

我花了很多时间研究网关等,并让它与 telnet 一起工作,并使用网关从客户端接收消息。我不能做的是让它与 channel 适配器一起正常工作。

当客户端启动时,它会发送由服务器接收并打印到控制台的字符串。似乎没有任何东西被发回,因为客户端只是坐在“等待消息...”上。从服务器端发送内容时,出现以下异常:

Please enter some text and press <enter>:
        Note:
        - Entering FAIL will create an exception
        - Entering q will quit the application

Got TcpConnectionEvent: source=org.springframework.integration.ip.tcp.connection.TcpNetConnection@67162888, id=127.0.0.1:50940:5000:052bf55b-526a-4ea9-bfe3-8ecc573239a3
convert: hello1
upCase: HELLO1
qwe
2017-01-10 12:09:13.995 ERROR 7296 --- [           main] o.s.i.ip.tcp.TcpSendingMessageHandler    : Unable to find outbound socket for GenericMessage [payload=qwe, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6b5894c8, history=serverConfiguration$Gateway,toTcp,serverConfiguration.outboundAdapter.serviceActivator.handler,outboundAdapter, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6b5894c8, id=a4ea72f2-6b12-379b-1b15-f75b821f0b7f, timestamp=1484050153995}]
Exception in thread "main" org.springframework.messaging.MessageHandlingException: Unable to find outbound socket
        at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)

所以问题是没有出站套接字。那么outbound socket是在哪里定义的呢?我还做错了什么?

最佳答案

  1. 您不需要在 channel 上调用 setBeanName - 框架会自动为您完成。

  2. 您的网关正在等待回复,并且 toTcp channel 已连接到不返回回复的 channel 适配器 - 在这种情况下使用 void 返回类型

o.s.i.ip.tcp.TcpSendingMessageHandler : Unable to find outbound socket

要向已连接的客户端发送任意消息,您需要通过设置 ip_connectionId header (有一个常量 IpHeaders.CONNECTION_ID)告诉适配器将其发送到哪个客户端.

您需要设置该 header - 您可以通过 TcpConnectionOpenEvent 捕获它并通过网关将其添加到 header 中...

void send(@Payload String data, @Header(IpHeaders.CONNECTION_ID) String connectionId);

关于java - 具有非 Spring 客户端的 Spring Integration TCP 入站/出站适配器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41568806/

相关文章:

spring - 无法下载 Spring 框架 jar 文件

c#:堆栈在 Invoke 处停止

delphi - 同一应用程序上的多个 TIdCmdTCPServers

java - JDBC 驱动程序 - ClassNotFoundException、NetBeans

java.io.StreamCorruptedException : invalid stream header

java - 如何获取Uri值?

java - Spring Boot 找不到/WEB-INF/classes/index.jsp

使用 Hamcrest 进行 Spring MVC 测试 : how count and test the properties number/size of an object inside a Model

c++ - Linux TCP 服务器问题 C++

java - Apache Kafka 的 Spring : Unrecognized producer configuration - 'delivery.timeout.ms'