java - 如何使用 Spring Cloud Stream 应用程序启动器 TCP 处理消息

标签 java spring tcp spring-cloud-stream

我想使用 Spring Cloud Stream App Starter TCP Source project (maven 工件)以便能够通过套接字/端口接收 TCP 消息,处理它们,然后将结果推送到消息代理(例如 RabbitMQ)。

这个 TCP 源项目似乎完全符合我的要求,但它会自动将收到的消息发送到输出 channel 。那么,是否有一种干净的方法仍然使用 TCP 源项目,但拦截 TCP 传入消息以在内部转换它们,然后将它们输出到我的消息代理?

最佳答案

参见aggregation .

您使用源和处理器创建聚合应用。

Spring Cloud Stream provides support for aggregating multiple applications together, connecting their input and output channels directly and avoiding the additional cost of exchanging messages via a broker. As of version 1.0 of Spring Cloud Stream, aggregation is supported only for the following types of applications:

sources, sinks, processors ...

They can be aggregated together by creating a sequence of interconnected applications, in which the output channel of an element in the sequence is connected to the input channel of the next element, if it exists. A sequence can start with either a source or a processor, it can contain an arbitrary number of processors and must end with either a processor or a sink.

编辑

作为解决源 Autowiring 问题的方法,您可以尝试...

@EnableBinding(Source.class)
@EnableConfigurationProperties(TcpSourceProperties.class)
public class MyTcpSourceConfiguration {

    @Autowired
    private Source channels;

    @Autowired
    private TcpSourceProperties properties;

    @Bean
    public TcpReceivingChannelAdapter adapter(
            @Qualifier("tcpSourceConnectionFactory") AbstractConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(connectionFactory);
        adapter.setOutputChannelName("toMyProcessor");
        return adapter;
    }

    @ServiceActivator(inputChannel = "toMyProcessor", outputChannel = Source.OUTPUT)
    public byte[] myProcessor(byte[] fromTcp) {
        ...
    }

    @Bean
    public TcpConnectionFactoryFactoryBean tcpSourceConnectionFactory(
            @Qualifier("tcpSourceDecoder") AbstractByteArraySerializer decoder) throws Exception {
        TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean();
        factoryBean.setType("server");
        factoryBean.setPort(this.properties.getPort());
        factoryBean.setUsingNio(this.properties.isNio());
        factoryBean.setUsingDirectBuffers(this.properties.isUseDirectBuffers());
        factoryBean.setLookupHost(this.properties.isReverseLookup());
        factoryBean.setDeserializer(decoder);
        factoryBean.setSoTimeout(this.properties.getSocketTimeout());
        return factoryBean;
    }

    @Bean
    public EncoderDecoderFactoryBean tcpSourceDecoder() {
        EncoderDecoderFactoryBean factoryBean = new EncoderDecoderFactoryBean(this.properties.getDecoder());
        factoryBean.setMaxMessageSize(this.properties.getBufferSize());
        return factoryBean;
    }

}

关于java - 如何使用 Spring Cloud Stream 应用程序启动器 TCP 处理消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42346872/

相关文章:

linux - 套接字 FD 存储在哪里?

java - 将字母表示为数字

java - 绑定(bind) OpenGL 上下文时虚假 GLFW_VISIBILE 提示的目的

java - 在 Spring 中使用双值验证

c++ - 数据传输协议(protocol)设计

c#异步客户端循环运行

java - 在 Android 客户端应用程序中混淆 URL

java - 使用平方根来帮助确定数字是否为素数

java - Spring Thymeleaf - 如何映射到需要从 HTML 传递到 Controller 的 ID 的 URL?

java - Spring 数据 : Not an managed type: class java. lang.Object