java - 使用 Spring Batch 和 Spring Cloud Data Flow 构建文件轮询/摄取任务

标签 java spring spring-batch spring-cloud-stream spring-cloud-dataflow

我们计划创建一个新的处理机制,它包括监听一些目录例如:/opt/dir1、/opt/dirN,并且对于在这些目录中创建的每个文档,启动一个例程处理,将其注册表持久保存在数据库中(通过对现有 CRUD API 的 REST 调用)并生成一个协议(protocol)文件到另一个目录。

出于测试目的,我没有使用任何现代(甚至体面的)框架/方法,只是一个带有 WatchService 实现的常规 SpringBoot 应用程序,它监听这些目录并在创建文件后立即轮询要处理的文件。它有效,但很明显,当我转向生产并开始接收数十个要并行处理的文件时,我肯定会在某个时候对性能产生一些影响,这在我的示例中不是现实。

经过一些研究和几位同事的一些提示后,我发现 Spring Batch + Spring Cloud Data Flow 是满足我需求的最佳组合。但是,我以前从未处理过 Batch 或 Data Flow,我有点困惑我应该构建这些 block 的内容和方式,以便以最简单和高效的方式运行此例程。我有几个关于它的附加值和架构的问题,非常希望听到您的想法!

  • 我设法创建并运行了一个基于样本批处理文件摄取任务的 on this section of Spring Docs .每次在目录中创建文件时如何启动任务?我需要流吗?

  • 如果我这样做,我如何创建一个流应用程序以编程方式为每个将其路径作为参数传递的新文件启动我的任务?我应该为此目的使用 RabbitMQ 吗?

  • 如何为我的任务保留一些外部变量例如目录路径?我可以让这些流和任务在 jar 之外的其他地方读取 application.yml 吗?

  • 为什么我应该将 Spring Cloud Data Flow 与 Spring Batch 一起使用,而不仅仅是一个批处理应用程序?仅仅因为它跨越了每个文件的并行任务,还是我得到了任何其他好处?

  • 纯粹谈论性能,如果您只考虑顺序处理场景(我每小时大约只收到 1 个文件),这个解决方案与我的 WatchService + 普通处理实现相比如何?

此外,如果你们有任何关于如何以编程方式启动任务的指南或示例,我将非常感谢!我仍在寻找它,但似乎我做的不对。

感谢您的关注,非常感谢任何意见!

更新

我设法通过 SCDF REST API 启动了我的任务所以我可以使用 WatchService 保留我原来的 SpringBoot 应用程序,通过 Feign 或 XXX 启动新任务。我仍然知道这远不是我应该在这里做的。经过更多研究后,我认为使用文件源和接收器创建流将是我的方式,除非有人有任何其他意见,但我无法将入站 channel 适配器设置为从多个目录轮询,我不能多个流,因为这个平台应该扩展到我们有成千上万的参与者(或从中轮询文件的目录)。

最佳答案

这里有一些提示。

I managed to create and run a sample batch file ingest task based on this section of Spring Docs. How can I launch a task every time a file is created in a directory? Do I need a Stream for that?

如果您必须在上游事件(例如:新文件)发生时自动启动它,是的,您可以通过流来做到这一点(参见 example )。如果事件来自消息代理,您也可以直接在批处理作业中使用它们(例如:AmqpItemReader)。

If I do, How can I create a stream application that launches my task programmaticaly for each new file passing it's path as argument? Should I use RabbitMQ for this purpose?

希望上面的例子能说明问题。如果您想以编程方式启动任务(而不是通过 DSL/REST/UI),您可以使用新的 Java DSL 来实现。支持,这是在 1.3 中添加的。

How can I keep some variables externalized for my task e.g directories path? Can I have these streams and tasks read an application.yml somewhere else than inside it's jar?

推荐的方法是使用配置服务器。根据编排平台的不同,您必须向任务及其子任务(包括批处理作业)提供配置服务器凭据。在 Cloud Foundry 中,我们只需将 config-server 服务实例绑定(bind)到每个任务,并且在运行时将自动解析外部化属性。

Why should I use Spring Cloud Data Flow alongside Spring Batch and not only a batch application? Just because it spans parallel tasks for each file or do I get any other benefit?

作为 Spring Batch Admin 的替代品,SCDF 为任务/批处理作业提供监控和管理。错误时的执行、步骤、步骤进度和堆栈跟踪将被持久化,并可从仪表板进行探索。您也可以直接使用 SCDF 的 REST 端点来检查此信息。

Talking purely about performance, how would this solution compare to my WatchService + plain processing implementation if you think only about the sequential processing scenario, where I'd receive only 1 file per hour or so?

这是特定于实现的。我们没有任何基准可以分享。但是,如果需要性能,您可以探索 remote-partitioning Spring Batch 中的支持。您可以使用“n”个工作人员来划分摄取或数据处理任务,这样您就可以实现并行性。

关于java - 使用 Spring Batch 和 Spring Cloud Data Flow 构建文件轮询/摄取任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49246772/

相关文章:

java - 无法使用Spring Cloud配置服务器从配置文件中获取信息

facebook - Grails Spring Security Facebook插件重定向到错误页面

带有 Java 8 CompletableFuture 的 Spring TransactionInterceptor

java - StepExecutionListener 中的 Spring Batch 事务

java - 修改 Enunciate 生成的配置文件以使用 CXF 的最新 jackson 库

java - 无法使用 JSoup 解析 HTML

Java、String.matches区别

java - 有没有办法保持与静态 Web 服务的连接?

java - 对我来说是 spring-batch,即使我没有使用 itemReader 和 itemWriter?

java - 如何运行 spring-batch 作业线程池?