java - ThreadpoolExecutor 具有重试能力,并在任务失败多次后关闭

标签 java multithreading scala runnable threadpoolexecutor

我需要一个线程池执行器,它需要完成确切数量(相同)的任务。

它必须能够重新提交失败的任务 n 次。如果任何任务失败超过 n,则线程池应关闭并且不再继续处理任何其他任务。

我尝试结合在不同答案中找到的两种方法 - 一种通过重写 ThreadPoolExecutor.afterExecute 重新提交失败的任务,并子类化 CountDownLatch 以便等待锁存器的线程被中断并关闭执行器。

到目前为止,这是子类倒计时锁存器:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AbortableCountDownLatch extends CountDownLatch {
protected boolean aborted = false;

public AbortableCountDownLatch(int count) {
    super(count);
}

/**
 * Unblocks all threads waiting on this latch and cause them to receive an
 * AbortedException.  If the latch has already counted all the way down,
 * this method does nothing.
 */
public void abort() {
    if( getCount() == 0 )
        return;

    this.aborted = true;
    while(getCount() > 0)
        countDown();
}

@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    final boolean rtrn = super.await(timeout,unit);
    if (aborted)
        throw new AbortedException();
    return rtrn;
}

@Override
public void await() throws InterruptedException {
    super.await();
    if (aborted)
        throw new AbortedException();
}


public static class AbortedException extends InterruptedException {
    public AbortedException() {
    }

    public AbortedException(String detailMessage) {
        super(detailMessage);
    }
}
}

以及线程池执行器:

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

private static final int RETRY_LIMIT = 3;

private Map<Runnable, Integer> retriedTasks = new ConcurrentHashMap<>();
private AbortableCountDownLatch latch;

public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue, AbortableCountDownLatch latch) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    this.latch = latch;
}

@Override
public void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    // If submit() method is called instead of execute()
    if (t == null && r instanceof Future<?>) {
        try {
            Object result = ((Future<?>) r).get();
        } catch (CancellationException e) {
            t = e;
        } catch (ExecutionException e) {
            t = e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    if (t != null) {
        retriedTasks.put(r, retriedTasks.getOrDefault(r, 0) + 1);
        System.out.println("Retries for " + r + " -> " + retriedTasks.get(r));
        /* check to see if we have retried this task too many times, if so - shutdown */
        if (retriedTasks.containsKey(r) && retriedTasks.get(r) > RETRY_LIMIT) {
            System.err.println("Thread failed for more than " + RETRY_LIMIT + " times, aborting everything..");
            this.latch.abort();
        } else {
            System.err.println("Thread threw  exception " + t.getMessage() + ". Retry-ing task...");
            execute(r);
        }
    } else {
        /* clear any previous retry count for this runnable */
        retriedTasks.remove(r);
    }
}
}

主要会像这样使用它们:

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MainProcessor {

public static void main(String[] args) {

    AbortableCountDownLatch latch = new AbortableCountDownLatch(5);
    ThreadPoolExecutor threadPoolExecutor = new MyThreadPoolExecutor(8, 8, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), latch);

    for (int i = 0; i < 5; i++) {
        threadPoolExecutor.submit(() -> {
            System.out.println("Started thread " + Thread.currentThread().getName());
            Random random = new Random();
            try {
                Thread.sleep(random.nextInt(7000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
                if (random.nextBoolean()){
                    System.err.println("Thread " + Thread.currentThread().getName() + " failed - throwing exception..");
                    throw new RuntimeException("Thread " + Thread.currentThread().getName() + "failed! spectacularly :!");
                }
                else {
                    System.out.println("Thread " + Thread.currentThread().getName() + " finished.");
                    latch.countDown();
                }
        });
    }

    try {
        latch.await();
    } catch (InterruptedException e) {
        threadPoolExecutor.shutdownNow();
    }

    threadPoolExecutor.shutdown();
}
}

这种方法看起来正确吗?我不太喜欢必须将闩锁传递给线程池执行器和实际的 Runnable。有实现这一目标的标准方法吗?我也很喜欢 Scala 版本。

我见过其他人建议任务应该在失败的情况下重新提交到池中,但这似乎不是一个好主意,因为任务应该只负责实际的运行逻辑,而不是执行细节。

最佳答案

您可以使用任务包装器来完成这项工作,那么它会相当简单:

public class TaskWrapper implements Runnable
{
    private Runnable task;
    private int maxResubmits;
    private ThreadPoolExecutor executor;
    private CountDownLatch latch;

    public TaskWrapper(Runnable task, int maxResubmits, ThreadPoolExecutor executor, CountDownLatch latch) {
        this.task=task;
        this.maxResubmits=maxResubmits;
        this.executor=executor;
        this.latch=latch;
        executor.submit(this);
    }

    public void run() {
        try {
            task.run();
            latch.countdoun();
        }
        catch(Exception e) {
            maxResubmits--;
            if(maxResubmits>0)
                executor.submit(this);
            else
            {
                latch.countdoun();
                executor.shutdownNow()
            }                
        }
    }
}

您现在只需要创建闩锁,调用您的任务,然后等待执行:


List<Runnable> tasks;
int maxResubmits;

CountDownLatch latch=new CountDownLatch(tasks.size());

tasks.forEach(task->new TaskWrapper(task,maxResubmits,executor,latch));

latch.await();

if(!executor.isShutdown())
    executor.shutdown();

关于java - ThreadpoolExecutor 具有重试能力,并在任务失败多次后关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56130677/

相关文章:

c# - Asp.net ThreadPool 中可用的最大线程数是多少

scala - 将值从请求传递到 Controller 下面的所有层

java - switch case 中的枚举引用

java - 如何在 Java 中使用 Swing 修复元素的显示

java - 在python中压缩音频流数据并在java中解压缩

java - JAVA 中的 SSL 套接字连接池

java - 改变java中透明像素的颜色

multithreading - 计算数据包和计算数据包总字节数的区别

scala - 模糊的 Scala : what does following mean in scala? (: _* )

java - Play WS 异常 : javax.net.ssl.SSLException: Received fatal alert: internal_error