java - Spring批量集成

标签 java spring spring-batch polling

我正在寻找 Spring 批量集成的指导/解决方案。我有一个外部应用程序将向其发送 xml 文件的目录。我的应用程序应该读取文件内容并将文件移动到另一个目录。

应用程序应该能够并行处理文件。

提前致谢。

最佳答案

您可以将 Spring Integration ftp/sftp 与 Spring Batch 结合使用:

1.Spring集成Ftp配置:

<bean id="ftpClientFactory"
    class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    <property name="host" value="${host.name}" />
    <property name="port" value="${host.port}" />
    <property name="username" value="${host.username}" />
    <property name="password" value="${host.password}" />
    <property name="bufferSize" value="100000"/>
</bean>
<int:channel id="ftpChannel" />
<int-ftp:outbound-channel-adapter id="ftpOutbound"
    channel="ftpChannel" remote-directory="/yourremotedirectory/" session-factory="ftpClientFactory" use-temporary-file-name="false" />

2.创建您的阅读器并 Autowiring 服务以在需要时提供您的项目:

 @Scope("step")
 public class MajorItemReader implements InitializingBean{

        private List<YourItem> yourItems= null;

        @Autowired
        private MyService provider;


        public YourItem read() {
            if ((yourItems!= null) && (yourItems.size() != 0)) {
                return yourItems.remove(0);
            }
            return null;
        }

        //Reading Items from Service
        private void reloadItems() {

        this.yourItems= new ArrayList<YourItem>();
        // use the service to provide your Items
       if (yourItems.isEmpty()) {
                yourItems= null;
            }
        }
        public MyService getProvider() {
            return provider;
        }
        public void setProvider(MyService provider) {
            this.provider = provider;
        }
        @Override
        public void afterPropertiesSet() throws Exception {
            reloadItems();
        }
}

3.创建您自己的项目处理器

     public class MyProcessor implements
    ItemProcessor<YourItem, YourItem> {
    @Override
    public YourItem process(YourItem arg0) throws Exception {
    // Apply any logic to your Item before transferring it to the writer
    return arg0;
    }
    }

4.创建您自己的作家:

   public class MyWriter{
   @Autowired
   @Qualifier("ftpChannel")
   private MessageChannel messageChannel;
   public void write(YourItem pack) throws IOException {
   //create your file and from your Item 
   File file = new File("the_created_file");
   // Sending the file via Spring Integration Ftp Channel
   Message<File> message = MessageBuilder.withPayload(file).build();
   messageChannel.send(message);
   }

5.批量配置:

<bean id="dataSourcee"
    class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="" />
    <property name="url" value="" />
    <property name="username" value="" />
    <property name="password" value="" />
</bean>
<bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSourcee" />
    <property name="transactionManager" ref="transactionManagerrr" />
    <property name="databaseType" value="" />
</bean>
<bean id="transactionManagerrr"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="jobLauncher"
    class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

6.另一个用于配置您的作业的 ApplicationContext 文件:

<context:annotation-config />
<bean id="provider" class="mypackage.MyService" />
<context:component-scan base-package="mypackage" />
<bean id="myReader" class="mypackage.MyReader"
    <property name="provider" ref="provider" />
 </bean>
<bean id="myWriter" class="mypackage.MyWriter" />
<bean id="myProcessor" class="mypackage.MyProcessor" />
   <bean id="mReader"
    class="org.springframework.batch.item.adapter.ItemReaderAdapter">
    <property name="targetObject" ref="myReader" />
    <property name="targetMethod" value="read" />
</bean>
<bean id="mProcessor"
    class="org.springframework.batch.item.adapter.ItemProcessorAdapter">
    <property name="targetObject" ref="myProcessor" />
    <property name="targetMethod" value="process" />
</bean>
<bean id="mWriter"
    class="org.springframework.batch.item.adapter.ItemWriterAdapter">
    <property name="targetObject" ref="myWriter" />
    <property name="targetMethod" value="write" />
</bean>
<batch:job id="myJob">
    <batch:step id="step01">
        <batch:tasklet>
            <batch:chunk reader="mReader" writer="mWriter"
                processor="mProcessor" commit-interval="1">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
<bean id="myRunScheduler" class="mypackage.MyJobLauncher" />
<task:scheduled-tasks>
    <task:scheduled ref="myJobLauncher" method="run"
        cron="0 0/5 * * * ?" />
    <!-- this will maker the job runs every 5 minutes -->
</task:scheduled-tasks>

7.最后配置启动器来启动您的作业:

public class MyJobLauncher {
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("myJob")
private Job job;
public void run() {
    try {
        String dateParam = new Date().toString();
        JobParameters param = new JobParametersBuilder().addString("date",
                dateParam).toJobParameters();
        JobExecution execution = jobLauncher.run(job, param);
        execution.stop();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

关于java - Spring批量集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23723449/

相关文章:

java - 多个appender log4j 性能

java - 指定为 1 时 Derby 自动递增 100

java - 创建作业实例时出现死锁

java - 跨多个服务器执行单个作业

java - 读取 CSV 中的换行符,这些换行符在 spring 批处理的 FlatfileItemReader 中的文件中引用

java - Google Maps API V2 如何获取位置?

java - 无法为 applicationContext.xml 中的给定键提供 HashMap 作为 "value"的问题

java - Java编译错误: Exception in thread “main” java.lang.VerifyError:

java - 如何授予 Java Web 应用程序对 Digital Ocean Droplet 的访问权限

java.sql.SQLException : Access denied for user 'root' @'localhost' . 无法创建与数据库服务器的连接