我必须管理系统中计划的文件复制。文件复制由用户安排,我需要限制复制期间使用的系统资源量。未定义每个复制可能花费的时间量(即,可以安排复制每 15 分钟运行一次,并且在下一次运行到期时,上一次运行可能仍在运行),如果复制已经排队,则不应对其进行排队或运行。
我有一个调度程序,它会定期检查到期的文件复制,并且对于每个文件复制,(1) 如果它没有排队也没有运行,则将其添加到阻塞队列中,或者 (2) 否则将其删除。
private final Object scheduledReplicationsLock = new Object();
private final BlockingQueue<Replication> replicationQueue = new LinkedBlockingQueue<>();
private final Set<Long> queuedReplicationIds = new HashSet<>();
private final Set<Long> runningReplicationIds = new HashSet<>();
public boolean add(Replication replication) {
synchronized (scheduledReplicationsLock) {
// If the replication job is either still executing or is already queued, do not add it.
if (queuedReplicationIds.contains(replication.id) || runningReplicationIds.contains(replication.id)) {
return false;
}
replicationQueue.add(replication)
queuedReplicationIds.add(replication.id);
}
我还有一个线程池,等待队列中出现复制并执行它。下面是线程池中各个线程的主要方法:
public void run() {
while (True) {
Replication replication = null;
synchronized (scheduledReplicationsLock) {
// This will block until a replication job is ready to be run or the current thread is interrupted.
replication = replicationQueue.take();
// Move the ID value out of the queued set and into the active set
Long replicationId = replication.getId();
queuedReplicationIds.remove(replicationId);
runningReplicationIds.add(replicationId);
}
executeReplication(replication)
}
}
此代码陷入死锁,因为线程轮询中的第一个线程将获得 ScheduledLock 并阻止调度程序将复制添加到队列中。将replicationQueue.take()移出同步块(synchronized block)将消除死锁,但随后可能会从队列中删除一个元素,并且哈希集不会随之自动更新,这可能会导致复制被错误地删除。
如果队列为空,我应该使用 BlockingQueue.poll() 并释放锁 + sleep,而不是使用 BlockingQueue.take() 吗?
欢迎对当前解决方案进行修复或满足要求的其他解决方案。
最佳答案
等待/通知
保持相同的控制流,您可以等待
scheduledReplicationsLock
的通知,而不是在持有互斥锁的同时阻塞 BlockingQueue
实例> 强制工作线程释放锁并返回等待池。
这里是您的制作人的简化样本:
private final List<Replication> replicationQueue = new LinkedList<>();
private final Set<Long> runningReplicationIds = new HashSet<>();
public boolean add(Replication replication) {
synchronized (replicationQueue) {
// If the replication job is either still executing or is already queued, do not add it.
if (replicationQueue.contains(replication) || runningReplicationIds.contains(replication.id)) {
return false;
} else {
replicationQueue.add(replication);
replicationQueue.notifyAll();
}
}
}
worker Runnable
然后将更新如下:
public void run() {
synchronized (replicationQueue) {
while (true) {
if (replicationQueue.isEmpty()) {
scheduledReplicationsLock.wait();
}
if (!replicationQueue.isEmpty()) {
Replication replication = replicationQueue.poll();
runningReplicationIds.add(replication.getId())
executeReplication(replication);
}
}
}
}
阻塞队列
通常,您最好使用 BlockingQueue
来协调生产者和复制工作池。
顾名思义,BlockingQueue
本质上是阻塞的,并且仅当无法将项目从队列中拉出/推送到队列时才会导致调用线程阻塞。
同时,请注意,您必须更新正在运行/排队的状态管理,因为您将仅同步放弃任何约束的 BlockingQueue
项。这将取决于上下文,无论这是否可以接受。
这样,您将删除所有其他使用的互斥体并在 BlockingQueue
上使用作为同步状态:
private final BlockingQueue<Replication> replicationQueue = new LinkedBlockingQueue<>();
public boolean add(Replication replication) {
// not sure if this is the proper invariant to check as at some point the replication would be neither queued nor running while still have been processed
if (replicationQueue.contains(replication)) {
return false;
}
// use `put` instead of `add` as this will block waiting for free space
replicationQueue.put(replication);
return true;
}
工作人员将无限期地从 BlockingQueue
中获取
:
public void run() {
while (true) {
Replication replication = replicationQueue.take();
executeReplication(replication);
}
}
关于java - 具有唯一项和线程池的线程安全 FIFO 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69910391/