java - Spring云流0​​x104567911

标签 java spring-kafka spring-cloud-stream

我正在尝试使用 kafka 绑定(bind)构建一个简单的云流应用程序。让我描述一下设置。 1. 我有一个生产者正在生产主题 topic_1
2. 有一个流绑定(bind)器,经过一些处理后将 topic_1 绑定(bind)到 topic_2 中。

@StreamListener(MyBinder.INPUT)
    @SendTo(MyBinder.OUTPUT_2)
    public String handleIncomingMsgs(String s) {
        logger.info(s);  // prints all the messages
        return s;
    }
  • 当生产者生成消息时,StreamListner handleIncomingMsgs 会获取所有消息。
  • 接收后,它应该将消息转发到其他 channel 。
  • @Service
    @EnableBinding(MyBinder.class)
    public class LogMsg {
    
        @StreamListener(MyBinder.OUTPUT_2)
        public void handle(String board) {
            logger.info("Received payload: " + board); //prints every alternate messages
        }
    
  • 这是我的 Binder
  • public interface ViewsStreams {
    
        String INPUT = "input";
        String OUTPUT_1 = "output_1";
        String OP_USERS = "output_2";
    
        @Autowired
        @Input(INPUT)
        SubscribableChannel job_board_views();
    
        @Autowired
        @Output(OUTPUT_1)
        MessageChannel outboundJobBoards();
    
        @Autowired
        @Output(OUTPUT_2)
        MessageChannel outboundUsers();
    }
    

    我是这些技术的新手。无法弄清楚这里出了什么问题。有人可以帮忙吗?

    最佳答案

    你的猜测是正确的; OUTPUT_2 channel 上有两个使用者 - 监听器和发送消息的绑定(bind)。

    他们每个人都会收到备用消息。

    关于java - Spring云流0​​x104567911,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58110643/

    相关文章:

    java - 如何正确缓冲 Java 输入/输出/文件流?

    java - Spring-Kafka消费者收不到消息

    java - Spring Kafka 禁用消费者再平衡

    spring - 如何在我的 Spring Cloud Stream 项目中将传入的 header 映射为 String 而不是 byte[]?

    java - libGDX 字体被绘制出视口(viewport)

    java - 当elasticsearch在单台服务器上时,对于非常大的数据是否需要设置多个主分片?

    Spring Cloud 流 MessageChannel send() 总是返回 true

    spring-boot - 如果消息处理程序 (Consumer<T>) 调用用 @Transactional 注释的服务,则没有可用/创建事务

    java - 为什么不显示整个图 block map ?

    java - 如何将 spring.cloud.stream.kafka.bindings 配置属性应用于所有消费者