java - Spring Integration - 如何调试 'Dispatcher has no Subscribers'?

标签 java spring debugging spring-integration

我正在使用 Spring Boot 1.4.0.RELEASE、Spring Integration 4.3.1.RELEASE、Spring Integration DSL 1.2.0.M1。

我正在尝试做的事情:

我正在编写一个应用程序,它将从 FTP 和本地文件系统读取文件(使用入站 channel 适配器),将文件传输到本地工作目录(使用文件出站网关),进行处理,然后将它们移动到最终目的地(文件出站网关/适配器)。

我遇到了“调度程序没有 channel 订阅者”错误的问题。我相信这可能意味着上下文中的某些内容已损坏并且集成组件未启动。当我调试时,上下文本身表示它处于 Activity 状态。

我的实际配置相当大,所以我不想找人为我找到解决方案。我正在寻找一些关于在哪里查找以及如何找出哪个组件在提示的指导。

实际错误如下。

DEBUG [integration.channel.ExecutorChannel] [task-scheduler-1] preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application:test.fileReadingFlow.channel#1'.

fileReadingFlow 是一个 InboundChannelAdapter,它从目录读取文件(基本上,我问的是 here 。其中没有什么复杂的。适配器发送消息发送到 .log() 处理程序,丰富 header ,将其发送到处理程序 (Files.outboundgateway),最后发送到 MessageChannel

我尝试过的:

  • 我已经浏览了 MessageChannel 链,所有内容都排列整齐(没有拼写错误,所有 Bean 都存在)。
  • 我在 fileReadingFlow 中添加了更多 LoggingHandler 来识别消息出错的位置。
  • 我删除了 fileReadingFlow 的部分内容,看看是否可以进一步获取消息。
  • 我删除了一些组件以查看是否可以找到问题。
  • 我为 org.springframework.integration 添加了调试日志记录,但没有出现任何类似错误或警告的情况。

我发现,当流程第一次尝试执行日志记录以外的操作(甚至enrichHeaders)时,会发生调度程序错误,并且消息最终会出现在errorChannel中。当我将 fileReadingFlow 更改为仅读取文件、记录消息并以空处理程序终止时,我收到了 Dispatcher 错误。因此,我相当确定问题不在于 fileReadingFlow 本身。

除了一一删除每个Component之外,还有什么方法可以追踪导致错误的原因吗?

编辑:

来源:

@Bean(name = "fileReadingFlow")
@Scope("prototype")
@Profile("test")
public IntegrationFlow testFileReadingFlow(MyEntity entity) {

    return IntegrationFlows.from(s -> s.file(new File("someFolder")))
            .filter(fileListFilterBuilder.buildFileListFilter(File.class))
            , endpointConfigurer -> endpointConfigurer.poller(poller)
    )
            .log(DEBUG, "com.myco.testFileReadingFlow")
            .enrichHeaders(h ->
                    h.header("entity", entity)
                            .header(FOLDER_NAME, entity.getFolder())
            )
            .log(DEBUG, "com.myco.testFileReadingFlow", message -> "after headers")
            .handle(Files.outboundGateway("workingFolder").deleteSourceFiles(true).autoCreateDirectory(true))
                .log(DEBUG, "com.myco.testFileReadingFLow", message -> "sending message to aggregatingFileChannel " + message)
                .channel("aggregatingFileChannel")
                .get();
    }
@Bean
public MessageChannel aggregatingFileChannel() {
    return MessageChannels.executor(Executors.newCachedThreadPool()).get();

}

@Bean
public IntegrationFlow aggregatingFlow() {

    // Read from the aggregatingFileChannel
    return from("aggregatingFileChannel")
            <...>
            .get();
}

应用:

@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
        basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);

        MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);

        List<MyEntity> entities = entitySvc.findAllActive();

        AutowireCapableBeanFactory beanFactory = context.getBeanFactory();

        entities.forEach(entity -> {
            IntegrationFlow flow = (IntegrationFlow) context.getBean("fileReadingFlow", entity);

            beanFactory.getBean(entity.getFolder() + MyConstants.ADAPTER, Lifecycle.class).start();
    }

解决方案:

根据我下面的评论,@Prototype 方法在某些时候确实有效,但我破坏了它并且无法轻松回滚更改。根据 Gary 和 Artem 的建议,我尝试更改为使用 IntegrationFlowContext 方法。为了保留运行时启动、配置文件驱动注入(inject)等。我最初将 IntegrationFlow 的定义从 @Configuration 类移动到 @服务类。这样我就可以将 IntegrationFlowContext 注入(inject) Service 中,并为不同的配置文件实现不同版本的 Service,而无需我的 Application 了解个人资料。主要方法是从 Context 中提取 Bean 并手动启动它,到检索 Service 并调用方法。

@Service
@Profile("test")
public class TestFlowSvc implements FlowSvc {
    public IntegrationFlow testFileReadingFlow(Vendor vendor) {
        return // As previous Flow
    }

    public void startFileReadingFlow(MyEntity entity) {

        IntegrationFlow flow = testFileReadingFlow(entity);

        integrationFlowContext.register(flow, true);
    }
}

应用:

@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
        basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);

        MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);
        FlowSvc flowSvc = context.getBean(FlowSvc.class);
        List<MyEntity> entities = entitySvc.findAllActive();

        entities.forEach(entity -> {
            flowSvc.startFileReadingFlow(entity);
    }

最佳答案

我们有Dispatcher has no Subscribers当有一些 SubscribableChannel 时出错无 Activity Subscriber (MessageHandler 就 Spring 集成而言)。 “Active”表示根本没有定义或者处于停止状态。

所以,对于 application:test.fileReadingFlow.channel#1 的问题MessageChannel ,我会调查一下 fileReadingFlow IntegrationFlow再一次找到第二个匿名 DirectChannel 上的内容.

我无法找出调试此类问题的简单方法,因为有 MessageChannel ,但没有什么可跟踪的,因为 Dispatcher has no Subscribers .

所以,请表明 fileReadingFlow IntegrationFlow定义,让我们一起解决问题!

查看您的相关问题,应该是这个:

.handle(Files.outboundGateway())

可能只是处于停止状态。

但在实际代码出现之前我们无法确定。

关于java - Spring Integration - 如何调试 'Dispatcher has no Subscribers'?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39130135/

相关文章:

java - 如何在 Java AWT 中使面板在框架内可见?

java - 警告 : static method should be accessed in a static way

eclipse - 在 Ubuntu 12.04 LTS + Eclipse Juno 中运行/调试 Openerp7.0

java - 尊重 Java/Swing 中的字体连字

java - 我正在为 ListView 使用简单适配器,但在 ListView 中滚动时开关会更改其状态?

java - Spring JsonPath 包含任何顺序异常

java - 如何在Spring-Quartz中使用“MethodInvokingJobDetailFactoryBean”创建“targetObject”的原型(prototype)实例?

java - Spring Boot 中单个资源使用 @RepositoryRestController 和 @RepositoryRestResource 的冲突

debugging - Netbeans 调试器不工作(GDB 意外停止并返回 1)

c++ - 应用程序在编译错误后执行时关闭,但在调试时它工作正常!