java - WebCrawler stop() method logic [Java Concurrency in Practice 7.2.5]

Tags: java multithreading concurrency threadpool cancellation

I have read section 7.2.5 of Java Concurrency in Practice (the limitations of shutdownNow).

The problem with shutdownNow is that it only returns the tasks that were never started.

First, we create an ExecutorService that keeps track of the tasks cancelled after shutdown.

The tracking executor:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * TrackingExecutor
 * <p/>
 * ExecutorService that keeps track of cancelled tasks after shutdown
 *
 * @author Brian Goetz and Tim Peierls
 */
public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }

    public void shutdown() {
        exec.shutdown();
    }

    public List<Runnable> shutdownNow() {
        return exec.shutdownNow();
    }

    public boolean isShutdown() {
        return exec.isShutdown();
    }

    public boolean isTerminated() {
        return exec.isTerminated();
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated())
            throw new IllegalStateException(/*...*/);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
}
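To see the wrapper in action, here is a minimal, self-contained sketch (class and variable names are mine, not from the book) that inlines the same finally-block check used by TrackingExecutor.execute(). Note that the task must restore its interrupt status after catching InterruptedException, otherwise the check in the finally block misses it:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TrackingDemo {
    // Inlined version of the trick in TrackingExecutor.execute(): the task
    // is wrapped so that a finally block records it when the pool is
    // shutting down and the worker thread was interrupted.
    static List<Runnable> runAndTrack() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Set<Runnable> cancelled = Collections.synchronizedSet(new HashSet<>());
        CountDownLatch started = new CountDownLatch(1);
        Runnable sleepy = () -> {
            started.countDown();
            try {
                Thread.sleep(10_000);                 // responds to interruption
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // restore the status, or
            }                                         // the finally check misses it
        };
        exec.execute(() -> {
            try {
                sleepy.run();
            } finally {
                if (exec.isShutdown() && Thread.currentThread().isInterrupted())
                    cancelled.add(sleepy);
            }
        });
        started.await();                              // make sure the task is running
        exec.shutdownNow();                           // delivers the interrupt
        exec.awaitTermination(1, TimeUnit.SECONDS);
        return new ArrayList<>(cancelled);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndTrack().size());     // prints 1
    }
}
```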

Then we create a crawler that uses the TrackingExecutor.

The crawler:

import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import net.jcip.annotations.GuardedBy;

/**
 * WebCrawler
 * <p/>
 * Using TrackingExecutorService to save unfinished tasks for later execution
 *
 * @author Brian Goetz and Tim Peierls
 */
public abstract class WebCrawler {
    private volatile TrackingExecutor exec;
    @GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>();

    private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
    private static final long TIMEOUT = 500;
    private static final TimeUnit UNIT = MILLISECONDS;

    public WebCrawler(URL startUrl) {
        urlsToCrawl.add(startUrl);
    }

    public synchronized void start() {
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        for (URL url : urlsToCrawl) submitCrawlTask(url);
        urlsToCrawl.clear();
    }

    public synchronized void stop() throws InterruptedException {
        try {
            saveUncrawled(exec.shutdownNow());
            if (exec.awaitTermination(TIMEOUT, UNIT))
                saveUncrawled(exec.getCancelledTasks());
        } finally {
            exec = null;
        }
    }

    protected abstract List<URL> processPage(URL url);

    private void saveUncrawled(List<Runnable> uncrawled) {
        for (Runnable task : uncrawled)
            urlsToCrawl.add(((CrawlTask) task).getPage());
    }

    private void submitCrawlTask(URL u) {
        exec.execute(new CrawlTask(u));
    }

    private class CrawlTask implements Runnable {
        private final URL url;

        CrawlTask(URL url) {
            this.url = url;
        }

        private int count = 1;

        boolean alreadyCrawled() {
            return seen.putIfAbsent(url, true) != null;
        }

        void markUncrawled() {
            seen.remove(url);
            System.out.printf("marking %s uncrawled%n", url);
        }

        public void run() {
            for (URL link : processPage(url)) {
                if (Thread.currentThread().isInterrupted())
                    return;
                submitCrawlTask(link);
            }
        }

        public URL getPage() {
            return url;
        }
    }
}

Let's examine the stop method:

 public synchronized void stop() throws InterruptedException {
     try {
         saveUncrawled(exec.shutdownNow());           //1
         if (exec.awaitTermination(TIMEOUT, UNIT))    //2
             saveUncrawled(exec.getCancelledTasks()); //3
     } finally {
         exec = null;
     }
 }

saveUncrawled(exec.shutdownNow()); //1

On line 1 we call shutdownNow and save the returned (never-started) tasks.
If I understand correctly, shutdownNow returns the tasks that have not started yet and interrupts the ones that already have.
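That behavior can be checked with a small self-contained sketch (the class and method names below are mine): with a single worker thread, the first task occupies the worker while the second sits in the work queue, so shutdownNow() drains the queue and returns exactly the queued, never-started Runnable.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutdownNowDemo {
    // With one worker thread: the first task runs, the second stays queued.
    // shutdownNow() interrupts the running task and returns the queued one.
    static boolean queuedTaskReturned() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        exec.execute(() -> {                       // occupies the only worker
            started.countDown();
            try { Thread.sleep(10_000); } catch (InterruptedException ignored) { }
        });
        Runnable queued = () -> { };               // never gets a thread
        exec.execute(queued);
        started.await();                           // make sure task 1 is running
        List<Runnable> drained = exec.shutdownNow();  // interrupts task 1 too
        return drained.size() == 1 && drained.get(0) == queued;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(queuedTaskReturned());  // prints true
    }
}
```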

exec.awaitTermination(TIMEOUT, UNIT) //2

Additionally, we want to add the cancelled tasks to this set. On line 2 we give the executor some time and wait for termination with a timeout.

Question 1

Why do we give this operation a timeout at all?

As I understand it, shutdownNow interrupts the in-progress tasks anyway, so I see no reason to wait.

exec.getCancelledTasks() 

The awaitTermination method returns true when the tasks completed successfully, so it is not clear to me why we try to add cancelled tasks in that case.

Please clarify the logic of the stop method.

Best Answer

Regarding the timeout of boolean awaitTermination(long timeout, TimeUnit unit):

Interrupting a thread does not necessarily stop it immediately (or at all). Quoting the Java Tutorial on interrupts:

An interrupt is an indication to a thread that it should stop what it is doing and do something else. It's up to the programmer to decide exactly how a thread responds to an interrupt, but it is very common for the thread to terminate. This is the usage emphasized in this lesson.

It is also spelled out directly in the javadoc of ExecutorService#shutdownNow():

There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via Thread.interrupt(), so any task that fails to respond to interrupts may never terminate.

The javadoc of Thread#interrupt() mentions further reasons why a thread may stay alive after being interrupted. For example:

Unless the current thread is interrupting itself, which is always permitted, the checkAccess method of this thread is invoked, which may cause a SecurityException to be thrown.

The logic of the stop() method is not obvious without studying the javadoc of ExecutorService carefully (see the "Usage Examples" section, second example). The problem with shutdownNow() is that it attempts to cancel all running tasks, but (a) this can take some time, and (b) there is no guarantee it succeeds (see above). awaitTermination(long, TimeUnit) lets you track this progress. I will go through the stop() method line by line:

saveUncrawled(exec.shutdownNow());

This initiates the shutdown of the ExecutorService and collects the tasks that were still waiting for execution. Tasks that already completed are ignored, as are the ones currently executing.

if (exec.awaitTermination(TIMEOUT, UNIT))

shutdownNow() merely signals the currently running tasks, via interruption, that they should stop. It does not kill them. Moreover, stopping after being interrupted takes time as well, so you have to wait for execution to finish. The timeout prevents you from blocking forever in case some tasks never complete (for whatever reason). Remember that threads may ignore interruption, or they may simply need longer to stop than the remaining timeout allows. Therefore there may still be tasks left after awaitTermination(TIMEOUT, UNIT). TrackingExecutor collects only the tasks that could be cancelled, not the ones that may still be executing after the timeout expires.
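The "best effort" caveat is easy to demonstrate with a sketch of my own (names are hypothetical): a task that busy-loops without ever checking its interrupt flag simply ignores shutdownNow(), so awaitTermination returns false once the timeout expires.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StubbornTaskDemo {
    // A task that ignores interruption keeps the pool from terminating,
    // so awaitTermination returns false after the timeout.
    static boolean terminatedWithin(long millis) throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        exec.execute(() -> {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (System.nanoTime() < deadline)
                ;  // busy loop: never checks the interrupt flag
        });
        exec.shutdownNow();  // best effort: the interrupt is delivered but ignored
        return exec.awaitTermination(millis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(terminatedWithin(200));  // prints false
    }
}
```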

saveUncrawled(exec.getCancelledTasks());

awaitTermination() returns true if all tasks could be cancelled (or completed). In that case, all the cancelled tasks are collected. If not all tasks could be cancelled (i.e., awaitTermination() returned false), some tasks remain unaccounted for.

Regarding "java - WebCrawler stop method logic [Java Concurrency in Practice 7.2.5]", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/42385664/
