我正在练习一点并发性。
public class WordOccurrencesBigFile {
private String words;
private ConcurrentHashMap<String, Pair<String, Integer>> wordOccurrencesMap = new ConcurrentHashMap<>();
public WordOccurrencesBigFile(String wordsLine) {
this.words = wordsLine;
}
public void processWords() {
parseWordsLines();
printOrderAlphabetically();
printOrderByCount();
printByInsertionOrder();
}
private void parseWordsLines() {
String[] wordsLinesArray = words.split("\n");
ExecutorService executor = Executors.newFixedThreadPool(5);
for(String wordsLine: wordsLinesArray) {
executor.execute(() -> parseWords(wordsLine));
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
private void parseWords(String wordsLine) {
System.out.println(Thread.currentThread().getName() + " Start.");
System.out.println(Thread.currentThread().getName() + " Processing line: '" + wordsLine + "'");
String[] wordsArray = wordsLine.split(" ");
synchronized(this){
for (String word : wordsArray) {
Pair<String, Integer> pair = null;
if (!wordOccurrencesMap.containsKey(word)) {
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
} else {
pair = wordOccurrencesMap.get(word);
pair.setValue(pair.getValue() + 1);
//System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
}
wordOccurrencesMap.put(word, pair);
}
}
System.out.println(Thread.currentThread().getName() + " End.");
}
public static void main(String[] args) {
String wordsLines = "bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa";
WordOccurrencesBigFile wordOccurrences = new
WordOccurrencesBigFile(wordsLines);
wordOccurrences.processWords();
}
}
在 parseWordsLines 上,使用 5 个线程池创建了一个 ExecutorService,并且使用包含“\n”创建的多行的 String 实例化了 WordOccurrencesBigFile 类。目的是让每一行由不同的线程处理,并在 map 上插入唯一单词的计数。
我期望通过使用 ConcurrentHashMap 足以处理我有多个线程读取和写入映射的事实。但大多数时候我执行类(class)时都会得到不同的计数。 (奇怪的是主要是针对“bb”这个词。
但是添加同步(this)问题就解决了。
有人可以解释一下为什么会出现这种行为,解决这个问题的最佳方法,以及我应该将“this”传递给同步块(synchronized block)还是线程正在访问的对象吗?
非常感谢。
最佳答案
好吧,添加 synchronized(this)
可以解决问题,但您将失去多线程和并行化带来的所有好处。
您需要的是ConcurrentMap
的computeIfAbsent
方法。因此 for
循环的主体将转换为
Pair<String, Integer> pair = wordOccurrencesMap.computeIfAbsent(word, w -> new Pair<>(w, 0));
synchronized(pair) {
pair.setValue(pair.getValue()+1);
}
现在您可以省略 synchronized(this)
block 。
编辑:但是您必须确保当第一个线程调用pair.setValue()时,其他线程不能调用pair.getValue(),如注释所述。
关于java - 并发和并发数据结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51965383/