java - 使用执行器服务等待守护线程完成迭代

标签 java multithreading synchronized executorservice atomicinteger

我必须并行化现有的后台任务,这样它就不会连续消耗“x”资源,而是仅使用“y”线程(y << x)并行完成手头的工作。该任务不断在后台运行并不断处理一些资源。

代码结构如下:

class BaseBackground implements Runnable {
    @Override
    public void run() {
        int[] resources = findResources(...);

        for (int resource : resources) {
            processResource(resource);
        }

        stopProcessing();
     }

    public abstract void processResource(final int resource);
    public void void stopProcessing() {
         // Override by subclass as needed
    }
}

class ChildBackground extends BaseBackground {

    @Override
    public abstract void processResource(final int resource) {
        // does some work here
    }

    public void void stopProcessing() {
        // reset some counts and emit metrics
    }
}

我按以下方式修改了 ChildBackground:

class ChildBackground extends BaseBackground {

    private final BlockingQueue<Integer> resourcesToBeProcessed;

    public ChildBackground() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; ++i) {
             executorService.submit(new ResourceProcessor());
        }
    }

    @Override
    public abstract void processResource(final int resource) {
        resourcesToBeProcessed.add(resource);
    }

    public void void stopProcessing() {
        // reset some counts and emit metrics
    }

    public class ResourceProcessor implements Runnable {
        @Override
        public void run() {
            while (true) {
                int nextResource = resourcesToBeProcessed.take();
                // does some work
            }
        }
    }
}

我不会每次都创建和拆除 ExecutorService,因为垃圾收集在我的服务中是一个问题。不过,我不明白这会有多糟糕,因为我不会在每次迭代中生成超过 10 个线程。

我无法理解如何等待所有 ResourceProcessor 完成一次迭代的资源处理,以便我可以在 stopProcessing 中重置一些计数并发出指标。我考虑过以下选项:

1) executorService.awaitTermination(超时)。这不会真正起作用,因为它总是阻塞直到超时,因为 ResourceProcessor 线程永远不会真正完成其工作

2) 我可以在 findResources 之后找出资源数量,并将其提供给子类,并让每个 ResourceProcessor 增加处理的资源数量。在重置计数之前,我必须等待 stopProcessing 中处理完所有资源。我需要像 CountDownLatch 这样的东西,但它应该计数 UP 。这个选项中会有很多状态管理,我不是特别喜欢。

3)我可以更新public abstract void processResource(final int resources)以包含总资源计数,并让子进程等待,直到所有线程处理完总资源。在这种情况下也会有一些状态管理,但仅限于子类。

在这两种情况下,我都必须添加 wait() 和 notification() 逻辑,但我对我的方法没有信心。这就是我所拥有的:

class ChildBackground extends BaseBackground {

    private static final int UNSET_TOTAL_RESOURCES = -1;

    private final BlockingQueue<Integer> resourcesToBeProcessed;

    private int totalResources = UNSET_TOTAL_RESOURCES;
    private final AtomicInteger resourcesProcessed = new AtomicInteger(0);

    public ChildBackground() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; ++i) {
             executorService.submit(new ResourceProcessor());
        }
    }

    @Override
    public abstract void processResource(final int resource, final int totalResources) {
        if (this.totalResources == UNSET_TOTAL_RESOURCES) {
            this.totalResources = totalResources;
        } else {
            Preconditions.checkState(this.totalResources == totalResources, "Consecutive poll requests are using different total resources count, previous=%s, new=%s", this.totalResources, totalResources);
        }
        resourcesToBeProcessed.add(resource);
    }

    public void void stopProcessing() {
        try {
            waitForAllResourcesToBeProcessed();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        resourcesProcessed.set(0);
        totalResources = UNSET_TOTAL_RESOURCES;
        // reset some counts and emit metrics
    }

    private void incrementProcessedResources() {
        synchronized (resourcesProcessed) {
            resourcesProcessed.getAndIncrement();
            resourcesProcessed.notify();
        }
    }

    private void waitForAllResourcesToBeProcessed() throws InterruptedException {
        synchronized (resourcesProcessed) {
             while (resourcesProcessed.get() != totalResources) {
                resourcesProcessed.wait();
             }
        }
    }

    public class ResourceProcessor implements Runnable {
        @Override
        public void run() {
            while (true) {
                int nextResource = resourcesToBeProcessed.take();
                try {
                   // does some work
                } finally {
                   incrementProcessedResources();
                }
            }
        }
    }
}

我不确定使用AtomicInteger是否是正确的方法,如果是,我是否需要调用wait()和notify()。如果我不使用 wait() 和 notification() 我什至不必执行同步块(synchronized block)中的所有内容。

如果我应该简单地为每次迭代创建和关闭 ExecutorService,或者是否有我应该追求的第四种方法,请告诉我您对此方法的想法。

最佳答案

您的代码似乎不必要地复杂。当 ExecutorService 内已经有队列时,为什么还要拥有自己的队列? ?当我认为你可以让股票ExecutorService时,你必须做一大堆管理工作。为您处理。

我将您的工作定义为:

public static class ResourceProcessor implements Runnable {
   private final int resource;
   public ResourceProcessor(int resource) {
      this.resource = resource;
   }
   public void run() {
      try {
         // does some work
      } finally {
         // if this is still necessary then you should use a `Future` instead
         incrementProcessedResources();
      }
   }
}

然后您可以像这样提交它们:

ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < totalResources; ++i) {
     executorService.submit(new ResourceProcessor(i));
}
// shutdown the thread pool after the last submit
executorService.shutdown();

executorService.awaitTermination(timeout). This won't really work as it will always block until the timeout because the ResourceProcessor threads will never really finish their jobs

现在就可以了。

2) I can find out the number of resources [have finished].

如果你可以调用awaitTermination(...),你还需要这个吗? ?

3) I could update the public abstract void processResource(final int resource) to include count of total resources and have the child process wait until all threads have processed total resources...

同样的问题。有这个必要吗?

如果您确实需要知道已处理请求的列表,那么您可以像 @ScaryWombat 提到的那样使用 Future<Integer>Callable<Integer>或使用 ExecutorCompletionService .

Futures aren't an option because the executor threads run within a tight loop that stops only when the service is deactivated.

你能解释一下吗?

希望这有帮助。

关于java - 使用执行器服务等待守护线程完成迭代,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39443950/

相关文章:

c++ - 从多个线程访问 QTcpSocket

java - synchronized 关键字和实例方法上的锁

java - ArrayList 中的 ConcurrentModification 异常

java - 如何解决 "Unrecognized field"问题?

java - 多线程写入和多线程读取的 ConcurrentLinkedQueue 的并发问题。 #快速目录扫描

python - wxpython - 在不阻塞 GUI 的情况下按顺序运行线程

Java 同步方法 - OCPJP

javascript - 在 javascript 中使用 java 变量 - webDriver

java - 在java中使用私钥身份验证来保护FTP

java - SQLite数据库升级后崩溃