java - 使用执行器服务等待守护线程完成迭代

标签 java multithreading synchronized executorservice atomicinteger

我必须并行化现有的后台任务,这样它就不会连续消耗“x”资源,而是仅使用“y”线程(y << x)并行完成手头的工作。该任务不断在后台运行并不断处理一些资源。


class BaseBackground implements Runnable {
    public void run() {
        int[] resources = findResources(...);

        for (int resource : resources) {


    public abstract void processResource(final int resource);
    public void void stopProcessing() {
         // Override by subclass as needed

class ChildBackground extends BaseBackground {

    public abstract void processResource(final int resource) {
        // does some work here

    public void void stopProcessing() {
        // reset some counts and emit metrics

我按以下方式修改了 ChildBackground:

class ChildBackground extends BaseBackground {

    private final BlockingQueue<Integer> resourcesToBeProcessed;

    public ChildBackground() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; ++i) {
             executorService.submit(new ResourceProcessor());

    public abstract void processResource(final int resource) {

    public void void stopProcessing() {
        // reset some counts and emit metrics

    public class ResourceProcessor implements Runnable {
        public void run() {
            while (true) {
                int nextResource = resourcesToBeProcessed.take();
                // does some work

我不会每次都创建和拆除 ExecutorService,因为垃圾收集在我的服务中是一个问题。不过,我不明白这会有多糟糕,因为我不会在每次迭代中生成超过 10 个线程。

我无法理解如何等待所有 ResourceProcessor 完成一次迭代的资源处理,以便我可以在 stopProcessing 中重置一些计数并发出指标。我考虑过以下选项:

1) executorService.awaitTermination(超时)。这不会真正起作用,因为它总是阻塞直到超时,因为 ResourceProcessor 线程永远不会真正完成其工作

2) 我可以在 findResources 之后找出资源数量,并将其提供给子类,并让每个 ResourceProcessor 增加处理的资源数量。在重置计数之前,我必须等待 stopProcessing 中处理完所有资源。我需要像 CountDownLatch 这样的东西,但它应该计数 UP 。这个选项中会有很多状态管理,我不是特别喜欢。

3)我可以更新public abstract void processResource(final int resources)以包含总资源计数,并让子进程等待,直到所有线程处理完总资源。在这种情况下也会有一些状态管理,但仅限于子类。

在这两种情况下,我都必须添加 wait() 和 notification() 逻辑,但我对我的方法没有信心。这就是我所拥有的:

class ChildBackground extends BaseBackground {

    private static final int UNSET_TOTAL_RESOURCES = -1;

    private final BlockingQueue<Integer> resourcesToBeProcessed;

    private int totalResources = UNSET_TOTAL_RESOURCES;
    private final AtomicInteger resourcesProcessed = new AtomicInteger(0);

    public ChildBackground() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; ++i) {
             executorService.submit(new ResourceProcessor());

    public abstract void processResource(final int resource, final int totalResources) {
        if (this.totalResources == UNSET_TOTAL_RESOURCES) {
            this.totalResources = totalResources;
        } else {
            Preconditions.checkState(this.totalResources == totalResources, "Consecutive poll requests are using different total resources count, previous=%s, new=%s", this.totalResources, totalResources);

    public void void stopProcessing() {
        try {
        } catch (InterruptedException e) {
        totalResources = UNSET_TOTAL_RESOURCES;
        // reset some counts and emit metrics

    private void incrementProcessedResources() {
        synchronized (resourcesProcessed) {

    private void waitForAllResourcesToBeProcessed() throws InterruptedException {
        synchronized (resourcesProcessed) {
             while (resourcesProcessed.get() != totalResources) {

    public class ResourceProcessor implements Runnable {
        public void run() {
            while (true) {
                int nextResource = resourcesToBeProcessed.take();
                try {
                   // does some work
                } finally {

我不确定使用AtomicInteger是否是正确的方法,如果是,我是否需要调用wait()和notify()。如果我不使用 wait() 和 notification() 我什至不必执行同步块(synchronized block)中的所有内容。

如果我应该简单地为每次迭代创建和关闭 ExecutorService,或者是否有我应该追求的第四种方法,请告诉我您对此方法的想法。


您的代码似乎不必要地复杂。当 ExecutorService 内已经有队列时,为什么还要拥有自己的队列? ?当我认为你可以让股票ExecutorService时,你必须做一大堆管理工作。为您处理。


public static class ResourceProcessor implements Runnable {
   private final int resource;
   public ResourceProcessor(int resource) {
      this.resource = resource;
   public void run() {
      try {
         // does some work
      } finally {
         // if this is still necessary then you should use a `Future` instead


ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < totalResources; ++i) {
     executorService.submit(new ResourceProcessor(i));
// shutdown the thread pool after the last submit

executorService.awaitTermination(timeout). This won't really work as it will always block until the timeout because the ResourceProcessor threads will never really finish their jobs


2) I can find out the number of resources [have finished].

如果你可以调用awaitTermination(...),你还需要这个吗? ?

3) I could update the public abstract void processResource(final int resource) to include count of total resources and have the child process wait until all threads have processed total resources...


如果您确实需要知道已处理请求的列表,那么您可以像 @ScaryWombat 提到的那样使用 Future<Integer>Callable<Integer>或使用 ExecutorCompletionService .

Futures aren't an option because the executor threads run within a tight loop that stops only when the service is deactivated.



关于java - 使用执行器服务等待守护线程完成迭代,我们在Stack Overflow上找到一个类似的问题:


c++ - 从多个线程访问 QTcpSocket

java - synchronized 关键字和实例方法上的锁

java - ArrayList 中的 ConcurrentModification 异常

java - 如何解决 "Unrecognized field"问题?

java - 多线程写入和多线程读取的 ConcurrentLinkedQueue 的并发问题。 #快速目录扫描

python - wxpython - 在不阻塞 GUI 的情况下按顺序运行线程

Java 同步方法 - OCPJP

javascript - 在 javascript 中使用 java 变量 - webDriver

java - 在java中使用私钥身份验证来保护FTP

java - SQLite数据库升级后崩溃