java - 具有唯一项和线程池的线程安全 FIFO 队列

标签 java multithreading

我必须管理系统中计划的文件复制。文件复制由用户安排,我需要限制复制期间使用的系统资源量。未定义每个复制可能花费的时间量(即,可以安排复制每 15 分钟运行一次,并且在下一次运行到期时,上一次运行可能仍在运行),如果复制已经排队,则不应对其进行排队或运行。

我有一个调度程序,它会定期检查到期的文件复制,并且对于每个文件复制,(1) 如果它没有排队也没有运行,则将其添加到阻塞队列中,或者 (2) 否则将其删除。

private final Object scheduledReplicationsLock = new Object();
private final BlockingQueue<Replication> replicationQueue = new LinkedBlockingQueue<>();
private final Set<Long> queuedReplicationIds = new HashSet<>();
private final Set<Long> runningReplicationIds = new HashSet<>();

public boolean add(Replication replication) {

    synchronized (scheduledReplicationsLock) {
        // If the replication job is either still executing or is already queued, do not add it.
        if (queuedReplicationIds.contains(replication.id) || runningReplicationIds.contains(replication.id)) {
            return false;
        }
        replicationQueue.add(replication)
        queuedReplicationIds.add(replication.id);
    }

我还有一个线程池,等待队列中出现复制并执行它。下面是线程池中各个线程的主要方法:

public void run() {
    while (True) {
        Replication replication = null;
        synchronized (scheduledReplicationsLock) {
            // This will block until a replication job is ready to be run or the current thread is interrupted.
            replication = replicationQueue.take();

            // Move the ID value out of the queued set and into the active set
            Long replicationId = replication.getId();
            queuedReplicationIds.remove(replicationId);
            runningReplicationIds.add(replicationId);
        }
        executeReplication(replication)
    }
} 

此代码陷入死锁,因为线程轮询中的第一个线程将获得 ScheduledLock 并阻止调度程序将复制添加到队列中。将replicationQueue.take()移出同步块(synchronized block)将消除死锁,但随后可能会从队列中删除一个元素,并且哈希集不会随之自动更新,这可能会导致复制被错误地删除。

如果队列为空,我应该使用 BlockingQueue.poll() 并释放锁 + sleep,而不是使用 BlockingQueue.take() 吗?

欢迎对当前解决方案进行修复或满足要求的其他解决方案。

最佳答案

等待/通知

保持相同的控制流,您可以等待 scheduledReplicationsLock 的通知,而不是在持有互斥锁的同时阻塞 BlockingQueue 实例> 强制工作线程释放锁并返回等待池。

这里是您的制作人的简化样本:

private final List<Replication> replicationQueue = new LinkedList<>();
private final Set<Long> runningReplicationIds = new HashSet<>();

public boolean add(Replication replication) {
    synchronized (replicationQueue) {
        // If the replication job is either still executing or is already queued, do not add it.
        if (replicationQueue.contains(replication) || runningReplicationIds.contains(replication.id)) {
            return false;
        } else {
            replicationQueue.add(replication);
            replicationQueue.notifyAll();
        }
    }
}

worker Runnable 然后将更新如下:

public void run() {
    synchronized (replicationQueue) {
        while (true) {
            if (replicationQueue.isEmpty()) {
                scheduledReplicationsLock.wait();
            }
            if (!replicationQueue.isEmpty()) {
                Replication replication = replicationQueue.poll();
                runningReplicationIds.add(replication.getId())
                executeReplication(replication);
            }
        }
    }
} 

阻塞队列

通常,您最好使用 BlockingQueue 来协调生产者和复制工作池。

顾名思义,BlockingQueue 本质上是阻塞的,并且仅当无法将项目从队列中拉出/推送到队列时才会导致调用线程阻塞。

同时,请注意,您必须更新正在运行/排队的状态管理,因为您将仅同步放弃任何约束的 BlockingQueue 项。这将取决于上下文,无论这是否可以接受。

这样,您将删除所有其他使用的互斥体并在 BlockingQueue 上使用作为同步状态:

private final BlockingQueue<Replication> replicationQueue = new LinkedBlockingQueue<>();

public boolean add(Replication replication) {
    // not sure if this is the proper invariant to check as at some point the replication would be neither queued nor running while still have been processed
    if (replicationQueue.contains(replication)) {
        return false;
    }
    // use `put` instead of `add` as this will block waiting for free space
    replicationQueue.put(replication);
    return true;
}

工作人员将无限期地从 BlockingQueue获取:

public void run() {
    while (true) {
        Replication replication = replicationQueue.take();
        executeReplication(replication);
    }
} 

关于java - 具有唯一项和线程池的线程安全 FIFO 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69910391/

相关文章:

java - 想要更深入地学习编程

java - maven-dependency-plugin 认为 Spring boot 核心依赖项未使用

java - 本地日期仅获取周日期Java

java - WEB应用中的缓存线程池性能

multithreading - 进程和线程有什么区别?

c - 在运行其他任务时接受网络连接的方法?

c# - 可以使用以下 'thread safe double checked lazy initilalization' 模式吗?

java - 在另一个ArrayList中查找JButton源

java - 字符的字符串限制

c++ - 有没有比下面更好的方法来使用C++并发?