java - 如何在 ArrayBlockingQueue 中使用谓词来使用方法 "removeIf"

标签 java collections predicate threadpoolexecutor remove-if

我有以下类(class):

工作任务.java

   public interface WorkerTask extends Task {

   // Constants
   public static final short WORKERTASK_SPIDER = 1;
   public static final short WORKERTASK_PARSER = 2;
   public static final short WORKERTASK_PRODUCT = 3;

   public int getType();
}

WorkerPool.java

class workerPool {

     private ThreadPoolExecutor executorPool_;

     //----------------------------------------------------  

     public WorkerPool(int poolSize) 
     {
        executorPool_ = new ThreadPoolExecutor(
           poolSize,5,10,TimeUnit.SECONDS,
           new ArrayBlockingQueue<Runnable>(10000000,false),
           Executors.defaultThreadFactory()
     );

     //----------------------------------------------------        

     public void assign(WorkerTask workerTask) {
         executorPool_.execute(new WorkerThread(workerTask));
     }

     //----------------------------------------------------  

     public void removeTasks(int siteID) {
        executorPool_.getQueue().removeIf(...);     
     }
}

我想调用 removeTasks 方法来删​​除一定数量的待处理任务,但我不知道如何使用 removeIf 方法。它说:删除此集合中满足给定谓词的所有元素,但我不知道如何创建参数谓词。有什么想法吗?

最佳答案

如果你有一个 Queue<WorkerTask> ,你可以这样做:

queue.removeIf(task -> task.getSiteID() == siteID)

有几个问题。一个问题是您从 getQueue() 获得的队列是BlockingQueue<Runnable>而不是 Queue<WorkerTask> .如果您提交 Runnable池中的实例,队列可能包含对您实际任务的引用;如果是这样,您可以将它们向下转换为 WorkerTask .然而,这并不能保证。此外,ThreadPoolExecutor 的类文档说(在“队列维护”下):

Method getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged. Two supplied methods, remove(Runnable) and purge() are available to assist in storage reclamation when large numbers of queued tasks become cancelled.

查看 remove(Runnable)方法,它的文档说

It may fail to remove tasks that have been converted into other forms before being placed on the internal queue.

这表明您应该卡在 Runnable 上已提交以调用 remove() 的实例稍后在他们身上。或者,调用 submit(Runnable)得到 Future并保存这些实例以取消它们。

但是还有第二个问题可能会导致这种方法不够充分。假设您找到了一种从队列中删除或取消匹配任务的方法。另一个线程可能已决定提交匹配的新任务,但尚未提交。这里有一个竞争条件。您也许可以取消排队的任务,但在您这样做之后,您不能保证没有提交新的匹配任务。

这是另一种方法。据推测,当您取消(或其他)站点 ID 时,某处有一些逻辑可以停止提交与该端 ID 匹配的新任务。问题是如何处理“进行中”的匹配任务,即在队列中或即将入队的任务。

与其尝试取消匹配的任务,不如更改任务,以便如果其站点 ID 已被取消,则该任务变为空操作。您可以在 ConcurrentHashMap 中记录站点 ID 的取消。 .任何任务都会在开始工作之前检查这张 map ,如果站点 ID 存在,它就会简单地返回。向 map 添加一个站点 ID 会立即生效,确保不会在该站点 ID 上开始任何新任务。 (已经开始的任务将运行到完成。)任何正在进行的任务最终都会从队列中耗尽,而不会导致任何实际工作发生。

关于java - 如何在 ArrayBlockingQueue 中使用谓词来使用方法 "removeIf",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34720304/

相关文章:

java - 为什么 Java 8 的 Predicate<T> 不扩展 Function<T, Boolean>

java - 如何使用java的linkedlist库打印插入链表的数据

java - 如何使 ICS 和 Gingerbread 中的选项卡看起来相同?

java - 使用 JDBC 在 Postgres 中使用串联列运行选择时出错

java - 无界通配符参数化类型数组的实际用法是什么?

wpf - ObservableCollection<T> 不更新 UI

Java - 具有 LinkedHashMap 属性的集合

jpa - JPA Criteria Query 中的复合谓词 - 使用和,或方法

c++ - is_permutation 的编译器错误和结构的谓词

java - android检查是否以编程方式启用了 "Allow sound"通知设置