java - Spring boot standalone CommandLineRunner 不会返回 spring-starter-amqp

标签 java spring-boot rabbitmq spring-amqp

我正在构建一个经典的生产者 -> rabbitmq -> 消费者流程。 所有 3 个节点都在单独的 jvm 甚至单独的主机上运行

Producer 是一个 spring boot 命令行运行器应用程序,预计会在完成生产后停止。

消费者应用程序是一个 spring boot web 应用程序,它监听 3 个 rabbitmq 队列(2 个持久队列绑定(bind)到直接交换,1 个非持久队列绑定(bind)到扇出交换)

我的启动顺序如下: - 启动rabbitmq - 开始消费者 - 开始制作人

生产者和消费者amqp依赖mvn dependency:tree

[INFO] |  +- org.springframework.boot:spring-boot-starter-amqp:jar:2.1.6.RELEASE:compile
[INFO] |  |  +- org.springframework:spring-messaging:jar:5.1.8.RELEASE:compile
[INFO] |  |  \- org.springframework.amqp:spring-rabbit:jar:2.1.7.RELEASE:compile
[INFO] |  |     +- org.springframework.amqp:spring-amqp:jar:2.1.7.RELEASE:compile
[INFO] |  |     |  \- org.springframework.retry:spring-retry:jar:1.2.4.RELEASE:compile
[INFO] |  |     +- com.rabbitmq:amqp-client:jar:5.4.3:compile
[INFO] |  |     \- org.springframework:spring-tx:jar:5.1.8.RELEASE:compile

生产者代码

/**
 * @author louis.gueye@gmail.com
 */
@RequiredArgsConstructor
@Slf4j
public class PlatformBrokerExampleProducerJob implements CommandLineRunner {

    private final AmqpTemplate template;

    @Override
    public void run(String... args) {
        final Instant now = Instant.now();
        final Instant anHourAgo = now.minus(Duration.ofHours(1));
        final String directExchangeName = "careassist_queues";
        final String fanoutExchangeName = "careassist_schedules_topics";
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.on) //
                    .build();
            final String routingKey = "care.events";
            template.convertAndSend(directExchangeName, routingKey, event);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
        });
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.off) //
                    .build();
            final String routingKey = "maintenance.events";
            template.convertAndSend(directExchangeName, routingKey, event);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
        });
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.off) //
                    .build();
            final ScheduleDto schedule = ScheduleDto.builder().id(UUID.randomUUID().toString()) //
                    .destination("any.routing.queue") //
                    .message(event) //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .build();
            final String routingKey = "#";
            template.convertAndSend(fanoutExchangeName, routingKey, schedule);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), fanoutExchangeName, routingKey);
        });
    }
}

消费者代码(1个监听器)

@Component
@RabbitListener(queues = {PlatformBrokerExampleCareEventsQueueConsumer.QUEUE_NAME})
@Slf4j
public class PlatformBrokerExampleCareEventsQueueConsumer {
    public static final String QUEUE_NAME = "care_events";
    @RabbitHandler
    public void onMessage(SensorEventDto event) {
        log.info("<<<<<<<<<<<< Received event [" + event + "] from {}...", PlatformBrokerExampleCareEventsQueueConsumer.QUEUE_NAME);
    }
}

我希望生产者生产然后关闭,但相反,java 进程无限期挂起

如能解释为什么生产者在生成消息后不会停止,我们将不胜感激。我怀疑它与 spring-started-amqp 有关,但我不确定。我当然不需要完整的 jar ,只需要包含 AmqpTemplate 的小 jar

注意:消费者收到所有消息

github project

感谢您的帮助。

最佳答案

AMQP 客户端有一些后台线程。

您应该更改 main() 方法以在运行程序返回后关闭应用程序上下文...

public static void main(String[] args) {
    SpringApplication.run(MyApplication.class, args).close();
}

它会干净利落地关闭一切,不像 System.exit() 那样残酷。

关于java - Spring boot standalone CommandLineRunner 不会返回 spring-starter-amqp,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57001749/

相关文章:

java - 对多个属性使用 Spring @Value 注解

python - Django 中消息队列消费者放在哪里?

java - Spring Boot 未连接到 RabbitMQ

java - Spring Boot 安全性无法正常工作

java - 注册 spring-boot-admin-client 错误

spring-boot - 升级到Apache Derby 10.15后,Spring Boot应用程序中断-为什么?

java - spring-amqp读取一条rabbit消息最早的入口点是什么?

java - Java List<String> 和 List<String>[] 有什么区别

java - Pragma Pack 使用 C 库导致 jvm 崩溃

java - 在 Jackcess 中对表格进行排序