spring-boot - 运行 Spring Boot Scheduler 和 Apache Camel 时出现问题

标签 spring-boot apache-camel scheduler

我正在尝试使用 Spring Boot 和 Apache Camel 文件组件的演示文件传输程序。我使用 Spring Boot 编写了一个调度程序,该调度程序每 1 分钟运行一次,调用 Apache Camel 路由并执行文件传输。我在目录 C:\CamelDemo\inputFolder 中有三个文件,即 input1.txt、input2.txt 和 input3.txt。我的 Spring Boot 调度程序如下:

package com.example.demo.scheduler;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class Scheduler {

    @Autowired private ProducerTemplate producerTemplate;

    @Scheduled(fixedRate=60000)
    public void fixedRateSch() {
          SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
          Date now = new Date();
          String strDate = sdf.format(now);
          System.out.println("Fixed Rate scheduler:: " + strDate);


          producerTemplate.sendBody("direct:transferFile", null);
          System.out.println("Success");

       }

}

我的路线如下:

package com.example.demo.route;

import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class FileTransferRoute extends RouteBuilder {

    @SuppressWarnings("deprecation")
    @Override
    public void configure() {
        errorHandler(defaultErrorHandler()
            .maximumRedeliveries(3)
            .redeliverDelay(1000)
            .retryAttemptedLogLevel(LoggingLevel.WARN));

        from("direct:transferFile")
            .log("Route reached")
            .pollEnrich("file:C:\\CamelDemo\\inputFolder?noop=true")
            .to("file:C:\\CamelDemo\\outputFolder?autoCreate=false")
        .end();
    }

}

当我运行程序时,调度程序运行了 3 次,将三个文件一一传输。之后调度程序不再运行。然后,当我尝试通过关闭嵌入式 Tomcal 来停止 Spring Boot 应用程序时,它会给出以下错误:

2019-09-11 15:51:14.711  INFO 10408 --- [n(15)-127.0.0.1] inMXBeanRegistrar$SpringApplicationAdmin : Application shutdown requested.
2019-09-11 15:51:14.714  INFO 10408 --- [n(15)-127.0.0.1] o.a.camel.spring.SpringCamelContext      : Apache Camel 2.24.0 (CamelContext: camel-1) is shutting down
2019-09-11 15:51:14.714  INFO 10408 --- [n(15)-127.0.0.1] o.a.camel.impl.DefaultShutdownStrategy   : Starting to graceful shutdown 1 routes (timeout 300 seconds)
2019-09-11 15:51:14.717  INFO 10408 --- [ - ShutdownTask] o.a.camel.impl.DefaultShutdownStrategy   : Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 300 seconds. Inflights per route: [route1 = 1]
2019-09-11 15:51:14.718  INFO 10408 --- [ - ShutdownTask] o.a.camel.impl.DefaultShutdownStrategy   : There are 1 inflight exchanges:
    InflightExchange: [exchangeId=ID-PCIN467166-1568196927146-0-10, fromRouteId=route1, routeId=route1, nodeId=pollEnrich1, elapsed=0, duration=167106]

所以有以下问题: 1.如何使调度程序永远运行,以便它继续轮询文件位置,当新文件进入文件夹时,它会将文件传输到输出目录。 2. 如果我想要的话,如何正确关闭我的 Spring Boot 应用程序以及为什么在关闭期间抛出错误? 3. 如何在第一次运行时同时传输所有三个文件而不是一个一个地传输?

最佳答案

对于问题#3:我认为这是不可能的,请参阅 your other related question 中的评论

对于问题 #1 和 #2:理解此行为的关键点是您正在使用 pollEnrich接收模式下,因为您没有设置任何超时:请参阅PollEnrich documentation中有关超时的详细解释,特别是:

Good practice to use timeout value === By default Camel will use the receive. Which may block until there is a message available. It is therefore recommended to always provide a timeout value, to make this clear that we may wait for a message, until the timeout is hit. ===

回答您的问题:以下是您当前的路线设置所发生的情况:

  1. 每 60 秒 Spring 调度程序就会触发 FileTransferRoute通过向“direct:transferFile” channel 发送一 strip 有“null”正文的虚拟消息,
  2. FileTransferRoute检测并使用此消息:已创建 Exchange,将步骤 #1 创建的消息作为输入消息,
  3. pollEnrich FileTransferRoute 中的组件阻塞,直到在输入目录中检测到新文件(因为没有定义超时)

    => 这也会阻止 Spring 调度程序,因为您正在使用同步方法 producerTemplate.sendBody()和一个 direct: channel ( direct 组件使用同步调用)

    => 这回答了你的第一个问题:Spring 调度程序实际上仍在运行,但它的线程被阻塞在 sendBody() 中。方法调用,因为 pollEnrich组件轮询新传入的文件。输入目录中的下一个传入文件将恢复轮询;然后,计划的方法应完成,并且应在 60 秒后触发新的调用。在处理完第一个文件后,调度程序没有理由停止。

  4. 当您停止 Springboot 应用程序时,pollEnrich组件仍然被阻止,轮询新的传入文件:这意味着仍然有一个待处理的 Exchange(在上面的步骤 2 创建)尚未完成

    => 这回答了您的第二个问题:“飞行中的交换”实际上是由最新的调度程序调用创建的最新交换。 Camel 默认shutdown strategy如果有挂起(飞行中)交换,将在关闭前等待 X 秒。

我建议您为 Camel 记录器启用 DEBUG 级别,您将更好地了解幕后发生的情况。

您只需在 pollEnrich 组件中使用超时即可消除飞行中的交换问题并改进您的路线。由于您在专用的 Spring 调度程序中实现调度程序逻辑,我认为您应该使用 receiveNoWait模式(0 秒超时)。

    from("direct:transferFile").autoStartup(true)
            .pollEnrich("file:C:\\var\\CamelDemo\\inputFolder?noop=true&delay=2000", 0)

            .choice()
                .when(exchange -> exchange.getIn().getBody() != null)
                    .to("file:C:\\var\\CamelDemo\\outputFolder?autoCreate=false")
                // handle case where no file is found 
                .otherwise()
                    .log(" skip, no file found. ")
            .end();

希望这有帮助。

关于spring-boot - 运行 Spring Boot Scheduler 和 Apache Camel 时出现问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57887253/

相关文章:

java - SpringBoot : Inject repository into a class defined in service layer

maven - Gradle无法识别Cumulocity Microservice SDK软件包

java - 在运行时更改 Spring 任务的预定时间?

linux - 在linux task scheduler中,周期性调用什么函数来做调度工作?

java - 为什么这个 Spring Boot 应用程序找不到主页?

java - Spring 启动: getting this error - Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured

grails - 提取码头组件路线的url参数

java - 普通队列与 SEDA 队列

java - 使用 apache camel 从 gmail 收件箱中读取所有邮件

scheduler - DHTMLX Scheduler - 日历 View - 将焦点设置为 config.mark_now