我正在尝试使用 kafka 绑定(bind)构建一个简单的云流应用程序。让我描述一下设置。
1. 我有一个生产者正在生产主题 topic_1
。
2. 有一个流绑定(bind)器,经过一些处理后将 topic_1
绑定(bind)到 topic_2
中。
@StreamListener(MyBinder.INPUT)
@SendTo(MyBinder.OUTPUT_2)
public String handleIncomingMsgs(String s) {
logger.info(s); // prints all the messages
return s;
}
- 当生产者生成消息时,
StreamListner handleIncomingMsgs
会获取所有消息。 - 接收后,它应该将消息转发到其他 channel 。
@Service
@EnableBinding(MyBinder.class)
public class LogMsg {
@StreamListener(MyBinder.OUTPUT_2)
public void handle(String board) {
logger.info("Received payload: " + board); //prints every alternate messages
}
- 这是我的 Binder
public interface ViewsStreams {
String INPUT = "input";
String OUTPUT_1 = "output_1";
String OP_USERS = "output_2";
@Autowired
@Input(INPUT)
SubscribableChannel job_board_views();
@Autowired
@Output(OUTPUT_1)
MessageChannel outboundJobBoards();
@Autowired
@Output(OUTPUT_2)
MessageChannel outboundUsers();
}
我是这些技术的新手。无法弄清楚这里出了什么问题。有人可以帮忙吗?
最佳答案
你的猜测是正确的; OUTPUT_2 channel 上有两个使用者 - 监听器和发送消息的绑定(bind)。
他们每个人都会收到备用消息。
关于java - Spring云流0x104567911,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58110643/