我需要以下方面的建议:
我有一个@Scheduled 服务方法,它有几秒钟的固定延迟,在该方法中它会扫描工作队列并在发现任何工作时处理适当的工作。在同一个服务中,我有一个将工作放入工作队列的方法,我希望这种方法在完成后立即触发队列扫描(因为我确信现在扫描器会有一些工作要做)为了避免在计划开始之前出现延迟(因为这可能是几秒钟,而且时间有点关键)。
Task Execution and Scheduling 子系统的“立即触发”功能将是理想的,它也将在手动启动执行后重置 fixedDelay(因为我不希望我的手动执行与计划的执行冲突)。注意:队列中的工作可能来自外部源,因此需要进行定期扫描。
欢迎任何建议
编辑: 队列存储在基于文档的数据库中,因此基于本地队列的解决方案不合适。
我不太满意的解决方案(不太喜欢使用原始线程)会是这样的:
@Service
public class MyProcessingService implements ProcessingService {
Thread worker;
@PostCreate
public void init() {
worker = new Thread() {
boolean ready = false;
private boolean sleep() {
synchronized(this) {
if (ready) {
ready = false;
} else {
try {
wait(2000);
} catch(InterruptedException) {
return false;
}
}
}
return true;
}
public void tickle() {
synchronized(this) {
ready = true;
notify();
}
}
public void run() {
while(!interrupted()) {
if(!sleep()) continue;
scan();
}
}
}
worker.start();
}
@PreDestroy
public void uninit() {
worker.interrup();
}
public void addWork(Work work) {
db.store(work);
worker.tickle();
}
public void scan() {
List<Work> work = db.getMyWork();
for (Work w : work) {
process();
}
}
public void process(Work work) {
// work processing here
}
}
最佳答案
因为如果工作队列中没有任何项目,那么 @Scheduled
方法将没有任何工作要做,也就是说,如果没有人在执行周期之间将任何工作放入队列中.同样,如果某些工作项在计划执行完成后立即插入工作队列(可能由外部源),则直到下一次执行时才会处理该工作。
在这种情况下,您需要的是消费者-生产者队列
。一个队列
,其中一个或多个生产者放入工作项,消费者从队列
中取出项目并处理它们。你想要的是 BlockingQueue .它们可用于以线程安全的方式解决消费者-生产者问题。
你可以有一个Runnable
执行当前 @Scheduled
方法执行的任务。
public class SomeClass {
private final BlockingQueue<Work> workQueue = new LinkedBlockingQueue<Work>();
public BlockingQueue<Work> getWorkQueue() {
return workQueue;
}
private final class WorkExecutor implements Runnable {
@Override
public void run() {
while (true) {
try {
// The call to take() retrieves and removes the head of this
// queue,
// waiting if necessary until an element becomes available.
Work work = workQueue.take();
// do processing
} catch (InterruptedException e) {
continue;
}
}
}
}
// The work-producer may be anything, even a @Scheduled method
@Scheduled
public void createWork() {
Work work = new Work();
workQueue.offer(work);
}
}
一些其他的 Runnable 或其他类可能会放入以下项目:
public class WorkCreator {
@Autowired
private SomeClass workerClass;
@Override
public void run() {
// produce work
Work work = new Work();
workerClass.getWorkQueue().offer(work);
}
}
我想这是解决您手头问题的正确方法。您可以拥有多种变体/配置,只需查看 java.util.concurrent
包裹。
问题编辑后更新
即使外部源是db,仍然是生产者-消费者问题。每当你在数据库中存储数据时,你都可以调用 scan()
方法,并且 scan()
方法可以将从数据库中检索到的数据放入 阻塞队列
。
解决有关重置 fixedDelay
这实际上是不可能的,使用 Java
或使用 Spring
是不可能的,除非你自己处理调度部分。也没有 trigger-now
功能。如果您有权访问正在执行任务的 Runnable
,您可能可以自己调用 run()
方法。但这与您自己从任何地方调用处理方法是一样的,您实际上并不需要 Runnable
。
另一种可能的解决方法
private Lock queueLock = new ReentrantLock();
@Scheduled
public void findNewWorkAndProcess() {
if(!queueLock.tryLock()) {
return;
}
try {
doWork();
} finally {
queueLock.unlock();
}
}
void doWork() {
List<Work> work = getWorkFromDb();
// process work
}
// To be called when new data is inserted into the db.
public void newDataInserted() {
queueLock.lock();
try {
doWork();
} finally {
queueLock.unlock();
}
}
newDataInserted()
在您插入任何新数据时被调用。如果预定的执行正在进行中,它将等到它完成后再做工作。此处对 lock()
的调用是阻塞的,因为我们知道数据库中有一些工作,并且在插入工作之前可能已经调用了计划调用。在 findNewWorkAndProcess()
中以非阻塞方式获取锁的调用,如果锁已被 newDataInserted
方法获取,则意味着计划的方法不应该被执行。
好吧,你可以随意微调。
关于java - 手动触发一个@Scheduled 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16719509/