java - 永远不会结束的话题

标签 java multithreading io blockingqueue

这是我的代码:

   public class BigFileReader implements Runnable {
    private final String fileName;
    int a = 0;

    private final BlockingQueue<String> linesRead;
    public BigFileReader(String fileName, BlockingQueue<String> linesRead) {
        this.fileName = fileName;
        this.linesRead = linesRead;
    }
    @Override
    public void run() {
        try {
            //since it is a sample, I avoid the manage of how many lines you have read
            //and that stuff, but it should not be complicated to accomplish
            BufferedReader br = new BufferedReader(new FileReader(new File("E:/Amazon HashFile/Hash.txt")));
            String str = "";

            while((str=br.readLine())!=null)
            {
                linesRead.put(str);
                System.out.println(a);
                a++;
            }

        } catch (Exception ex) {
            ex.printStackTrace();
        }

        System.out.println("Completed");
    }
}





public class BigFileProcessor implements Runnable {
    private final BlockingQueue<String> linesToProcess;
    public BigFileProcessor (BlockingQueue<String> linesToProcess) {
        this.linesToProcess = linesToProcess;
    }
    @Override
    public void run() {
        String line = "";
        try {
            while ( (line = linesToProcess.take()) != null) {
                //do what you want/need to process this line...
                String [] pieces = line.split("(...)/g");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}





public class BigFileWholeProcessor {
    private static final int NUMBER_OF_THREADS = 2;
    public void processFile(String fileName) {

        BlockingQueue<String> fileContent = new LinkedBlockingQueue<String>();
        BigFileReader bigFileReader = new BigFileReader(fileName, fileContent);
        BigFileProcessor bigFileProcessor = new BigFileProcessor(fileContent);
        ExecutorService es = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
        es.execute(bigFileReader);
        es.execute(bigFileProcessor);
        es.shutdown();

    }
}

这段代码很好,但有一个重大问题。也就是说,线程永远不会结束!即使整个过程完成,我仍然可以让程序还活着。这里出了什么问题?

最佳答案

BlockingQueue.take()将阻塞直到有元素可用:

Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

因此,在 BigFileReader 完成读取输入文件并将行放入 BlockingQueue 后,BigFileProcessor 将在 take() 方法中永远等待新输入。

您可能想找到一种方法向 BigFileProcessor 发出信号,表明将不再有任何输入放入 BlockingQueue,可能通过添加 a sentinel value到队列或找到其他方法来告诉 BigFileProcessor 停止调用 take()

哨兵方法的示例:

public class BigFileReader implements Runnable {
    public static final String SENTINEL = "SENTINEL"; //the actual value isn't very important
    ...

    while((str=br.readLine())!=null) {
        linesRead.put(str);
    }
    //when reading the file is done, add SENTINEL to the queue
    linesRead.put(SENTINEL);
}

//inside BigFileProcessor...
while ( (line = linesToProcess.take()) != null) {
    // check if value in queue is sentinel value
    if (line == BigFileReader.SENTINEL) {
         //break out of the while loop
         break;
    }
    //otherwise process the line as normal
}

另一种方法可能是使用 the overload of poll that takes a timeout value而不是 take(),并且逻辑是 BigFileProcessor 如果在超过 N 秒内无法从队列中读取任何内容,则会中断循环。

关于java - 永远不会结束的话题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22232505/

相关文章:

java - Java中如何将输入流连接到输出流?

java - Java 中的缓冲和非缓冲流

c - 输出文件中不需要的空格

java - Xquery:使用Java返回多个数据元素?

java - 将原语的流数组转换为流

java - 在java中,是否可以保存线程本地变量?

java - 为什么 Volatile 变量不用于 Atomicity

ios - 非 ARC 项目在使用 dispatch_get_global_queue 时需要 dispatch_release

c++ - openMP - 需要原子或减少条款

java - 在 Java 中使用 OpenSAML 的 ADFS 和 SAML 2.0