我有以下类(class):
工作任务.java
public interface WorkerTask extends Task {
// Constants
public static final short WORKERTASK_SPIDER = 1;
public static final short WORKERTASK_PARSER = 2;
public static final short WORKERTASK_PRODUCT = 3;
public int getType();
}
WorkerPool.java
class workerPool {
private ThreadPoolExecutor executorPool_;
//----------------------------------------------------
public WorkerPool(int poolSize)
{
executorPool_ = new ThreadPoolExecutor(
poolSize,5,10,TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10000000,false),
Executors.defaultThreadFactory()
);
//----------------------------------------------------
public void assign(WorkerTask workerTask) {
executorPool_.execute(new WorkerThread(workerTask));
}
//----------------------------------------------------
public void removeTasks(int siteID) {
executorPool_.getQueue().removeIf(...);
}
}
我想调用 removeTasks 方法来删除一定数量的待处理任务,但我不知道如何使用 removeIf 方法。它说:删除此集合中满足给定谓词的所有元素,但我不知道如何创建参数谓词。有什么想法吗?
最佳答案
如果你有一个 Queue<WorkerTask>
,你可以这样做:
queue.removeIf(task -> task.getSiteID() == siteID)
有几个问题。一个问题是您从 getQueue()
获得的队列是BlockingQueue<Runnable>
而不是 Queue<WorkerTask>
.如果您提交 Runnable
池中的实例,队列可能包含对您实际任务的引用;如果是这样,您可以将它们向下转换为 WorkerTask
.然而,这并不能保证。此外,ThreadPoolExecutor 的类文档说(在“队列维护”下):
Method
getQueue()
allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged. Two supplied methods,remove(Runnable)
andpurge()
are available to assist in storage reclamation when large numbers of queued tasks become cancelled.
查看 remove(Runnable)
方法,它的文档说
It may fail to remove tasks that have been converted into other forms before being placed on the internal queue.
这表明您应该卡在 Runnable
上已提交以调用 remove()
的实例稍后在他们身上。或者,调用 submit(Runnable)
得到 Future
并保存这些实例以取消它们。
但是还有第二个问题可能会导致这种方法不够充分。假设您找到了一种从队列中删除或取消匹配任务的方法。另一个线程可能已决定提交匹配的新任务,但尚未提交。这里有一个竞争条件。您也许可以取消排队的任务,但在您这样做之后,您不能保证没有提交新的匹配任务。
这是另一种方法。据推测,当您取消(或其他)站点 ID 时,某处有一些逻辑可以停止提交与该端 ID 匹配的新任务。问题是如何处理“进行中”的匹配任务,即在队列中或即将入队的任务。
与其尝试取消匹配的任务,不如更改任务,以便如果其站点 ID 已被取消,则该任务变为空操作。您可以在 ConcurrentHashMap
中记录站点 ID 的取消。 .任何任务都会在开始工作之前检查这张 map ,如果站点 ID 存在,它就会简单地返回。向 map 添加一个站点 ID 会立即生效,确保不会在该站点 ID 上开始任何新任务。 (已经开始的任务将运行到完成。)任何正在进行的任务最终都会从队列中耗尽,而不会导致任何实际工作发生。
关于java - 如何在 ArrayBlockingQueue 中使用谓词来使用方法 "removeIf",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34720304/