Java:停止可调用线程的设计思路

标签 java multithreading

我正在编写一个进行批处理的程序。批处理元素可以彼此独立处理,我们希望最大限度地减少整体处理时间。因此,我不是一次循环遍历批处理中的每个元素,而是使用 ExecutorService 并向其提交 Callable 对象:

    public void process(Batch batch)
    {
        ExecutorService execService = Executors.newCachedThreadPool();
        CopyOnWriteArrayList<Future<BatchElementStatus>> futures = new CopyOnWriteArrayList<Future<BatchElementStatus>>();

        for (BatchElement element : batch.getElement())
        {
            Future<MtaMigrationStatus> future = execService.submit(new ElementProcessor(batch.getID(),
                    element));
            futures.add(future);
        }

        boolean done = false;

        while (!done)
        {
            for (Future<BatchElementStatus> future : futures)
            {
                try
                {
                    if (future.isDone())
                    {
                        futures.remove(future);
                    }
                }
                catch (Exception e)
                {
                    System.out.println(e.getMessage());
                }

                if (futures.size() == 0)
                {
                    done = true;
                }
            }
        }
    }

我们希望能够取消批处理。因为我没有使用循环,所以我不能只在每个循环的顶部检查是否设置了取消标志。

我们正在使用一个 JMS 主题,BatchProcessor 和 ElementProcessor 都将监听该主题以通知它们批处理已被取消。

在 ElementProcess call() 中有许多步骤,其中一些步骤可以安全地停止处理,但没有返回点。该类具有以下基本设计:

public class ElementProcessor implements Callable, MessageListener
{
    private cancelled = false;

    public void onMessage(Message msg)
    {
        // get message object
        cancelled = true;
    }

    public BatchElementStatus call()
    {
        String status = SUCCESS;

        if (!cancelled)
        {
            doSomehingOne();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }            

        if (!cancelled)
        {
            doSomehingTwo();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }            

        if (!cancelled)
        {
            doSomehingThree();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }            

        if (!cancelled)
        {
            doSomehingFour();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }

        // After this point, we cannot cancel or pause the processing

        doSomehingFive();
        doSomehingSix();

        return new BatchElementStatus("SUCCESS");
    }

}

除了在 if(!cancelled) 语句的调用方法中包装方法调用/代码块之外,我想知道是否有更好的方法来检查批处理/元素是否已被取消.

有什么建议吗?

最佳答案

我不认为你可以做得比你现在做的更好,但这里有一个替代方案:

public BatchElementStatus call() {
    return callMethod(1);
}

private callMethod(int methodCounter) {
    if (cancelled) {
       doRollback();
       return new BatchElementStatus("FAIL");
    }
    switch (methodCounter) {
       case 1 : doSomethingOne(); break;
       case 2 : doSomethingTwo(); break;
       ...
       case 5 : doSomethingFive();
                doSomethingSix();
                return new BatchElementStatus("SUCCESS");
    }
    return callMethod(methodCounter + 1);
}          

另外,你想制作 cancelled volatile ,因为onMessage将从另一个线程调用。但您可能不想使用 onMessagecancelled无论如何(见下文)。

其他小问题:1) CopyOnWriteArrayList<Future<BatchElementStatus>> futures应该只是一个 ArrayList .使用并发集合误导我们认为 futures在很多线程上。 2) while (!done)应替换为 while (!futures.isEmpty())done删除。 3)你可能应该调用future.cancel(true)而不是“消息”取消。然后你必须检查 if (Thread.interrupted())而不是 if (cancelled) .如果你想杀死所有 future ,那么只需调用 execService.shutdownNow() ;你的任务必须处理中断才能工作。

编辑:

而不是你的while(!done) { for (... futures) { ... }} , 你应该使用 ExecutorCompletionService .它可以完成您正在尝试做的事情,而且可能会做得更好。 API中有一个完整的例子。

关于Java:停止可调用线程的设计思路,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7209731/

相关文章:

Java泛型返回类型问题

java - jboss wildfly 数据源连接失败 - 在配置的阻塞超时内没有可用的托管连接

java - 通过json在python和java之间传递对象

multithreading - 在后台线程上的 NSBlockOperation 之后立即在主线程上运行一个 block

c# - C#中的基本线程

java - 如何简化尴尬的并发代码?

java - 等待用户输入继续

Java - Spark SQL DataFrame 映射函数不工作

Java 密码填充错误

python matplotlib : Plotting to GUI in non-main thread