java - 如何优雅地结束 spring @Schedule 任务?

标签 java spring spring-boot rabbitmq spring-scheduled

我正在尝试让 Spring Boot 服务优雅地结束。它有一个带有 @Scheduled 的方法注解。该服务对 DB 使用 spring-data,对 RabbitMQ 使用 spring-cloud-stream。在预定的方法结束之前,DB 和 RabbitMQ 是可访问的,这一点至关重要。有一个自动缩放器可以经常启动/停止服务实例,停止时崩溃不是一种选择。

来自这篇文章 Spring - Scheduled Task - Graceful Shutdown我认为应该足以添加

    @Bean
    TaskSchedulerCustomizer taskSchedulerCustomizer() {
        return taskScheduler -> {
            taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
            taskScheduler.setAwaitTerminationSeconds(30);
        };
    }

并且 spring 应该等待关闭应用程序,直到预定的方法完成或 30 秒到期。

在执行计划方法时停止服务时,我可以从日志中看到以下内容
  • spring-cloud-stream 正在关闭连接而不等待方法完成。
  • spring-data 也立即关闭 db-connection。
  • 该方法不会停止并尝试完成但失败,因为它无法再访问 Db。

  • 任何想法如何让包括 db-connection 和 rabbitMq 访问在内的预定方法优雅地完成?

    这是我的应用程序类:
    @SpringBootApplication(scanBasePackages = {
        "xx.yyy.infop.dao",
        "xx.yyy.infop.compress"})
    @EntityScan("ch.sbb.infop.common.entity")
    @EnableJpaRepositories({"xx.yyy.infop.dao", "xx.yyy.infop.compress.repository"})
    @EnableBinding(CompressSink.class)
    @EnableScheduling
    public class ApplicationCompress {
    
        @Value("${max.commpress.timout.seconds:300}")
        private int maxCompressTimeoutSeconds;
    
        public static void main(String[] args) {
            SpringApplication.run(ApplicationCompress.class, args);
        }
    
        @Bean
        TaskSchedulerCustomizer taskSchedulerCustomizer() {
            return taskScheduler -> {
                taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
                taskScheduler.setAwaitTerminationSeconds(maxCompressTimeoutSeconds);
            };
        }
    
    }
    

    这是 bean :
    @Component
    @Profile("!integration-test")
    public class CommandReader {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(CommandReader.class);
    
        private final CompressSink compressSink;
        private final CommandExecutor commandExecutor;
    
        CommandReader(CompressSink compressSink, CommandExecutor commandExecutor) {
            this.compressSink = compressSink;
            this.commandExecutor = commandExecutor;
        }
    
        @PreDestroy
        private void preDestory() {
            LOGGER.info("preDestory");
        }
    
        @Scheduled(fixedDelay = 1000)
        public void poll() {
            LOGGER.debug("Start polling.");
            ParameterizedTypeReference<CompressCommand> parameterizedTypeReference = new ParameterizedTypeReference<>() {
            };
            if (!compressSink.inputSync().poll(this::execute, parameterizedTypeReference)) {
                compressSink.inputAsync().poll(this::execute, parameterizedTypeReference);
            }
            LOGGER.debug("Finished polling.");
        }
    
        private void execute(Message<?> message) {
            CompressCommand compressCommand = (CompressCommand) message.getPayload();
    
            // uses spring-data to write to DB
            CompressResponse compressResponse = commandExecutor.execute(compressCommand);
    
            // Schreibt die Anwort in Rensponse-Queue
            compressSink.outputResponse().send(MessageBuilder.withPayload(compressResponse).build());
        }
    
    }
    

    这里是日志中的一些行(请参阅 https://pastebin.com/raw/PmmqhH1P 以获取完整日志):
    2020-05-15 11:59:35,640 [restartedMain] - INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.initialize - traceid= - Initializing ExecutorService 'taskScheduler'
    
    2020-05-15 11:59:44,976 [restartedMain] - INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.initialize - traceid= - Initializing ExecutorService 'applicationTaskExecutor'
    
    Disconnected from the target VM, address: '127.0.0.1:52748', transport: 'socket'
    2020-05-15 12:00:01,228 [SpringContextShutdownHook] - INFO org.springframework.cloud.stream.binder.BinderErrorChannel.adjustCounterIfNecessary - traceid= - Channel 'application-1.kompressSync.komprimierungSyncProcessingGroup.errors' has 1 subscriber(s).
    2020-05-15 12:00:01,229 [SpringContextShutdownHook] - INFO org.springframework.cloud.stream.binder.BinderErrorChannel.adjustCounterIfNecessary - traceid= - Channel 'application-1.kompressSync.komprimierungSyncProcessingGroup.errors' has 0 subscriber(s).
    2020-05-15 12:00:01,232 [SpringContextShutdownHook] - INFO org.springframework.cloud.stream.binder.BinderErrorChannel.adjustCounterIfNecessary - traceid= - Channel 'application-1.kompressAsync.komprimierungAsyncProcessingGroup.errors' has 1 subscriber(s).
    2020-05-15 12:00:01,232 [SpringContextShutdownHook] - INFO org.springframework.cloud.stream.binder.BinderErrorChannel.adjustCounterIfNecessary - traceid= - Channel 'application-1.kompressAsync.komprimierungAsyncProcessingGroup.errors' has 0 subscriber(s).
    2020-05-15 12:00:01,237 [SpringContextShutdownHook] - INFO org.springframework.integration.endpoint.EventDrivenConsumer.logComponentSubscriptionEvent - traceid= - Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
    2020-05-15 12:00:01,237 [SpringContextShutdownHook] - INFO org.springframework.integration.channel.PublishSubscribeChannel.adjustCounterIfNecessary - traceid= - Channel 'application-1.errorChannel' has 0 subscriber(s).
    2020-05-15 12:00:01,237 [SpringContextShutdownHook] - INFO org.springframework.integration.endpoint.EventDrivenConsumer.stop - traceid= - stopped bean '_org.springframework.integration.errorLogger'
    2020-05-15 12:00:01,244 [SpringContextShutdownHook] - INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.shutdown - traceid= - Shutting down ExecutorService 'applicationTaskExecutor'
    2020-05-15 12:00:01,245 [SpringContextShutdownHook] - INFO yy.xxx.infop.compress.CommandReader.preDestory - traceid= - preDestory
    2020-05-15 12:00:01,251 [SpringContextShutdownHook] - INFO org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean.destroy - traceid= - Closing JPA EntityManagerFactory for persistence unit 'default'
    2020-05-15 12:00:01,256 [SpringContextShutdownHook] - INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown - traceid= - Shutting down ExecutorService 'taskScheduler'
    2020-05-15 12:00:01,256 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 4
    2020-05-15 12:00:02,257 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 5
    2020-05-15 12:00:03,258 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 6
    2020-05-15 12:00:04,260 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 7
    2020-05-15 12:00:05,260 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 8
    2020-05-15 12:00:06,261 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - 9
    2020-05-15 12:00:07,262 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.doInTransactionWithoutResult - traceid=d22b696edc90e123 - end
    2020-05-15 12:00:07,263 [scheduling-1] - INFO yy.xxx.infop.compress.condense.VmLaufVerdichter.verdichte - traceid=d22b696edc90e123 - VarianteTyp=G, vmId=482392382, vnNr=8416
    2020-05-15 12:00:07,326 [scheduling-1] -ERROR yy.xxx.infop.compress.CommandExecutor.execute - traceid=d22b696edc90e123 - 
    
    org.springframework.beans.factory.BeanCreationNotAllowedException: Error creating bean with name 'inMemoryDatabaseShutdownExecutor': Singleton bean creation not allowed while singletons of this factory are in destruction (Do not request a bean from a BeanFactory in a destroy method implementation!)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:208) ~[spring-beans-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    
    2020-05-15 12:00:08,332 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.execute - traceid=d22b696edc90e123 - Compress started. compressCommand=yy.xxx.infop.compress.client.CompressCommand@247ec0d[hostName=K57176,jobId=b1211ee8-4a54-47f2-a58b-92b3560bbddd,cmdId=1,userId=goofy2,commandTyp=verdichtet G, T und komprimiert G, T,vmId=482392382,started=1589536752609]
    2020-05-15 12:00:08,337 [scheduling-1] -ERROR yy.xxx.infop.compress.CommandExecutor.execute - traceid=d22b696edc90e123 - 
    
    org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.IllegalStateException: EntityManagerFactory is closed
        at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:448) ~[spring-orm-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    
    2020-05-15 12:00:10,339 [scheduling-1] - INFO yy.xxx.infop.compress.CommandExecutor.execute - traceid=d22b696edc90e123 - Compress started. compressCommand=yy.xxx.infop.compress.client.CompressCommand@247ec0d[hostName=K57176,jobId=b1211ee8-4a54-47f2-a58b-92b3560bbddd,cmdId=1,userId=goofy2,commandTyp=verdichtet G, T und komprimiert G, T,vmId=482392382,started=1589536752609]
    2020-05-15 12:00:10,343 [scheduling-1] -ERROR yy.xxx.infop.compress.CommandExecutor.execute - traceid=d22b696edc90e123 - 
    
    org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.IllegalStateException: EntityManagerFactory is closed
    
    2020-05-15 12:00:10,351 [scheduling-1] -DEBUG yy.xxx.infop.compress.CommandReader.poll - traceid=d22b696edc90e123 - Finished polling.
    2020-05-15 12:00:10,372 [SpringContextShutdownHook] - INFO org.springframework.integration.monitor.IntegrationMBeanExporter.destroy - traceid= - Summary on shutdown: bean 'response'
    2020-05-15 12:00:10,372 [SpringContextShutdownHook] - INFO org.springframework.integration.monitor.IntegrationMBeanExporter.destroy - traceid= - Summary on shutdown: nullChannel
    2020-05-15 12:00:10,373 [SpringContextShutdownHook] - INFO org.springframework.integration.monitor.IntegrationMBeanExporter.destroy - traceid= - Summary on shutdown: bean 'errorChannel'
    2020-05-15 12:00:10,373 [SpringContextShutdownHook] - INFO org.springframework.integration.monitor.IntegrationMBeanExporter.destroy - traceid= - Summary on shutdown: bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger'
    2020-05-15 12:00:10,374 [SpringContextShutdownHook] - INFO com.zaxxer.hikari.HikariDataSource.close - traceid= - HikariPool-1 - Shutdown initiated...
    2020-05-15 12:00:10,405 [SpringContextShutdownHook] - INFO com.zaxxer.hikari.HikariDataSource.close - traceid= - HikariPool-1 - Shutdown completed.
    
    Process finished with exit code 130
    

    最佳答案

    我已经测试了这个配置,它应该和你的 TaskSchedulerCustomizer 做同样的事情:

    spring.task.scheduling.shutdown.await-termination=true
    spring.task.scheduling.shutdown.await-termination-period=30s
    
    如果有 Activity 任务,Spring 会在所有服务可用时等待 30 秒,然后再关闭任何服务。如果没有 Activity 任务,则立即关闭。
    值得一提的是,让我想到这个问题的是@Async的优雅关机。以非常相似的方式配置的方法:
    spring.task.execution.shutdown.await-termination=true
    spring.task.execution.shutdown.await-termination-period=1s
    
    或在代码中:
    @Bean
    public TaskExecutorCustomizer taskExecutorCustomizer() {
        // Applies to @Async tasks, not @Scheduled as in the question
        return (customizer) -> {
            customizer.setWaitForTasksToCompleteOnShutdown(true);
            customizer.setAwaitTerminationSeconds(10);
        };
    }
    
    回到你的情况,我的猜测是 TaskSchedulerCustomizer并未实际执行或在执行后被其他内容覆盖。
    对于第一个选项,通过添加日志语句或在 taskSchedulerCustomizer() 中设置断点来验证.
    对于第二个选项,我建议在 TaskSchedulerBuilder::configure() 中设置断点看看会发生什么。一旦调试器中断该方法,在 ExecutorConfigurationSupport::awaitTerminationMillis 上添加一个数据断点taskScheduler 的属性(property)查看该属性是否在其他地方被修改。
    可以在方法ExecutorConfigurationSupport::awaitTerminationIfNecessary中看到shutdown过程中使用的最终终止时间。 .

    关于java - 如何优雅地结束 spring @Schedule 任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61816828/

    相关文章:

    spring - Grails + Spring 安全核心 : How to assign a role after user has logged in?

    java - 包含在正文中时缺少请求参数

    java - 有没有办法将字符串数组作为变量传递给服务器端 Xquery 脚本?

    mongodb - 如何查询不区分大小写并包含 Spring MongoTemplate

    spring-boot - 带有 gradle 的 Springboot 项目因错误而失败 - 多个绑定(bind)

    java - 在子类构造函数中调用 getClass() 总是安全的吗?

    java - G1GC 的延迟问题

    java - 当方法位于基类内部并且测试方法类扩展它时,如何在 JUnit 中模拟方法调用?

    java - JDK 1.5 是否支持 -Xdebug 选项?

    java - Eclipse 中 Java 8 换行 RegEx 转义字符的问题