java - 使用 rabbitmq 了解 spring cloud 消息传递

标签 java spring spring-boot rabbitmq spring-messaging

我认为我在理解 spring cloud 消息传递方面有问题,并且无法找到我所面临的“问题”的答案。

我有以下设置(使用 spring-boot 2.0.3.RELEASE)。

应用程序.yml

spring:
    rabbitmq:
      host: localhost
      port: 5672
      username: guest
      password: guest
      virtual-host: /
    cloud:
      stream:
        bindings:
          input:
            destination: foo
            group: fooGroup
          fooChannel:
            destination: foo

服务等级

@Autowired
FoodOrderController foodOrderController;

@Bean
public CommandLineRunner runner() {
    return (String[] args) -> {
       IntStream.range(0,50).forEach(e -> foodOrderController.orderFood());
    };
}

@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals(String meal){
    System.out.println("This was a great meal!: "+ meal);
}

@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals1(String meal){
    System.out.println("This was a great meal!: "+ meal);
}

食品订单 Controller

public class FoodOrderController {

    @Autowired
    FoodOrderSource foodOrderSource;

    public String orderFood(){
        var foodOrder = new FoodOrder();
        foodOrder.setCustomerAddress(UUID.randomUUID().toString());
        foodOrder.setOrderDescription(UUID.randomUUID().toString());
        foodOrder.setRestaurant("foo");
        foodOrderSource.foodOrders().send(MessageBuilder.withPayload(foodOrder).build());
       // System.out.println(foodOrder.toString());
        return "food ordered!";
    }
}

食品订单来源

public interface FoodOrderSource {
    String INPUT = "foo";
    String OUTPUT = "fooChannel";

    @Input("foo")
    SubscribableChannel foo();
    @Output("fooChannel")
    MessageChannel foodOrders();
}

FoodOrderPublisher

@EnableBinding(FoodOrderSource.class)
public class FoodOrderPublisher {
}

除了两个 StreamListener 收到相同的消息外,设置工作正常。所以一切都被记录了两次。阅读文档,它说在队列绑定(bind)中指定一个 group ,两个监听器都将在组内注册,只有一个监听器会收到一条消息。我知道上面的示例不合理,但我想模拟具有多个监听器设置的多节点环境。

为什么两个听众都收到消息?我怎样才能确保在设置组中只收到一次消息?

根据文档,默认情况下消息也应该自动确认,但我找不到任何表明消息实际得到确认的信息。我在这里遗漏了什么吗?

这里是rabbit admin的一些截图

enter image description here enter image description here enter image description here enter image description here enter image description here

最佳答案

Reading the documentation, it says specifying a group inside the queues bindings, both the listeners will be registered inside the group and only one listener will receive a single message.

当监听器位于不同的应用程序实例中时,情况确实如此。当同一实例中有多个监听器时,他们都会收到相同的消息。这通常与 condition 一起使用,其中每个听众都可以表达对他们感兴趣的食物的兴趣。Documented here .

基本上,竞争消费者是绑定(bind)本身,它将消息分派(dispatch)到应用程序中的实际 @StreamListener

因此,您不能通过这种方式“模拟具有多个监听器设置的多节点环境”。

but I can't find anything that indicates that the messages actually get acknowledged

你这是什么意思?如果消息处理成功,容器会确认消息并将其从队列中移除。

关于java - 使用 rabbitmq 了解 spring cloud 消息传递,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51907964/

相关文章:

spring - websocket客户端连接到服务器时的SockJsException

java - Spring测试注解

java - 如何通过 BigIntegers 索引 BigIntegers 数组

java - 如何发现对象是否包含设置为 0 的数字字段?

c# - 为 Java 类型生成重复的 C# Web 服务代理类

java - 扭曲的最短路径

java - 实体管理器尚未注入(inject)

java - Spring 安全 : Java Config: How to add the method type?

java - 当我为同一个 .java 文件构建多次生成的项目 .class 文件时

java - Spring Boot : "Cannot set level: INFO, ERROR for ' org. springframework'"使用 logback-spring.xml