java - Synchronized access to a queue

Tags: java multithreading

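I have a requirement where I need to hit a link and get a response. The response is XML data that contains child links. I then copy the response to a file and add the child links to a queue, and I have to keep hitting the child links iteratively until there are no more child links.

I first did this with a single queue. Since that was slow, I tried implementing an executor. I do not have to maintain the order of the data. Here is my current approach: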

public class Hierarchy2 {

    private static AbstractQueue<String> queue = new ConcurrentLinkedQueue<>();
    private static FileWriter writer;

    private static SAXParser saxParser;
    private static XMLHandler xmlHandler = new XMLHandler();

    public static void main(String[] args) throws IOException, ParserConfigurationException, SAXException {
        writer = new FileWriter(new File("hierarchy.txt"));
        String baseUrl = "my url here";

        queue.add(baseUrl);

        int threadCount = Runtime.getRuntime().availableProcessors() + 1;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executor.execute(new QueueProcess(queue, writer, xmlHandler));
        }

        executor.shutdown();

    }
}

class QueueProcess implements Runnable {

    private AbstractQueue<String> queue;
    private HttpURLConnection connection;
    private URL url;
    private FileWriter writer;
    private SAXParserFactory factory = SAXParserFactory.newInstance();
    private SAXParser saxParser;
    private XMLHandler xmlHandler;

    public QueueProcess(AbstractQueue<String> queue, FileWriter writer, XMLHandler xmlHandler) {
        this.queue = queue;
        this.writer = writer;

        this.xmlHandler = xmlHandler;
    }

    @Override
    public void run() {
        try {
            saxParser = factory.newSAXParser();
            while (true) {
                String link = queue.poll();
                if (link != null) {
                    if (queue.size() >= 500) {
                        System.out.println("here" + "     " + Thread.currentThread().getName());
                        getChildLinks(link);
                    } else {
                        System.out.println(link + "     " + Thread.currentThread().getName());
                        queue.addAll(getChildLinks(link));
                    }
                }
            }
        } catch (IOException | SAXException | ParserConfigurationException e) {
            e.printStackTrace();
        }

    }

    private List<String> getChildLinks(String link) throws IOException, SAXException {
        url = new URL(link);
        connection = (HttpURLConnection) url.openConnection();
        connection.connect();

        String result = new BufferedReader(new InputStreamReader(connection.getInputStream())).lines()
                .collect(Collectors.joining());

        saxParser.parse(new ByteArrayInputStream(result.getBytes()), xmlHandler);
        List<String> urlList = xmlHandler.getURLList();

        writer.write(result + System.lineSeparator());

        connection.disconnect();
        return urlList;
    }

}

The program runs fine, but at some point I get a NullPointerException. It occurs at the queue.addAll line in QueueProcess's run method.

Exception:

Exception in thread "pool-1-thread-3" java.lang.NullPointerException
    at java.util.concurrent.ConcurrentLinkedQueue.checkNotNull(ConcurrentLinkedQueue.java:914)
    at java.util.concurrent.ConcurrentLinkedQueue.addAll(ConcurrentLinkedQueue.java:525)
    at QueueProcess.run(Hierarchy2.java:77)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-1" java.lang.NullPointerException
    at java.util.concurrent.ConcurrentLinkedQueue.checkNotNull(ConcurrentLinkedQueue.java:914)
    at java.util.concurrent.ConcurrentLinkedQueue.addAll(ConcurrentLinkedQueue.java:525)
    at QueueProcess.run(Hierarchy2.java:77)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

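I don't understand why there is an NPE, since I do a null check on every iteration of the while loop. Please tell me why the NullPointerException occurs and how to prevent it.

Update:

I finally fixed the NPE. As @gusto2 suggested, it was caused by the ArrayList containing null values, which the queue does not accept.

My code now looks like this: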

public class Hierarchy2 {

    private static BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private static FileWriter writer;
    private static XMLHandler xmlHandler = new XMLHandler();

    public static void main(String[] args) throws IOException, ParserConfigurationException, SAXException {
        writer = new FileWriter(new File("hierarchy.txt"));
        String baseUrl = "my url here";

        queue.add(baseUrl);

        int threadCount = Runtime.getRuntime().availableProcessors() + 1;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executor.execute(new QueueProcess(queue, writer, xmlHandler));
        }

        executor.shutdown();

    }
}

class QueueProcess implements Runnable {

    private BlockingQueue<String> queue;
    private HttpURLConnection connection;
    private URL url;
    private FileWriter writer;
    private SAXParserFactory factory = SAXParserFactory.newInstance();
    private SAXParser saxParser;
    private XMLHandler xmlHandler = new XMLHandler();

    public QueueProcess(BlockingQueue<String> queue, FileWriter writer, XMLHandler xmlHandler) {
        this.queue = queue;
        this.writer = writer;
    }

    @Override
    public void run() {
        try {
            saxParser = factory.newSAXParser();
            while (true) {
                String link = queue.poll();
                if (link != null) {
                    System.out.println(link + "     " + Thread.currentThread().getName());
                    queue.addAll(getChildLinks(link));
                }
            }
        } catch (IOException | SAXException | ParserConfigurationException e) {
            e.printStackTrace();
        }

    }

    private List<String> getChildLinks(String link) throws IOException, SAXException {
        url = new URL(link);
        connection = (HttpURLConnection) url.openConnection();
        connection.connect();

        String result = new BufferedReader(new InputStreamReader(connection.getInputStream())).lines()
                .collect(Collectors.joining());

        saxParser.parse(new ByteArrayInputStream(result.getBytes()), xmlHandler);
        List<String> urlList = xmlHandler.getURLList();

        writer.write(result + System.lineSeparator());

        connection.disconnect();
        return urlList;
    }

}

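The problem now is pausing the threads once they have collectively processed 500 records. Once 500 is reached, I have to create another file and then start processing again.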
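One possible way to avoid pausing the threads at all is to put the 500-record limit inside the writer, so that it switches to a new file by itself. The following is only a sketch under that assumption: `RotatingWriter`, its constructor parameters, and the `prefix-N.txt` file naming are hypothetical and not part of the original code.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: a writer shared by all worker threads that
// starts a new file after every recordsPerFile records.
public class RotatingWriter implements AutoCloseable {

    private final Path dir;
    private final String prefix;
    private final int recordsPerFile;

    private BufferedWriter current; // writer for the file currently being filled
    private int written;            // records written to the current file
    private int fileIndex;          // number of files opened so far

    public RotatingWriter(Path dir, String prefix, int recordsPerFile) {
        this.dir = dir;
        this.prefix = prefix;
        this.recordsPerFile = recordsPerFile;
    }

    // synchronized: only one thread writes or rotates at a time,
    // so no worker has to be paused from the outside.
    public synchronized void write(String record) throws IOException {
        if (current == null || written == recordsPerFile) {
            if (current != null) {
                current.close();
            }
            fileIndex++;
            Path file = dir.resolve(prefix + "-" + fileIndex + ".txt");
            current = Files.newBufferedWriter(file, StandardCharsets.UTF_8);
            written = 0;
        }
        current.write(record);
        current.newLine();
        written++;
    }

    public synchronized int filesOpened() {
        return fileIndex;
    }

    @Override
    public synchronized void close() throws IOException {
        if (current != null) {
            current.close();
            current = null;
        }
    }
}
```

With something like this, each worker would call `write(result)` instead of `writer.write(...)` on a shared `FileWriter`, and the rotation happens under the same lock as the write.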

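Also, please tell me how to stop the code once the queue has been fully read, i.e. when no more child links will be added to the queue. Since I use a while loop that is always true, the code runs indefinitely. If I use the condition while(!queue.isEmpty()), only one thread runs, because the other threads find the queue empty.

Best answer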
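One common way to let the workers stop on their own is to count the links that are either queued or still being processed, and to leave the loop when that count reaches zero. The sketch below assumes this counter approach. `CrawlTermination`, `crawl`, and `fetchChildren` (a stand-in for `getChildLinks`) are hypothetical names, and an in-memory map replaces the HTTP calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class CrawlTermination {

    // Processes links until none is queued or in flight, then returns the processed links.
    static List<String> crawl(String root, Function<String, List<String>> fetchChildren, int threadCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Queue<String> processed = new ConcurrentLinkedQueue<>();
        AtomicInteger pending = new AtomicInteger(); // links queued or being processed

        pending.incrementAndGet();
        queue.add(root);

        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            executor.execute(() -> {
                try {
                    while (pending.get() > 0) {
                        // Wait briefly instead of spinning; null means nothing arrived in time.
                        String link = queue.poll(100, TimeUnit.MILLISECONDS);
                        if (link == null) {
                            continue;
                        }
                        try {
                            processed.add(link);
                            for (String child : fetchChildren.apply(link)) {
                                if (child != null) {            // the queue rejects nulls
                                    pending.incrementAndGet();  // count the child before enqueueing it
                                    queue.add(child);
                                }
                            }
                        } finally {
                            pending.decrementAndGet();          // this link is finished
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        // Tiny in-memory hierarchy standing in for the XML responses.
        Map<String, List<String>> tree = new HashMap<>();
        tree.put("root", Arrays.asList("a", "b"));
        tree.put("a", Arrays.asList("c"));

        List<String> done = crawl("root", link -> tree.getOrDefault(link, Collections.emptyList()), 4);
        System.out.println(done.size() + " links processed");
    }
}
```

Because each child is counted before its parent is marked finished, the counter cannot drop to zero while work remains, so idle threads keep polling instead of exiting early as they would with `while(!queue.isEmpty())`.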

Exception in thread "pool-1-thread-1" java.lang.NullPointerException
    at java.util.concurrent.ConcurrentLinkedQueue.checkNotNull(ConcurrentLinkedQueue.java:914)
    at java.util.concurrent.ConcurrentLinkedQueue.addAll(ConcurrentLinkedQueue.java:525)

I would guess that List<String> urlList = xmlHandler.getURLList(); returns an ArrayList that contains some null values. Without more information, though, it is hard to be more precise.
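The guess above matches the documented behavior of `ConcurrentLinkedQueue`, which does not permit null elements. A minimal sketch of the failure and of one possible guard follows. `NullFilterDemo` and `withoutNulls` are hypothetical names, and the hard-coded list only stands in for whatever `xmlHandler.getURLList()` actually returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

public class NullFilterDemo {

    // Returns a copy of the list with all null entries removed.
    static List<String> withoutNulls(List<String> links) {
        return links.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed shape of the handler's result: a list with a null entry in it.
        List<String> urlList = new ArrayList<>(
                Arrays.asList("http://example.com/a", null, "http://example.com/b"));

        Queue<String> failing = new ConcurrentLinkedQueue<>();
        try {
            failing.addAll(urlList); // throws because one element is null
        } catch (NullPointerException e) {
            System.out.println("addAll rejected a null element");
        }

        Queue<String> working = new ConcurrentLinkedQueue<>();
        working.addAll(withoutNulls(urlList)); // only the two real links are added
        System.out.println(working);
    }
}
```

Filtering at the call site hides the symptom. If the nulls come from the handler itself, fixing the place where it builds the list is probably the cleaner option.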

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/46297699/
