这是我的用例。
旧系统更新数据库队列表 QUEUE。
我想要一个定期的定期工作 - 检查 QUEUE 的内容 - 如果表中有行,它会锁定行并做一些工作 - 删除QUEUE中的行
如果前一个作业仍在运行,则将创建一个新线程来完成该工作。我要配置最大并发线程数。
我正在使用 Spring 3,我目前的解决方案是执行以下操作(使用 1 毫秒的固定速率让线程基本连续运行)
@Scheduled(fixedRate = 1)
@Async
public void doSchedule() throws InterruptedException {
log.debug("Start schedule");
publishWorker.start();
log.debug("End schedule");
}
<task:executor id="workerExecutor" pool-size="4" />
这直接创建了 4 个线程,并且线程正确地共享了队列中的工作负载。但是,当线程需要很长时间才能完成时,我似乎遇到了内存泄漏。
java.util.concurrent.ThreadPoolExecutor @ 0xe097b8f0 | 80 | 373,410,496 | 89.74%
|- java.util.concurrent.LinkedBlockingQueue @ 0xe097b940 | 48 | 373,410,136 | 89.74%
| |- java.util.concurrent.LinkedBlockingQueue$Node @ 0xe25c9d68
所以
1:我应该同时使用@Async 和@Scheduled 吗?
2:如果没有,那我还能如何使用 spring 来实现我的要求?
3:如何在其他线程忙的时候才创建新线程?
谢谢大家!
编辑:我认为工作队列变得无限长......现在使用
<task:executor id="workerExecutor"
pool-size="1-4"
queue-capacity="10" rejection-policy="DISCARD" />
将报告结果
最佳答案
你可以试试
- 运行一个延迟一秒的调度程序,这将锁定并获取所有 目前尚未锁定的 QUEUE 记录。
- 对于每条记录,调用一个 Async 方法,该方法将处理该记录并将其删除。
- 执行器的拒绝策略应该是 ABORT,以便调度器可以解锁尚未发出处理的 QUEUE。这样调度程序就可以在下次运行时再次尝试处理这些 QUEUE。
当然,您必须处理调度程序已锁定 QUEUE 的情况,但无论出于何种原因,处理程序都没有完成处理。
伪代码:
public class QueueScheduler {
@AutoWired
private QueueHandler queueHandler;
@Scheduled(fixedDelay = 1000)
public void doSchedule() throws InterruptedException {
log.debug("Start schedule");
List<Long> queueIds = lockAndFetchAllUnlockedQueues();
for (long id : queueIds)
queueHandler.process(id);
log.debug("End schedule");
}
}
public class QueueHandler {
@Async
public void process(long queueId) {
// process the QUEUE & delete it from DB
}
}
<task:executor id="workerExecutor" pool-size="1-4" queue-capcity="10"
rejection-policy="ABORT"/>
关于spring - 一起使用 Spring @Scheduled 和 @Async,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14623092/