我正在使用 Spring Boot 1.4.0.RELEASE、Spring Integration 4.3.1.RELEASE、Spring Integration DSL 1.2.0.M1。
我正在尝试做的事情:
我正在编写一个应用程序,它将从 FTP 和本地文件系统读取文件(使用入站 channel 适配器),将文件传输到本地工作目录(使用文件出站网关),进行处理,然后将它们移动到最终目的地(文件出站网关/适配器)。
我遇到了“调度程序没有 channel 订阅者”错误的问题。我相信这可能意味着上下文中的某些内容已损坏并且集成组件未启动。当我调试时,上下文本身表示它处于 Activity 状态。
我的实际配置相当大,所以我不想找人为我找到解决方案。我正在寻找一些关于在哪里查找以及如何找出哪个组件在提示的指导。
实际错误如下。
DEBUG [integration.channel.ExecutorChannel] [task-scheduler-1] preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application:test.fileReadingFlow.channel#1'.
fileReadingFlow
是一个 InboundChannelAdapter
,它从目录读取文件(基本上,我问的是 here 。其中没有什么复杂的。适配器发送消息发送到 .log()
处理程序,丰富 header ,将其发送到处理程序 (Files.outboundgateway
),最后发送到 MessageChannel
。
我尝试过的:
- 我已经浏览了
MessageChannel
链,所有内容都排列整齐(没有拼写错误,所有Bean
都存在)。 - 我在
fileReadingFlow
中添加了更多LoggingHandler
来识别消息出错的位置。 - 我删除了
fileReadingFlow
的部分内容,看看是否可以进一步获取消息。 - 我删除了一些
组件
以查看是否可以找到问题。 - 我为
org.springframework.integration
添加了调试日志记录,但没有出现任何类似错误或警告的情况。
我发现,当流程第一次尝试执行日志记录以外的操作(甚至enrichHeaders)时,会发生调度程序错误,并且消息最终会出现在errorChannel
中。当我将 fileReadingFlow 更改为仅读取文件、记录消息并以空处理程序终止时,我收到了 Dispatcher 错误。因此,我相当确定问题不在于 fileReadingFlow
本身。
除了一一删除每个Component
之外,还有什么方法可以追踪导致错误的原因吗?
编辑:
来源:
@Bean(name = "fileReadingFlow")
@Scope("prototype")
@Profile("test")
public IntegrationFlow testFileReadingFlow(MyEntity entity) {
return IntegrationFlows.from(s -> s.file(new File("someFolder")))
.filter(fileListFilterBuilder.buildFileListFilter(File.class))
, endpointConfigurer -> endpointConfigurer.poller(poller)
)
.log(DEBUG, "com.myco.testFileReadingFlow")
.enrichHeaders(h ->
h.header("entity", entity)
.header(FOLDER_NAME, entity.getFolder())
)
.log(DEBUG, "com.myco.testFileReadingFlow", message -> "after headers")
.handle(Files.outboundGateway("workingFolder").deleteSourceFiles(true).autoCreateDirectory(true))
.log(DEBUG, "com.myco.testFileReadingFLow", message -> "sending message to aggregatingFileChannel " + message)
.channel("aggregatingFileChannel")
.get();
}
@Bean
public MessageChannel aggregatingFileChannel() {
return MessageChannels.executor(Executors.newCachedThreadPool()).get();
}
@Bean
public IntegrationFlow aggregatingFlow() {
// Read from the aggregatingFileChannel
return from("aggregatingFileChannel")
<...>
.get();
}
应用:
@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);
MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);
List<MyEntity> entities = entitySvc.findAllActive();
AutowireCapableBeanFactory beanFactory = context.getBeanFactory();
entities.forEach(entity -> {
IntegrationFlow flow = (IntegrationFlow) context.getBean("fileReadingFlow", entity);
beanFactory.getBean(entity.getFolder() + MyConstants.ADAPTER, Lifecycle.class).start();
}
解决方案:
根据我下面的评论,@Prototype
方法在某些时候确实有效,但我破坏了它并且无法轻松回滚更改。根据 Gary 和 Artem 的建议,我尝试更改为使用 IntegrationFlowContext 方法。为了保留运行时启动、配置文件驱动注入(inject)等。我最初将 IntegrationFlow
的定义从 @Configuration
类移动到 @服务
类。这样我就可以将 IntegrationFlowContext
注入(inject) Service
中,并为不同的配置文件实现不同版本的 Service
,而无需我的 Application
了解个人资料
。主要方法是从 Context
中提取 Bean
并手动启动它,到检索 Service
并调用方法。
@Service
@Profile("test")
public class TestFlowSvc implements FlowSvc {
public IntegrationFlow testFileReadingFlow(Vendor vendor) {
return // As previous Flow
}
public void startFileReadingFlow(MyEntity entity) {
IntegrationFlow flow = testFileReadingFlow(entity);
integrationFlowContext.register(flow, true);
}
}
应用:
@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);
MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);
FlowSvc flowSvc = context.getBean(FlowSvc.class);
List<MyEntity> entities = entitySvc.findAllActive();
entities.forEach(entity -> {
flowSvc.startFileReadingFlow(entity);
}
最佳答案
我们有Dispatcher has no Subscribers
当有一些 SubscribableChannel
时出错无 Activity Subscriber
(MessageHandler
就 Spring 集成而言)。
“Active”表示根本没有定义或者处于停止状态。
所以,对于 application:test.fileReadingFlow.channel#1
的问题MessageChannel
,我会调查一下 fileReadingFlow
IntegrationFlow
再一次找到第二个匿名 DirectChannel
上的内容.
我无法找出调试此类问题的简单方法,因为有 MessageChannel
,但没有什么可跟踪的,因为 Dispatcher has no Subscribers
.
所以,请表明 fileReadingFlow
IntegrationFlow
定义,让我们一起解决问题!
查看您的相关问题,应该是这个:
.handle(Files.outboundGateway())
可能只是处于停止状态。
但在实际代码出现之前我们无法确定。
关于java - Spring Integration - 如何调试 'Dispatcher has no Subscribers'?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39130135/