java - 手动触发一个@Scheduled 方法

标签 java multithreading spring synchronization queue

我需要以下方面的建议:

我有一个@Scheduled 服务方法,它有几秒钟的固定延迟,在该方法中它会扫描工作队列并在发现任何工作时处理适当的工作。在同一个服务中,我有一个将工作放入工作队列的方法,我希望这种方法在完成后立即触发队列扫描(因为我确信现在扫描器会有一些工作要做)为了避免在计划开始之前出现延迟(因为这可能是几秒钟,而且时间有点关键)。

Task Execution and Scheduling 子系统的“立即触发”功能将是理想的,它也将在手动启动执行后重置 fixedDelay(因为我不希望我的手动执行与计划的执行冲突)。注意:队列中的工作可能来自外部源,因此需要进行定期扫描。

欢迎任何建议

编辑: 队列存储在基于文档的数据库中,因此基于本地队列的解决方案不合适。

我不太满意的解决方案(不太喜欢使用原始线程)会是这样的:

@Service
public class MyProcessingService implements ProcessingService {

    Thread worker;

    @PostCreate
    public void init() {
        worker = new Thread() {
            boolean ready = false;

            private boolean sleep() {
                synchronized(this) {
                    if (ready) {
                        ready = false;
                    } else {
                        try {
                            wait(2000);
                        } catch(InterruptedException) {
                            return false;
                        }
                    }
                }

                return true;
            }

            public void tickle() {
                synchronized(this) {
                    ready = true;
                    notify();
                }
            }

            public void run() {
                while(!interrupted()) {
                    if(!sleep()) continue;

                    scan();
                }
            }
        }

        worker.start();
    }

    @PreDestroy
    public void uninit() {
        worker.interrup();
    }

    public void addWork(Work work) {
        db.store(work);

        worker.tickle();
    }

    public void scan() {
        List<Work> work = db.getMyWork();

        for (Work w : work) {
            process();
        }
    }

    public void process(Work work) {
        // work processing here
    }

}

最佳答案

因为如果工作队列中没有任何项目,那么 @Scheduled 方法将没有任何工作要做,也就是说,如果没有人在执行周期之间将任何工作放入队列中.同样,如果某些工作项在计划执行完成后立即插入工作队列(可能由外部源),则直到下一次执行时才会处理该工作。

在这种情况下,您需要的是消费者-生产者队列。一个队列,其中一个或多个生产者放入工作项,消费者从队列中取出项目并处理它们。你想要的是 BlockingQueue .它们可用于以线程安全的方式解决消费者-生产者问题。

你可以有一个Runnable执行当前 @Scheduled 方法执行的任务。

public class SomeClass {
        private final BlockingQueue<Work> workQueue = new LinkedBlockingQueue<Work>();

    public BlockingQueue<Work> getWorkQueue() {
        return workQueue;
    }

    private final class WorkExecutor implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    // The call to take() retrieves and removes the head of this
                    // queue,
                    // waiting if necessary until an element becomes available.
                    Work work = workQueue.take();
                    // do processing
                } catch (InterruptedException e) {
                    continue;
                }
            }
        }
    }

    // The work-producer may be anything, even a @Scheduled method
    @Scheduled
    public void createWork() {
        Work work = new Work();
        workQueue.offer(work);
    }

}

一些其他的 Runnable 或其他类可能会放入以下项目:

public class WorkCreator {

    @Autowired 
    private SomeClass workerClass;

    @Override
    public void run() {
        // produce work
        Work work = new Work();
        workerClass.getWorkQueue().offer(work);
    }

}

我想这是解决您手头问题的正确方法。您可以拥有多种变体/配置,只需查看 java.util.concurrent包裹。

问题编辑后更新

即使外部源是db,仍然是生产者-消费者问题。每当你在数据库中存储数据时,你都可以调用 scan() 方法,并且 scan() 方法可以将从数据库中检索到的数据放入 阻塞队列

解决有关重置 fixedDelay

的实际问题

这实际上是不可能的,使用 Java 或使用 Spring 是不可能的,除非你自己处理调度部分。也没有 trigger-now 功能。如果您有权访问正在执行任务的 Runnable,您可能可以自己调用 run() 方法。但这与您自己从任何地方调用处理方法是一样的,您实际上并不需要 Runnable

另一种可能的解决方法

private Lock queueLock = new ReentrantLock();


@Scheduled
public void findNewWorkAndProcess() {
    if(!queueLock.tryLock()) {
        return;
    }
    try {
        doWork();
    } finally {
        queueLock.unlock();
    }
}

void doWork() {
    List<Work> work = getWorkFromDb();
    // process work
}

// To be called when new data is inserted into the db.
public void newDataInserted() {
    queueLock.lock();
    try {
        doWork();
    } finally {
        queueLock.unlock();
    }
}

newDataInserted() 在您插入任何新数据时被调用。如果预定的执行正在进行中,它将等到它完成后再做工作。此处对 lock() 的调用是阻塞的,因为我们知道数据库中有一些工作,并且在插入工作之前可能已经调用了计划调用。在 findNewWorkAndProcess() 中以非阻塞方式获取锁的调用,如果锁已被 newDataInserted 方法获取,则意味着计划的方法不应该被执行。

好吧,你可以随意微调。

关于java - 手动触发一个@Scheduled 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16719509/

相关文章:

java - Spring MVC 中的条件@RequestBody

java - JFrame 关闭对话框

java - View.getView() 返回 null

linux - mmap 是原子的吗?

python - thread.start_new_thread 与 threading.Thread.start

java - @Positive Annotation 仍然取负值

java - 如何在Eclipse中使用 "java -Dcom.sun.org.apache.xml.internal.security.ignoreLineBreaks=true"

java - 使用 jar 文件运行 Java 程序 (Netbeans/Maven) - 需要帮助

c++ - 我可以从第三个线程监控 boost::lockfree::spsc_queue 吗?

java - 使用矩阵参数创建 GET 请求