java - TCP client using Java config is missing the bean factory

Tags: java spring spring-integration

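I am trying to create a service that starts a TCP client using Spring Integration. The service is passed a hostName and port, which it uses to create an AbstractClientConnectionFactory. It then creates a TcpInboundGateway using that same AbstractClientConnectionFactory. Finally, it starts the gateway. The error I receive happens after the endOfLineSerializer has completed.
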
@Service
public class TcpService {

    @Autowired
    private TaskScheduler taskScheduler;

    private TcpInboundGateway tcpInboundGateway;

    @Autowired
    private MessageChannel toTcp;

    @Autowired
    private EndOfLineSerializer endOfLineSerializer;

    @Scheduled(initialDelay = 1000, fixedRate = 10000000)
    public void test() {

        if(tcpInboundGateway != null && tcpInboundGateway.isRunning()) {
            return;
        }

        AbstractClientConnectionFactory abstractClientConnectionFactory = clientConnectionFactory("192.XXX.XXX.XX", 4321);
        tcpInboundGateway = tcpInbound(abstractClientConnectionFactory);
        tcpInboundGateway.setTaskScheduler(taskScheduler);
        tcpInboundGateway.start();
    }

    public AbstractClientConnectionFactory clientConnectionFactory(String hostName, int port) {
        TcpNetClientConnectionFactory tcpNetServerConnectionFactory = new TcpNetClientConnectionFactory(hostName, port);
        tcpNetServerConnectionFactory.setSingleUse(false);
        tcpNetServerConnectionFactory.setSoTimeout(300000);
        tcpNetServerConnectionFactory.setDeserializer(endOfLineSerializer);
        tcpNetServerConnectionFactory.setSerializer(endOfLineSerializer);
        tcpNetServerConnectionFactory.setMapper(new TimeoutMapper());
        return tcpNetServerConnectionFactory;
    }

    public TcpInboundGateway tcpInbound(AbstractClientConnectionFactory connectionFactory) {
        TcpInboundGateway gate = new TcpInboundGateway();
        gate.setConnectionFactory(connectionFactory);
        gate.setClientMode(true);
        gate.setRetryInterval(60000);
        gate.setRequestChannel(toTcp);
        gate.setReplyChannelName("toTcp");
        return gate;
    }
}

@EnableIntegration
@IntegrationComponentScan
@Configuration
public class TcpClientConfig {

    @Bean
    public EndOfLineSerializer endOfLineSerializer() {
        return new EndOfLineSerializer();
    }

    @MessageEndpoint
    public static class Echo {

        @Transformer(inputChannel = "toTcp", outputChannel = "serviceChannel")
        public String convert(byte[] bytes) {
            return new String(bytes);
        }
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    public void messageToService(String in) {
        System.out.println(in);
    }

    @Bean
    public MessageChannel toTcp() {
        return new DirectChannel();
    }
}

I tried autowiring the BeanFactory and setting it on the TcpInboundGateway, but the error still occurs. Why can't the messaging gateway find the BeanFactory?

Error

java.lang.IllegalArgumentException: BeanFactory must not be null
    at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.support.channel.BeanFactoryChannelResolver.<init>(BeanFactoryChannelResolver.java:76) ~[spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.context.IntegrationObjectSupport.getChannelResolver(IntegrationObjectSupport.java:218) ~[spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.getReplyChannel(MessagingGatewaySupport.java:384) ~[spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.registerReplyMessageCorrelatorIfNecessary(MessagingGatewaySupport.java:736) ~[spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:483) ~[spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:470) ~[spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:120) ~[spring-integration-ip-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:98) ~[spring-integration-ip-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:198) ~[spring-integration-ip-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) [na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [na:na]
    at java.base/java.lang.Thread.run(Thread.java:844) [na:na]

Best answer

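This happens because these objects are not managed by Spring. You either need to satisfy all of the ...Aware interfaces yourself and call afterPropertiesSet(), or you need to let Spring do that for you. There are two ways to do the latter.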
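
To make the failure mode concrete, here is a minimal, framework-free sketch. MiniFactory, MiniAware, MiniGateway and LifecycleDemo are hypothetical stand-ins, not Spring classes. They only imitate the idea that a container hands itself to an object through an ...Aware callback, and that an object created with plain new never receives that callback.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an ...Aware callback interface; not the Spring API.
interface MiniAware {
    void setFactory(MiniFactory factory);
}

// Hypothetical stand-in for a bean factory.
class MiniFactory {
    private final Map<String, Object> singletons = new HashMap<>();

    // Roughly what a container does for a managed object:
    // run the Aware callback, then register the instance under a name.
    <T> T initializeAndRegister(String name, T bean) {
        if (bean instanceof MiniAware) {
            ((MiniAware) bean).setFactory(this);
        }
        singletons.put(name, bean);
        return bean;
    }

    Object lookup(String name) {
        return singletons.get(name);
    }
}

// Hypothetical gateway that resolves a channel by name, which needs the factory.
class MiniGateway implements MiniAware {
    private MiniFactory factory;

    @Override
    public void setFactory(MiniFactory factory) {
        this.factory = factory;
    }

    Object resolveChannel(String name) {
        if (factory == null) {
            throw new IllegalArgumentException("BeanFactory must not be null");
        }
        return factory.lookup(name);
    }
}

class LifecycleDemo {
    // A gateway created with plain `new` never got the callback, so lookup fails.
    static String failureForUnmanagedGateway() {
        try {
            new MiniGateway().resolveChannel("toTcp");
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    // A gateway passed through the factory can resolve the registered channel.
    static boolean managedGatewayResolves() {
        MiniFactory factory = new MiniFactory();
        Object channel = factory.initializeAndRegister("toTcp", new Object());
        MiniGateway gate = factory.initializeAndRegister("gate", new MiniGateway());
        return gate.resolveChannel("toTcp") == channel;
    }

    public static void main(String[] args) {
        System.out.println(failureForUnmanagedGateway());
        System.out.println(managedGatewayResolves());
    }
}
```

The real gateway has more lifecycle steps than this toy model, which is why handing the object to the container is usually less error-prone than wiring each callback by hand.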

Using the bean factory manually

@Autowired
private ConfigurableListableBeanFactory beanFactory;

public AbstractClientConnectionFactory clientConnectionFactory(String hostName, int port) {
    TcpNetClientConnectionFactory server = new TcpNetClientConnectionFactory(hostName, port);
    server.setSingleUse(false);
    server.setSoTimeout(300000);
    server = (TcpNetClientConnectionFactory) this.beanFactory.initializeBean(server, "cf");
    this.beanFactory.registerSingleton("cf", server);
    return server;
}

public TcpInboundGateway tcpInbound(AbstractClientConnectionFactory connectionFactory) {
    TcpInboundGateway gate = new TcpInboundGateway();
    gate.setConnectionFactory(connectionFactory);
    gate.setClientMode(true);
    gate.setRetryInterval(60000);
    gate.setRequestChannelName("toTcp");
    gate = (TcpInboundGateway) this.beanFactory.initializeBean(gate, "gate");
    this.beanFactory.registerSingleton("gate", gate);
    return gate;
}

Using the Java DSL dynamic flow registration feature

@Autowired
private IntegrationFlowContext flowContext;

public void tcpInbound(String host, int port, String flowId) {
    IntegrationFlow flow = IntegrationFlows.from(
                Tcp.inboundGateway(Tcp.netClient(host, port))
                    .clientMode(true))
            .channel("toTcp")
            .get();
    this.flowContext.registration(flow).id(flowId).register();
}

(You can also configure the other properties using the DSL.)
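
As an illustration of configuring those other properties, the sketch below carries the question's settings (serializer, deserializer, socket timeout, retry interval) over to the DSL. The class name DynamicTcpClient and the choice to pass the serializer and deserializer in as parameters are assumptions made for the example. It is a configuration fragment that needs Spring Integration 5.x on the classpath, and it has not been run against the asker's project.

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.stereotype.Service;

// Hypothetical service name; only the DSL calls matter here.
@Service
public class DynamicTcpClient {

    @Autowired
    private IntegrationFlowContext flowContext;

    public void tcpInbound(String host, int port, String flowId,
            Serializer<?> serializer, Deserializer<?> deserializer) {
        IntegrationFlow flow = IntegrationFlows.from(
                    Tcp.inboundGateway(
                            Tcp.netClient(host, port)
                                .serializer(serializer)     // e.g. the question's endOfLineSerializer
                                .deserializer(deserializer)
                                .soTimeout(300000))         // same timeout as in the question
                        .clientMode(true)
                        .retryInterval(60000))              // same retry interval as in the question
                .channel("toTcp")
                .get();
        // Registering the flow lets the framework initialize every component in it.
        this.flowContext.registration(flow).id(flowId).register();
    }
}
```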

   gate.setRequestChannel(toTcp);
   gate.setReplyChannelName("toTcp");

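Regarding the two lines above from the question: you can't use the same channel for both the request and the reply. You generally don't need a reply channel at all, because the framework works it out. You only need a reply channel if you want to do something like add a wire tap to log the reply.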

Source: a similar question on Stack Overflow about a TCP client using Java config that is missing the bean factory: https://stackoverflow.com/questions/53455338/
