rss - 如何在 Spring Integration 中动态注册 Feed Inbound Adapter?

标签 rss spring-integration dsl atom-feed enterprise-integration

我正在尝试在 spring-integration 中实现 RSS/Atom 提要聚合器我主要使用 Java DSL 编写我的 IntegrationFlow .此聚合器的一个要求是可以在运行时添加/删除提要。也就是说,提要在设计时是未知的。

我发现使用基本的 Feed.inboundAdapter() 很简单使用测试 url 并使用转换器从提要中提取链接,然后将其传递给 outbound-file-adapter将链接保存到文件。但是,当我试图从 inbound-file-adapter 中读取(数千个)提要 URL 时,我陷入了困境。通过 FileSplitter 运行文件然后传递每个结果 Message<String>包含提要网址,然后注册一个新的 Feed.inboundAdapter() .这对 Java DSL 来说是不可能的吗?

理想情况下,如果我能做到以下几点,我会喜欢它:

@Bean
public IntegrationFlow getFeedsFromFile() throws MalformedURLException {
    return IntegrationFlows.from(inboundFileChannel(), e -> e.poller(Pollers.fixedDelay(10000)))
            .handle(new FileSplitter())
            //register new Feed.inboundAdapter(payload.toString()) foreach Message<String> containing feed url coming from FileSplitter
            .transform(extractLinkFromFeedEntry())
            .handle(appendLinkToFile())
            .get();
} 

尽管在多次通读了 spring integration java DSL 代码(并在此过程中学习了大量内容)后,我还是看不出这样做是否可行。所以... A) 是吗? B) 应该是吗? C)建议?

几乎感觉我应该能够获取 .handle(new FileSplitter()) 的输出并将其传递给 .handleWithAdapter(Feed.inboundAdapter(/*stuff here*/))但 DSL 仅引用 outbound-adapter在那里。入站适配器实际上只是 AbstractMessageSource 的一个子类似乎唯一可以指定其中之一的地方是作为 IntegrationFlows.from(/*stuff here*/) 的参数方法。

我原以为可以从文件中获取输入,逐行拆分,使用该输出来注册入站提要适配器,轮询这些提要,在提要出现时从提要中提取新链接并附加它们到一个文件。看起来好像不是。

我可以做一些聪明的子类化来完成这项工作吗?

如果失败...我怀疑这将是答案,我找到了 spring 集成 Dynamic Ftp Channel Resolver Examplethis回答如何调整它为入站案例动态注册内容...

那么这是要走的路吗?任何帮助/指导表示赞赏。在连续数天研究 DSL 代码和阅读文档之后,我想我将着手实现动态 ftp 示例并将其调整为与 FeedEntryMessageSource 一起使用......在这种情况下我的问题是......动态 ftp 示例有效使用 XML 配置,但是可以使用 Java 配置或 Java DSL 来完成吗?

更新

我已经实现了如下解决方案:

@SpringBootApplication 
class MonsterFeedApplication {

public static void main(String[] args) throws IOException {
    ConfigurableApplicationContext parent = SpringApplication.run(MonsterFeedApplication.class, args);

    parent.setId("parent");
    String[] feedUrls = {
            "https://1nichi.wordpress.com/feed/",
            "http://jcmuofficialblog.com/feed/"};

    List<ConfigurableApplicationContext> children = new ArrayList<>();
    int n = 0;
    for(String feedUrl : feedUrls) {
        AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
        child.setId("child" + ++n);
        children.add(child);
        child.setParent(parent);
        child.register(DynamicFeedAdapter.class);
        StandardEnvironment env = new StandardEnvironment();
        Properties props = new Properties();
        props.setProperty("feed.url", feedUrl);
        PropertiesPropertySource pps = new PropertiesPropertySource("feed", props);
        env.getPropertySources().addLast(pps);
        child.setEnvironment(env);
        child.refresh();
    }

    System.out.println("Press any key to exit...");
    System.in.read();
    for (ConfigurableApplicationContext child : children) {
        child.close();
    }
    parent.close();
}

@Bean
public IntegrationFlow aggregateFeeds() {       
    return IntegrationFlows.from("feedChannel")
            .transform(extractLinkFromFeed())
            .handle(System.out::println)
            .get();
}

@Bean
public MessageChannel feedChannel() {
    return new DirectChannel();
}

@Bean
public AbstractPayloadTransformer<SyndEntry, String> extractLinkFromFeed() {
    return new AbstractPayloadTransformer<SyndEntry, String>() {
        @Override
        protected String transformPayload(SyndEntry payload) throws Exception {
            return payload.getLink();
        }
    };

}

}

DynamicFeedAdapter.java

@Configuration
@EnableIntegration
public class DynamicFeedAdapter {

    @Value("${feed.url}")
    public String feedUrl;

    @Bean
    public static PropertySourcesPlaceholderConfigurer pspc() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    @Bean
    public IntegrationFlow feedAdapter() throws MalformedURLException {

        URL url = new URL(feedUrl);

        return IntegrationFlows
                .from(s -> s.feed(url, "feedTest"), 
                        e -> e.poller(p -> p.fixedDelay(10000)))
                .channel("feedChannel")
                .get();
    }

}

这有效 IF 并且只有 IF 我有 一个application.properties 中定义的 url作为feed.url=[insert url here] .否则它不会告诉我“无法解析属性 {feed.url}”。我怀疑那里发生的事情是 @BeanDynamicFeedAdapter.java 中定义所有的单例都被急切初始化,所以除了在 main 方法的 for 循环中手动创建的 bean(工作正常,因为它们注入(inject)了 feed.url 属性),我们有一个已经被急切初始化的流浪单例,如果没有feed.url 在 application.properties 中定义,然后它无法解析该属性,一切都会崩溃。现在根据我对 Spring 的了解,我知道应该可以 @LazyDynamicFeedAdapter.java 中初始化 bean所以我们不会以这个不想要的流浪单例问题 child 结束。现在的问题是……如果我只标记 feedAdapter() @Lazy然后 beans 永远不会被初始化。我如何自己初始化它们?

更新 - 问题已解决

Without having tested it, I think the problem is that boot is finding the DynamicFeedAdapter during its component scan. A simple solution is to move it to a sibling package. If MonsterFeedApplication is in com.acme.foo, then put the adapter config class in com.acme.bar. That way, boot won't consider it "part" of the application

确实是这个问题。执行 Gary 的建议后,一切正常。

最佳答案

参见 the answer to this questionits follow up有关入站邮件适配器的类似问题。

本质上,每个提要适配器都是在参数化的子上下文中创建的。

在那种情况下,子上下文是在 main() 方法中创建的,但没有理由不能在 .handle() 调用的服务中完成它.

关于rss - 如何在 Spring Integration 中动态注册 Feed Inbound Adapter?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36258791/

相关文章:

c# - Rss feed Ebay c#

python - 使用 Python 混合 RSS 提要的好方法是什么?

javascript - 无法使用Axios和Vue.Js从其他网站检索rss xml信息

java - 无法写入请求: no suitable HttpMessageConverter found for request type and content type [application/x-java-serialized-object]

php - 闭包如何帮助创建 DSL/fluent 接口(interface) : PHP examples?

android - 如何将 RSS 提要加载到 ListView 中?

java - 使用 <int-jpa :outbound-channel-adapter 保留实体列表

java - 如何将字符串转换为 XML 文件?

json - 使用 Scala 进行无噪音 JSON 处理

Python编译eval?