spring-integration - spring 集成轮询器与调度器

标签 spring-integration

我正在尝试使用 spring 集成设置一个简单的应用程序。目标是简单地使用文件入站 channel 适配器来监视新文件的目录并在添加文件时处理文件。为简单起见,目前处理文件只是记录一些输出(正在处理的文件名)。但是,我确实想以多线程方式处理文件。因此,假设选取了 10 个文件并应并行处理,一旦这些文件完成,我们将继续处理接下来的 10 个文件。

为此,我尝试了两种不同的方法,两者的工作方式似乎相似,我想了解使用轮询器或调度器之间的区别。

方法#1 - 使用轮询器

<int-file:inbound-channel-adapter id="filesIn" directory="in">
        <int:poller fixed-rate="1" task-executor="executor" />
</int-file:inbound-channel-adapter>

<int:service-activator ref="moveToStage" method="move" input-channel="filesIn" />

<task:executor id="executor" pool-size="5" queue-capacity="0" rejection-policy="DISCARD" />

因此,据我所知,这里的想法是我们不断轮询目录,一旦收到文件,它就会发送到 filesIn channel ,直到达到池限制。然后直到池被占用,即使我假设轮询仍在后台继续,也不会发送额外的文件。这似乎有效,但我不确定使用每次轮询的最大消息数是否有助于降低轮询频率。通过将每次轮询的最大消息数设置为接近池大小。

方法#2 - 使用调度程序
<int-file:inbound-channel-adapter id="filesIn" directory="in">
    <int:poller fixed-rate="5000" max-messages-per-poll="3" />
</int-file:inbound-channel-adapter>

<int:bridge input-channel="filesIn" output-channel="filesReady" />

<int:channel id="filesReady">
    <int:dispatcher task-executor="executor"/>
</int:channel>

<int:service-activator ref="moveToStage" method="move" input-channel="filesInReady" />

<task:executor id="executor" pool-size="5" queue-capacity="0" rejection-policy="CALLER_RUNS" />

好的,所以这里轮询器没有使用执行器,所以我假设它以顺序方式进行轮询。每个轮询 3 文件都应该被拾取,然后发送到 filesReady channel ,然后该 channel 使用调度程序将文件传递给服务激活器,因为它使用调度程序的执行器,它立即返回控制并允许 filesIn channel 发送更多文件。

我想我的问题是我是否正确理解了这两种方法,以及一种方法是否优于另一种方法。

谢谢

最佳答案

是的,你的理解是正确的。

一般来说,我会说每毫秒轮询一次(并在队列已满时丢弃轮询)是一种资源(CPU 和 I/O)的浪费。

此外,在第一种情况下增加每个轮询的最大消息数也无济于事,因为轮询是在执行程序线程上完成的(调度程序将轮询交给执行程序,该线程将处理 mmpp )。

在第二种情况下,由于调度程序线程在轮询期间(而不是在轮询之前)移交,mmpp将按预期工作。

所以,一般来说,你的第二个实现是最好的(只要你能忍受新文件到达时平均 2.5 秒的延迟)。

关于spring-integration - spring 集成轮询器与调度器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21357605/

相关文章:

java - Spring 集成 : Persistent and transactional QueueChannel

java - 如何使用 JMS 和 Spring Integration 最好地实现请求/回复 (2.2)

java - MarshallingWebServiceInboundGateway 的问题

xmpp - Spring 集成 XMPP 和 Google Cloud Messaging

java - @Header 注释未按预期工作

java - Spring集成多个FTP主机(DefaultFtpSessionFactory)

java - 如何在多个线程中执行 Spring 集成流程以并行使用更多 Amazon SQS 队列消息?

spring-integration - Spring 集成 DSL Scatter-Gather 多个收件人流的异步/并行执行

java - Spring集成FileTailingMessageProducer : Remember current line when restarting

java - Pop3MailReciever 不删除邮件