我必须并行化现有的后台任务,这样它就不会连续消耗“x”资源,而是仅使用“y”线程(y << x)并行完成手头的工作。该任务不断在后台运行并不断处理一些资源。
代码结构如下:
class BaseBackground implements Runnable {
@Override
public void run() {
int[] resources = findResources(...);
for (int resource : resources) {
processResource(resource);
}
stopProcessing();
}
public abstract void processResource(final int resource);
public void void stopProcessing() {
// Override by subclass as needed
}
}
class ChildBackground extends BaseBackground {
@Override
public abstract void processResource(final int resource) {
// does some work here
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
}
我按以下方式修改了 ChildBackground
:
class ChildBackground extends BaseBackground {
private final BlockingQueue<Integer> resourcesToBeProcessed;
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource) {
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
// does some work
}
}
}
}
我不会每次都创建和拆除 ExecutorService,因为垃圾收集在我的服务中是一个问题。不过,我不明白这会有多糟糕,因为我不会在每次迭代中生成超过 10 个线程。
我无法理解如何等待所有 ResourceProcessor
完成一次迭代的资源处理,以便我可以在 stopProcessing
中重置一些计数并发出指标。我考虑过以下选项:
1) executorService.awaitTermination(超时)。这不会真正起作用,因为它总是阻塞直到超时,因为 ResourceProcessor
线程永远不会真正完成其工作
2) 我可以在 findResources
之后找出资源数量,并将其提供给子类,并让每个 ResourceProcessor
增加处理的资源数量。在重置计数之前,我必须等待 stopProcessing
中处理完所有资源。我需要像 CountDownLatch 这样的东西,但它应该计数 UP
。这个选项中会有很多状态管理,我不是特别喜欢。
3)我可以更新public abstract void processResource(final int resources)
以包含总资源计数,并让子进程等待,直到所有线程处理完总资源。在这种情况下也会有一些状态管理,但仅限于子类。
在这两种情况下,我都必须添加 wait() 和 notification() 逻辑,但我对我的方法没有信心。这就是我所拥有的:
class ChildBackground extends BaseBackground {
private static final int UNSET_TOTAL_RESOURCES = -1;
private final BlockingQueue<Integer> resourcesToBeProcessed;
private int totalResources = UNSET_TOTAL_RESOURCES;
private final AtomicInteger resourcesProcessed = new AtomicInteger(0);
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource, final int totalResources) {
if (this.totalResources == UNSET_TOTAL_RESOURCES) {
this.totalResources = totalResources;
} else {
Preconditions.checkState(this.totalResources == totalResources, "Consecutive poll requests are using different total resources count, previous=%s, new=%s", this.totalResources, totalResources);
}
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
try {
waitForAllResourcesToBeProcessed();
} catch (InterruptedException e) {
e.printStackTrace();
}
resourcesProcessed.set(0);
totalResources = UNSET_TOTAL_RESOURCES;
// reset some counts and emit metrics
}
private void incrementProcessedResources() {
synchronized (resourcesProcessed) {
resourcesProcessed.getAndIncrement();
resourcesProcessed.notify();
}
}
private void waitForAllResourcesToBeProcessed() throws InterruptedException {
synchronized (resourcesProcessed) {
while (resourcesProcessed.get() != totalResources) {
resourcesProcessed.wait();
}
}
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
try {
// does some work
} finally {
incrementProcessedResources();
}
}
}
}
}
我不确定使用AtomicInteger
是否是正确的方法,如果是,我是否需要调用wait()和notify()。如果我不使用 wait() 和 notification() 我什至不必执行同步块(synchronized block)中的所有内容。
如果我应该简单地为每次迭代创建和关闭 ExecutorService,或者是否有我应该追求的第四种方法,请告诉我您对此方法的想法。
最佳答案
您的代码似乎不必要地复杂。当 ExecutorService
内已经有队列时,为什么还要拥有自己的队列? ?当我认为你可以让股票ExecutorService
时,你必须做一大堆管理工作。为您处理。
我将您的工作定义为:
public static class ResourceProcessor implements Runnable {
private final int resource;
public ResourceProcessor(int resource) {
this.resource = resource;
}
public void run() {
try {
// does some work
} finally {
// if this is still necessary then you should use a `Future` instead
incrementProcessedResources();
}
}
}
然后您可以像这样提交它们:
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < totalResources; ++i) {
executorService.submit(new ResourceProcessor(i));
}
// shutdown the thread pool after the last submit
executorService.shutdown();
executorService.awaitTermination(timeout)
. This won't really work as it will always block until the timeout because the ResourceProcessor threads will never really finish their jobs
现在就可以了。
2) I can find out the number of resources [have finished].
如果你可以调用awaitTermination(...)
,你还需要这个吗? ?
3) I could update the public abstract void processResource(final int resource) to include count of total resources and have the child process wait until all threads have processed total resources...
同样的问题。有这个必要吗?
如果您确实需要知道已处理请求的列表,那么您可以像 @ScaryWombat 提到的那样使用 Future<Integer>
和Callable<Integer>
或使用 ExecutorCompletionService
.
Futures aren't an option because the executor threads run within a tight loop that stops only when the service is deactivated.
你能解释一下吗?
希望这有帮助。
关于java - 使用执行器服务等待守护线程完成迭代,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39443950/