java - 如何在 Java 迭代器中使用 ExecutorService,而不会有资源泄漏的风险

标签 java concurrency executorservice resource-leak

我有一个 Java 迭代器,它列出了远程位置的项目。项目列表以“页面”形式出现,“获取下一页”操作相当慢。 (具体来说,我的迭代器称为 S3Find 并列出来自 Amazon S3 的对象)。

因此,为了加快速度,我想预取一个列表页面。为此,我使用 ExecutorServiceCallable/Future 模式来预取项目的“页面”。问题是,该迭代器的调用者可能随时放弃该操作,而不通知我的类。例如,考虑以下循环:

for (S3URL f : new S3Find(topdir).withRecurse(true)) {
    // do something with f
    if (some_condition) break;
}

因此,a 发生了资源泄漏,因为即使没有更多对包含的 S3Find 的引用(并且即使下一次预取已完成),我用来提交 CallableExecutorService 仍然处于 Activity 状态并正在运行。

处理这个问题的正确方法是什么?我使用了错误的方法吗?我是否应该放弃 ExecutorService 并为每次预取使用新的裸线程(并在预取完成时终止线程)?请注意,每次获取页面大约需要 500 毫秒,因此相比之下,每次创建一个新线程可能可以忽略不计。 我不希望的一件事是要求调用者明确通知 S3Find 他们已完成迭代(因为它肯定会被某些人忘记)。

这是当前的预取代码(在S3Find内):

/**
 * This class holds one ObjectListing (one "page"), and also pre-fetches
 * the next page using a {@link S3Find#NextPageGetter} Callable on a
 * separate thread.
 */
private static class Pager {
    private final AmazonS3 s3;
    private ObjectListing currentList;
    private Future<ObjectListing> future;
    private final ExecutorService exec;
    public Pager(AmazonS3 s3, ListObjectsRequest request) {
        this.s3 = s3;
        currentList = s3.listObjects(request);
        exec = Executors.newSingleThreadExecutor();
        future = submitPrefetch();
    }
    public ObjectListing getCurrentPage() {
        return currentList;
    }
    /**
     * Move currentList to the next page, and returns it.
     */
    public ObjectListing getNextPage() {
        if (future == null) return null;
        try {
            currentList = future.get();
            future = submitPrefetch();
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
        }
        return currentList;
    }
    private Future<ObjectListing> submitPrefetch() {
        if (currentList == null || !currentList.isTruncated()) {
            exec.shutdown();
            return null;
        } else {
            NextPageGetter worker = new NextPageGetter(s3, currentList);
            return exec.submit(worker);
        }
    }
}

/**
 * This class retrieves the "next page" of a truncated ObjectListing.
 * It is meant to be called in a Callable/Future pattern.
 */
private static class NextPageGetter implements Callable<ObjectListing> {
    private final ObjectListing currentList;
    private final AmazonS3 s3;

    public NextPageGetter(AmazonS3 s3, ObjectListing currentList) {
        super();
        this.s3 = s3;
        this.currentList = currentList;
        if (currentList == null || !currentList.isTruncated()) {
            throw new IllegalArgumentException(currentList==null ?
                        "null List" : "List is not truncated");
        }
    }

    @Override
    public ObjectListing call() throws Exception {
        ObjectListing nextList = s3.listNextBatchOfObjects(currentList);
        return nextList;
    }
}

最佳答案

这是一个我遇到过几次的经典问题。数据库连接发生在我身上。

Should I just abandon ExecutorService and use a new bare thread for every prefetch (and kill the thread when the prefetch is done)?

我想这是你唯一的选择。我不会打扰杀死线程。只要让它完成它的工作并在后台死掉即可。为下一页创建一个新线程。您需要加入线程并使用某种常见的 AtomicReference (或其他东西)在 S3Find 调用者和线程之间共享结果列表。

One thing I do not want is to require callers to explicitly inform S3Find that they are done iterating (as it will for sure be forgotten by some).

我没有看到任何简单的方法可以“正确”执行此操作,调用者无需在 try/finally 中调用某种 close() 方法。您不能在 Javadoc 中明确说明这一点吗?这就是我在 ORMLite database iterators 中所做的.

S3Find s3Find = new S3Find(topdir).withRecurse(true);
try {
    for (S3URL f : s3Find) {
        ...
    }
} finally {
    s3Find.close();
}

然后在S3Find.close()中:

public void close() {
    exec.shutdown();
}

在 Java 7 中,他们添加了 try with resources construct该语言会自动关闭任何Closeable 资源。这是一个巨大的胜利。

关于java - 如何在 Java 迭代器中使用 ExecutorService,而不会有资源泄漏的风险,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12964915/

相关文章:

Java Card Crypto 异常 RSA key 加密

java - 锁定写入 HashMap

c# AsyncSocket 服务器需要锁定?

go - 循环检查并发程序中的条件

java - ScheduledExecutorService 一段时间后未运行

Java:为什么同步列表会出现 ConcurrentModificationException?

java - 我如何打印我的面板,从 (locationx,locationy) 到 (panel.getWidth(),panel.getHeight())?

java - 如何使用 ExecutorService 轮询直到结果到达

java - 我应该如何在 Java 中测试私有(private)方法?

维护进程池的 Java 库