java - Producer-consumer implemented with Java threads writes only half of the data to the file

Tags: java multithreading

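Hello, I have a problem where I must read a huge CSV file, strip the first field out of each line, and store only the unique values in a file. I have written a program that uses threads implementing the producer-consumer pattern.

The CSVLineStripper class does what its name says: it takes a line from the CSV, strips the first field out of it, and adds that field to a queue. CSVLineProcessor then takes the fields one by one, stores each in an ArrayList, and checks whether the field is unique so that only unique fields are kept. The ArrayList is used only for that lookup. Every unique field is written to a file.

What happens now is that all fields are stripped correctly. I ran it on about 3,000 lines and everything was correct. When I run the program on all lines (more than 700,000), I get incomplete output: about 1,000 unique records are missing. Every field is enclosed in double quotes. The odd part is that the last field in the generated file is an incomplete word with no closing double quote. Why does this happen?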

import java.util.*;
import java.io.*;
class CSVData
{
    Queue <String> refererHosts = new LinkedList <String> ();
    Queue <String> uniqueReferers = new LinkedList <String> (); // final writable queue of unique referers

    private int finished = 0;
    private int safety = 100;
    private String line = "";
    public CSVData(){}
    public synchronized String getCSVLine() throws InterruptedException{
        int i = 0;
        while(refererHosts.isEmpty()){
            if(i < safety){
                wait(10);
            }else{
                return null;
            }
            i++;
        }
        finished = 0;
        line = refererHosts.poll();
        return line;
    }

    public synchronized void putCSVLine(String CSVLine){
        if(finished == 0){ 
            refererHosts.add(CSVLine);
            this.notifyAll();
        }
    }
}
class CSVLineStripper implements Runnable //Producer
{
    private CSVData cd;
    private BufferedReader csv;
    public CSVLineStripper(CSVData cd, BufferedReader csv){ // CONSTRUCTOR
        this.cd = cd;
        this.csv = csv;
    }
    public void run() {
        System.out.println("Producer running");
        String line = "";
        String referer = "";
        String [] CSVLineFields;
        int limit = 700000;
        int lineCount = 1;

        try {
            while((line = csv.readLine()) != null){
                CSVLineFields     = line.split(",");
                referer         = CSVLineFields[0];
                cd.putCSVLine(referer);
                lineCount++;
                if(lineCount >= limit){
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("<<<<<< PRODUCER FINISHED >>>>>>>");
    }

    private String printString(String [] str){
        String string = "";
        for(String s: str){
            string = string + " "+s;
        }
        return string;
    }
}

class CSVLineProcessor implements Runnable
{
    private CSVData cd;
    private FileWriter fw = null;
    private BufferedWriter bw = null;

    public CSVLineProcessor(CSVData cd, BufferedReader bufferedReader){ // CONSTRUCTOR
        this.cd = cd;
        try {
            this.fw = new FileWriter("unique_referer_dump.txt");
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.bw = new BufferedWriter(fw);
    }
    public void run() {
        System.out.println("Consumer Started");
        String CSVLine = "";
        int safety = 10000;
        ArrayList <String> list = new ArrayList <String> ();

        while(CSVLine != null || safety <= 10000){
               try {
                CSVLine = cd.getCSVLine();
                if(!list.contains(CSVLine)){
                    list.add(CSVLine);
                    this.CSVDataWriter(CSVLine);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            if(CSVLine == null){
                break;
            }else{
                safety++;
            }
        }

        System.out.println("<<<<<< CONSUMER FINISHED >>>>>>>");
        System.out.println("Unique referers found in 30000 records "+list.size());
    }  
    private void CSVDataWriter(String referer){
        try {
            bw.write(referer+"\n");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


public class RefererCheck2 
{
    public static void main(String [] args) throws InterruptedException
    {
        String pathToCSV = "/home/shantanu/DEV_DOCS/Contextual_Work/excite_domain_kw_site_wise_click_rev2.csv";
        CSVResourceHandler csvResHandler = new CSVResourceHandler(pathToCSV);
        CSVData cd = new CSVData();
        CSVLineProcessor consumer     = new CSVLineProcessor(cd, csvResHandler.getCSVFileHandler());
        CSVLineStripper producer     = new CSVLineStripper(cd, csvResHandler.getCSVFileHandler());
        Thread consumerThread = new Thread(consumer);
        Thread producerThread = new Thread(producer);
        producerThread.start();
        consumerThread.start();
    }
}

The sample input looks like this:

"xyz.abc.com","4432"."clothing and gifts","true"
"pqr.stu.com","9537"."science and culture","false"
"0.stu.com","542331"."education, studies","false"
"m.dash.com","677665"."technology, gadgets","false"

The producer stores the following in the queue:

"xyz.abc.com"
"pqr.stu.com"
"0.stu.com"
"m.dash.com"

The consumer stores the unique values in the file, but when I open the file its contents are:

"xyz.abc.com"
"pqr.stu.com"
"0.st

Best answer

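A couple of things. You are breaking out of the loop after 700k lines, not 7m. You are also never flushing your BufferedWriter, so whatever is still sitting in its buffer when the program ends never reaches the file, which is why the last entry can be incomplete. Add a flush at the end and close all your resources. A debugger is a good idea :)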
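As a rough illustration of the flush-and-close advice, here is a minimal sketch that is separate from the question's classes. The class name `FlushDemo` and the helper `writeLines` are hypothetical, and the sketch assumes Java 9 or later (for `List.of`). It relies on try-with-resources, which closes the writer when the block exits, and `close()` on a `BufferedWriter` flushes the remaining buffered characters first.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class FlushDemo {

    // Hypothetical helper, not part of the question's code:
    // writes each value on its own line and guarantees the writer is closed.
    static void writeLines(Path target, List<String> lines) throws IOException {
        // try-with-resources calls bw.close() on exit, even if write() throws;
        // close() flushes whatever is still held in the buffer.
        try (BufferedWriter bw = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            for (String line : lines) {
                bw.write(line);
                bw.newLine(); // platform line separator instead of a hard-coded "\n"
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // A temp file stands in for unique_referer_dump.txt
        Path target = Files.createTempFile("unique_referer_dump", ".txt");
        writeLines(target, List.of("\"xyz.abc.com\"", "\"pqr.stu.com\"", "\"0.stu.com\""));
        // Read the file back to check that the last line is complete
        System.out.println(Files.readAllLines(target));
    }
}
```

In the question's code, the equivalent change would probably be to call `bw.close()` (or at least `bw.flush()`) once the consumer's loop has finished, for example in a `finally` block at the end of `run()`.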

Regarding "java - Producer-consumer implemented with Java threads writes only half of the data to the file", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/11232346/
