java - 与 Spring 集成的 PollableChannel

标签 java spring spring-integration spring-annotations spring-messaging

我有一个接口(interface)Channels.java

    final String OUTPUT = "output";

    final String INPUT = "input";


    @Output(OUTPUT)
    MessageChannel output();

    @BridgeFrom(OUTPUT)
    PollableChannel input();

我有另一个类,我在其中执行所有消息传递操作:

@Autowired
@Qualifier(Channels.OUTPUT)
private MessageChannel Output;

我能够很好地向交易所发送消息。如何在这里使用我的 PollableChannel ?我做错了什么?

编辑

如何访问 @Component 类中的 bean?

我现在有@Configuration类

@Bean
@BridgeTo(Channels.OUTPUT)
public PollableChannel polled() {
    return new QueueChannel();
}

想要能够使用此 channel 接收消息吗?

最佳答案

桥必须是一个@Bean,而不是接口(interface)方法上的注释 - 请参阅 the answer to your general question here .

编辑

@SpringBootApplication
@EnableBinding(Source.class)
public class So44018382Application implements CommandLineRunner {

    final Logger logger = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So44018382Application.class, args);
        Thread.sleep(60_000);
        context.close();
    }

    @RabbitListener(bindings =
            @QueueBinding(value = @Queue(value = "foo", autoDelete = "true"),
                            exchange = @Exchange(value = "output", type = "topic"), key = "#"))
    // bind a queue to the output exchange
    public void listen(String in) {
        this.logger.info("received " + in);
    }

    @BridgeTo(value = Source.OUTPUT,
            poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "2"))
    @Bean
    public PollableChannel polled() {
        return new QueueChannel(5);
    }

    @Override
    public void run(String... args) throws Exception {
        for (int i = 0; i < 30; i++) {
            polled().send(new GenericMessage<>("foo" + i));
            this.logger.info("sent foo" + i);
        }
    }

}

这对我来说效果很好;队列深度为5;当它已满时,发送者会阻塞;轮询器一次仅删除 2 条消息并将它们发送到输出 channel 。

此示例还添加了一个兔子监听器来消耗发送到绑定(bind)器的消息。

关于java - 与 Spring 集成的 PollableChannel,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44018382/

相关文章:

java - 当 ListView 已经到达顶部时,如何检测用户是否向上滚动

java - Spring Cloud Stream 的内存中绑定(bind)器

java - 哪种类型的 Spring 集成 channel ?

java - 使用 JFileChooser 制作备份副本

java - 如何确保一行代码对于每个单独的线程仅运行一次?

java - 为单元测试创​​建对象 MockHttpServletResponse 时出错

spring - 休息 : Extending CrudRepository for self-referencing entities throws an exception

spring - 如何让JobDetailBean非并发运行?

spring - 我应该使用 Spring 集成还是 Spring 社交来与 twitter 和 Facebook 交互?

java - 登录 Spring Integration java DSL