java - 是否可以使用 BeanFactory 从不同的属性值创建集成流 bean?

标签 java spring spring-integration spring-integration-dsl

我的目的是从各种源/目录创建 IntegrationFlow bean 实例(首先,以后可能从 ftp)。因此,在 application.properties 中,我想定义类似的内容,入站目录的数量可能会有所不同:

inbound.file.readPath[0]=source1
inbound.file.processedPath[0]=processed1
inbound.file.failedPath[0]=failed1

inbound.file.readPath[1]=source2
inbound.file.processedPath[1]=processed2
inbound.file.failedPath[1]=failed2

我还想维护源代码的来源(通过 header 丰富),因此不能选择将所有文件放入 spring 之外的一个目录中。

那么拥有 FilePollingFlow 是否可以从上述属性创建这些 bean 实例? 我可以想象这样的事情,但我不确定如何将属性传递给 bean 实例,并且如何引用索引:

@Configuration
public class FilePollingIntegrationFlow extends AbstractFactoryBean<IntegrationFlow> {

    @Autowired
    private FilePollingConfiguration config;

    @Override
    public Class<IntegrationFlow> getObjectType() {
        return IntegrationFlow.class;
    }

    @Override
    protected IntegrationFlow createInstance() throws Exception {
      return IntegrationFlows
                .from(s -> /* FIXME config.getReadPath()? instead of inboundReadDirectory, but how to handle indices? */s.file(inboundReadDirectory).preventDuplicates(true).scanEachPoll(true).patternFilter("*.txt"),
                        e -> e.poller(Pollers.fixedDelay(inboundPollingPeriod)
                                .taskExecutor(taskExecutor())
                                .transactionSynchronizationFactory(transactionSynchronizationFactory())
                                .transactional(transactionManager())))
                .log(LoggingHandler.Level.INFO, getClass().getName(), "'Read inbound file: ' .concat(payload)")
                .enrichHeaders(m -> m.headerExpression(FileHeaders.ORIGINAL_FILE, "payload"))
                .transform(Transformers.fileToString())
                .channel(ApplicationConfiguration.FILE_INBOUND_CHANNEL)
                .get();
   }
}
@Component
@ConfigurationProperties("inbound")
public class FilePollingConfiguration {

private List<File> files = new ArrayList<>();

public static class File {
    private String readPath;
    private String processedPath;
    private String failedPath;

    public String getReadPath() {
        return readPath;
    }

    public void setReadPath(String readPath) {
        this.readPath = readPath;
    }

    public String getProcessedPath() {
        return processedPath;
    }

    public void setProcessedPath(String processedPath) {
        this.processedPath = processedPath;
    }

    public String getFailedPath() {
        return failedPath;
    }

    public void setFailedPath(String failedPath) {
        this.failedPath = failedPath;
    }

    @Override
    public String toString() {
        return new ToStringBuilder(this)
                .append("readPath", readPath)
                .append("processedPath", processedPath)
                .append("failedPath", failedPath)
                .toString();
    }


    public List<File> getFiles() {
        return files;
    }

    public void setFiles(List<File> files) {
        this.files = files;
    }
}

最佳答案

对于几个类似的流程,框架为您提供了一个解决方案,例如 IntegrationFlowContext:https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/java-dsl.html#java-dsl-runtime-flows 。因此,您需要的只是动态迭代这些文件和创建流程及其注册。

关于类似属性的列表,例如您内部的File,您应该修改 Spring Boot 的建议:https://docs.spring.io/spring-boot/docs/2.0.0.RELEASE/reference/htmlsingle/#boot-features-external-config-yaml .

注意其中的servers 属性。我的意思是,如果您在 Java 类中将其称为 files,那么属性文件中的它也必须是 files

更新

这对我来说是这样的:

我的application.properties

my.source.dirs=/tmp/in1,/tmp/in2

应用程序是这样的:

@SpringBootApplication
public class So49168720Application {

    public static void main(String[] args) throws IOException {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(So49168720Application.class, args);

        File file1 = new File("/tmp/in1", "foo.txt");
        file1.createNewFile();
        FileCopyUtils.copy("FOO".getBytes(), file1);

        File file2 = new File("/tmp/in2", "bar.txt");
        file2.createNewFile();
        FileCopyUtils.copy("BAR".getBytes(), file2);


        PollableChannel resultChannel = applicationContext.getBean("resultChannel", PollableChannel.class);

        System.out.println(resultChannel.receive(10000));
        System.out.println(resultChannel.receive(10000));

        file1.delete();
        file2.delete();
    }

    @Value("${my.source.dirs}")
    private String[] sourceDirs;

    @Autowired
    private IntegrationFlowContext flowContext;


    @PostConstruct
    private void registerFilePollingFlows() {
        Arrays.asList(this.sourceDirs).forEach(inboundSource -> {
            IntegrationFlow flow =
                    IntegrationFlows
                            .from(Files.inboundAdapter(new File(inboundSource))
                                    .patternFilter("*.txt"))
                            .log(LoggingHandler.Level.INFO, getClass().getName(),
                                    "'Read inbound file: ' .concat(payload)")
                            .transform(Files.toStringTransformer())
                            .channel(resultChannel())
                            .get();

            this.flowContext.registration(flow).register();
        });
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerSpec defaultPoller() {
        return Pollers.fixedDelay(1000);
    }

    @Bean
    public PollableChannel resultChannel() {
        return new QueueChannel();
    }

}

我在日志中有这些消息:

2018-03-13 17:43:05.148  INFO 19676 --- [ask-scheduler-3] ication$$EnhancerBySpringCGLIB$$4fda2b12 : Read inbound file: \tmp\in2\bar.txt
2018-03-13 17:43:05.148  INFO 19676 --- [ask-scheduler-2] ication$$EnhancerBySpringCGLIB$$4fda2b12 : Read inbound file: \tmp\in1\foo.txt
GenericMessage [payload=BAR, headers={file_originalFile=\tmp\in2\bar.txt, id=4a692a68-3871-b708-a28e-c4dc378de7e5, file_name=bar.txt, file_relativePath=bar.txt, timestamp=1520977385150}]
GenericMessage [payload=FOO, headers={file_originalFile=\tmp\in1\foo.txt, id=32597359-6602-3df6-5f6f-dac2f4ad788f, file_name=foo.txt, file_relativePath=foo.txt, timestamp=1520977385150}]

但是,这已经基于 Spring Integration 5.0 和 Spring Boot 2.0。有什么理由不升级您的项目?

关于java - 是否可以使用 BeanFactory 从不同的属性值创建集成流 bean?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49168720/

相关文章:

java - 是否可以从 Grails 应用程序引用 Java 应用程序?

apache-kafka - Spring 集成和 Kafka : How to filter messages based on message header

java - 如何找到在selenium web驱动程序java中显式等待条件为真所花费的时间

java - ActiveMQ 与 Apache Karaf 2.3.1

java - 如何在向 Business Profile Performance API 发送请求时为 setDailyMetric 方法设置多个指标?

子项中的 Spring Data JPA 外键不持久

java - 在现有行之间添加 2 行

java - 从数据库加载 Spring PropertyPlaceholderConfigurer

java - Spring集成异步流程

java - 建议 bean 中的 Spring Integration "onSuccessExpression"语法