我正在编写一个进行批处理的程序。批处理元素可以彼此独立处理,我们希望最大限度地减少整体处理时间。因此,我不是一次循环遍历批处理中的每个元素,而是使用 ExecutorService 并向其提交 Callable 对象:
public void process(Batch batch)
{
ExecutorService execService = Executors.newCachedThreadPool();
CopyOnWriteArrayList<Future<BatchElementStatus>> futures = new CopyOnWriteArrayList<Future<BatchElementStatus>>();
for (BatchElement element : batch.getElement())
{
Future<MtaMigrationStatus> future = execService.submit(new ElementProcessor(batch.getID(),
element));
futures.add(future);
}
boolean done = false;
while (!done)
{
for (Future<BatchElementStatus> future : futures)
{
try
{
if (future.isDone())
{
futures.remove(future);
}
}
catch (Exception e)
{
System.out.println(e.getMessage());
}
if (futures.size() == 0)
{
done = true;
}
}
}
}
我们希望能够取消批处理。因为我没有使用循环,所以我不能只在每个循环的顶部检查是否设置了取消标志。
我们正在使用一个 JMS 主题,BatchProcessor 和 ElementProcessor 都将监听该主题以通知它们批处理已被取消。
在 ElementProcess call() 中有许多步骤,其中一些步骤可以安全地停止处理,但没有返回点。该类具有以下基本设计:
public class ElementProcessor implements Callable, MessageListener
{
private cancelled = false;
public void onMessage(Message msg)
{
// get message object
cancelled = true;
}
public BatchElementStatus call()
{
String status = SUCCESS;
if (!cancelled)
{
doSomehingOne();
}
else
{
doRollback();
status = CANCELLED;
}
if (!cancelled)
{
doSomehingTwo();
}
else
{
doRollback();
status = CANCELLED;
}
if (!cancelled)
{
doSomehingThree();
}
else
{
doRollback();
status = CANCELLED;
}
if (!cancelled)
{
doSomehingFour();
}
else
{
doRollback();
status = CANCELLED;
}
// After this point, we cannot cancel or pause the processing
doSomehingFive();
doSomehingSix();
return new BatchElementStatus("SUCCESS");
}
}
除了在 if(!cancelled)
语句的调用方法中包装方法调用/代码块之外,我想知道是否有更好的方法来检查批处理/元素是否已被取消.
有什么建议吗?
最佳答案
我不认为你可以做得比你现在做的更好,但这里有一个替代方案:
public BatchElementStatus call() {
return callMethod(1);
}
private callMethod(int methodCounter) {
if (cancelled) {
doRollback();
return new BatchElementStatus("FAIL");
}
switch (methodCounter) {
case 1 : doSomethingOne(); break;
case 2 : doSomethingTwo(); break;
...
case 5 : doSomethingFive();
doSomethingSix();
return new BatchElementStatus("SUCCESS");
}
return callMethod(methodCounter + 1);
}
另外,你想制作 cancelled
volatile ,因为onMessage
将从另一个线程调用。但您可能不想使用 onMessage
和 cancelled
无论如何(见下文)。
其他小问题:1) CopyOnWriteArrayList<Future<BatchElementStatus>> futures
应该只是一个 ArrayList
.使用并发集合误导我们认为 futures
在很多线程上。 2) while (!done)
应替换为 while (!futures.isEmpty())
和 done
删除。 3)你可能应该调用future.cancel(true)
而不是“消息”取消。然后你必须检查 if (Thread.interrupted())
而不是 if (cancelled)
.如果你想杀死所有 future ,那么只需调用 execService.shutdownNow()
;你的任务必须处理中断才能工作。
编辑:
而不是你的while(!done) { for (... futures) { ... }}
, 你应该使用 ExecutorCompletionService .它可以完成您正在尝试做的事情,而且可能会做得更好。 API中有一个完整的例子。
关于Java:停止可调用线程的设计思路,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7209731/