我的应用程序有一点问题。
我有 3 个线程:主线程、生产者 - 读取一些文件和消费者 - 逐行显示读取的文件;
这里是主类(class)的简短版本:
MAIN.class
for (int i = 0; i < args.length; i++) {
List<Thread> threadList = new ArrayList<Thread>();
String file = args[i];
int queueSize = 10;
int waitTime = 200;
BlockingQueue queue = new LinkedBlockingQueue(queueSize);
Reader reader = (Reader) context.getBean("reader");
Writer writer = (Writer) context.getBean("writer");
Handler handler = (Handler) context.getBean("Handler", writer, reader, queueSize);
Producer producer = (Producer) context.getBean("Producer", handler, queue, waitTime);
Consumer consumer = (Consumer) context.getBean("Consumer", handler, queue, waitTime);
handler.setFile(file, extension.get(0));
threadList.add(new Thread(producer, "Producer"));
threadList.add(new Thread(consumer, "Consumer"));
for (Thread thread : threadList) {
thread.start();
}
for (Thread thread : threadList) {
try {
thread.join();
} catch (InterruptedException e) {
System.err.println("Interrupted Exception thrown by : " + thread.getName());
}
}
} else break;
}
HANDLER.class
public Handler(Writer writer, Reader reader, int count) {
this.reader = reader;
this.writer = writer;
queue = new LinkedBlockingQueue(count);
}
public void getHandle(LinkedList list) {
writer.Printer(list);
}
public LinkedList setHandle(String string, int line) {
reader.objectReader(string, getFileName(), line);
return reader.getList();
}
消费者类
public Consumer(Handler handler, BlockingQueue queue, int waitTime) {
this.queue = queue;
this.handler = handler;
this.waitTime = waitTime;
}
@Override
public void run() {
while (!isEnd()) {
try {
handler.getHandle(queue.poll(waitTime, MILLISECONDS));
} catch (Exception e) {
System.err.println(e);
}
}
}
private boolean isEnd() {
if (queue.isEmpty() && handler.isFileEnd()) {
return true;
} else {
return false;
}
}
PRODUCER.class
public Producer(Handler handler, BlockingQueue queue, int waitTime) {
this.queue = queue;
this.handler = handler;
this.waitTime = waitTime;
}
@Override
public void run() {
try {
if (handler.getFileExtension().equals("xlsx")) {
xlsxCheck(handler.getFile());
} else {
Stream stream = Files.lines(getFilePath());
Iterator iterator = stream.iterator();
String string = (String) iterator.next();
while (string != null) {
queue.offer(handler.setHandle(string , i), waitTime, MILLISECONDS);
if (!iterator.hasNext()) {
setFileEnd(true);
System.out.println("produce stop");
Thread.sleep(100);
break;
}
string = (String) iterator.next();
i++;
}
}
} catch (Exception e) {
System.out.println("Error: " + e);
}
}
我的第一个问题是 Producer 和 Main 类在 Consumer 处于 WAIT 状态时停止,但我使用 BlockingQueue 的 poll() 方法解决了它。 第二个是当我读取文件时,我在控制台中看到以下行:
- 放入队列 [2, 200, order2, 1.csv, 2, OK] ---i--- 2
- 放入队列 [3, 200, order3, 1.csv, 3, OK] ---i--- 3
- 放入队列 [4, 200, order4, 1.csv, 4, OK] ---i--- 4
- 放入队列 [5, 200, order5, 1.csv, 5, OK] ---i--- 5
- 放入队列 [6, 200, order6, 1.csv, 6, OK] ---i--- 6
- 放入队列 [7, 200, order7, 1.csv, 7, OK] ---i--- 7
- 放入队列 [8, 200, order8, 1.csv, 8, OK] ---i--- 8
不错
但是之后,我明白
- {}
- {"id":"8","amount":"200","comment":"order8","filename":"1.csv","line":8,"result":"OK"}
- {"id":"8","amount":"200","comment":"order8","filename":"1.csv","line":8,"result":"OK"}
- {"id":"8","amount":"200","comment":"order8","filename":"1.csv","line":8,"result":"OK"}
- {"id":"8","amount":"200","comment":"order8","filename":"1.csv","line":8,"result":"OK"}
- {"id":"8","amount":"200","comment":"order8","filename":"1.csv","line":8,"result":"OK"}
- {"id":"8","amount":"200","comment":"order8","filename":"1.csv","line":8,"result":"OK"}
- {"id":"8","amount":"200","comment":"order8","filename":"1.csv","line":8,"result":"OK"}
我只有最后一行有 8 次,这是不正确的。我不明白为什么。
有人可以帮我吗?
最佳答案
这很奇怪,但在 reader.objectReader() 中读取文件时,每一步都需要创建新的列表;
关于java - 队列只有最后一条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50093224/