Java并发得到不一致的结果。 (带锁和LongAdder)

标签 java concurrency java.util.concurrent

我正在做这些练习:

  1. Write a program that walks a directory tree and generates a thread for each file. In the threads, count the number of words in the files and, without using locks, update a shared counter that is declared as public static long count = 0; Run the program multiple times. What happens? Why?

  2. Fix the program of the preceding exercise with using a lock.

  3. Fix the program of the preceding exercise with using a LongAdder.

我编写了以下程序,其中

  1. CountWordThread 回答练习 1,
  2. CountWordLockThread 回答练习 2,并且
  3. CountWordLongAdderThread 回答练习 3。

Java 代码如下:

import java.io.*;
import java.util.*;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
import java.util.regex.*;

public class ThreadedCountWord {


    public long count = 0;
    LongAdder la = new LongAdder();

    public class CountWordThread extends Thread {
        private File f;
        CountWordThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        count ++;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    ReentrantLock lock = new ReentrantLock();

    public class CountWordLockThread extends Thread {
        private File f;
        CountWordLockThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        // It's important to wrap your code into a
                        // try/finally block to ensure unlocking in case
                        // of exceptions.
                        lock.lock();
                        try {
                            count++;
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            lock.unlock();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    public class CountWordLongAdderThread extends Thread {
        private File f;
        CountWordLongAdderThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        la.increment();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    public void runThreads(Stream<Path> s) {
        // 1. this MAY get inconsistent results
        try {
            count = 0;
            ExecutorService executor = Executors.newCachedThreadPool();
            s.forEach(p -> {
                    CountWordThread t = new CountWordThread(p.toFile());
                    t.start();
                    executor.submit(t);
                });
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
            System.out.printf("(NoLock) count: %d\n", count);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public void runThreadsWithLock(Stream<Path> s) {
        // 2. this SHOULD NOT generate in-consistent results
        try {
            count = 0;
            ExecutorService executor = Executors.newCachedThreadPool();
            s.forEach(p -> {
                    CountWordLockThread t = new CountWordLockThread(p.toFile());
                    t.start();
                    executor.submit(t);
                });
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
            System.out.printf("(Lock) count: %d\n", count);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void runThreadsWithLongAdder(Stream<Path> s) {
        // 3. this SHOULD NOT generate in-consistent results
        try {
            count = 0;
            ExecutorService executor = Executors.newCachedThreadPool();
            s.forEach(p -> {
                    CountWordLongAdderThread t = new CountWordLongAdderThread(p.toFile());
                    t.start();
                    executor.submit(t);
                });
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
            System.out.printf("(LongAdder) count: %d\n", la.sum());
            la.reset();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        // run multi times
        try {
            for (int i = 0; i < 20; i ++) {
                Path path = Paths.get(".");
                Stream<Path> sp = Files.walk(path);
                Stream<Path> s = sp.filter(p -> p.toString().endsWith(".java")
                                           && Files.isRegularFile(p)
                                           && Files.isReadable(p));
                ThreadedCountWord tcw = new ThreadedCountWord();
                // tcw.runThreads(s); // 1. this MAY get inconsistent results
                tcw.runThreadsWithLock(s); // 2. this SHOULD NOT get inconsistent results
                // tcw.runThreadsWithLongAdder(s); // 3. this SHOULD NOT get inconsistent results
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

几乎每次运行 2 或 3 时,我都会得到不一致的答案。我不明白为什么。

示例结果如下:

(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35815 <-- note this
(Lock) count: 35862
(Lock) count: 35862

对于练习 2,以及

(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35826 <-- note this
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862

练习 3。

你能帮我一下吗?

更新

在@chrylis的帮助下,我使用以下代码更新了我的答案,该代码按预期运行: (上面的代码得到错误答案的原因正是@Ivan所说的。

import java.io.*;
import java.util.*;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
import java.util.regex.*;

public class ThreadedCountWord {

    public long count = 0;
    LongAdder la = new LongAdder();

    public class CountWordThread extends Thread {
        private File f;
        CountWordThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        count ++;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    ReentrantLock lock = new ReentrantLock();

    public class CountWordLockThread extends Thread {
        private File f;
        CountWordLockThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        // It's important to wrap your code into a
                        // try/finally block to ensure unlocking in case
                        // of exceptions.
                        lock.lock();
                        try {
                            count++;
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            lock.unlock();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    public class CountWordLongAdderThread extends Thread {
        private File f;
        CountWordLongAdderThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        la.increment();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    public void runThreads(Stream<Path> s) {
        // this MAY get inconsistent results
        try {
            count = 0;
            ArrayList<Thread> ts = new ArrayList<>();
            s.forEach(p -> {
                    CountWordThread t = new CountWordThread(p.toFile());
                    t.start();
                    ts.add(t);
                });
            ts.stream().forEach(t -> {
                    try {
                        t.join();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            System.out.printf("(NoLock) count: %d\n", count);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public void runThreadsWithLock(Stream<Path> s) {
        // this SHOULD NOT generate in-consistent results
        try {
            count = 0;
            ArrayList<Thread> ts = new ArrayList<>();
            s.forEach(p -> {
                    CountWordLockThread t = new CountWordLockThread(p.toFile());
                    t.start();
                    ts.add(t);
                });
            ts.stream().forEach(t -> {
                    try {
                        t.join();   
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                });
            System.out.printf("(Lock) count: %d\n", count);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void runThreadsWithLongAdder(Stream<Path> s) {
        // this SHOULD NOT generate in-consistent results
        try {
            count = 0;
            ArrayList<Thread> ts = new ArrayList<>();
            s.forEach(p -> {
                    CountWordLongAdderThread t = new CountWordLongAdderThread(p.toFile());
                    t.start();
                    ts.add(t);
                });
            ts.stream().forEach(t -> {
                    try {
                        t.join();   
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            System.out.printf("(LongAdder) count: %d\n", la.sum());
            la.reset();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        // run multi times
        try {
            for (int i = 0; i < 20; i ++) {
                Path path = Paths.get(".");
                Stream<Path> sp = Files.walk(path);
                Stream<Path> s = sp.filter(p -> p.toString().endsWith(".java")
                                           && Files.isRegularFile(p)
                                           && Files.isReadable(p));
                ThreadedCountWord tcw = new ThreadedCountWord();
                // tcw.runThreads(s); // this MAY get inconsistent results
                // tcw.runThreadsWithLock(s); // this SHOULD NOT get inconsistent results
                tcw.runThreadsWithLongAdder(s); // this SHOULD NOT get inconsistent results
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

最佳答案

您启动任务两次:第一次使用 t.start() ,第二次提交给执行程序。并且由于您没有在 t.start() 之后调用 t.join() 来等待任务完成,因此您可能会得到不一致的结果,因为您在所有作业之前打印了值已完成

关于Java并发得到不一致的结果。 (带锁和LongAdder),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51074109/

相关文章:

java - 清除所有事件 Google 日历 Java

java - 如果一个 `changeSet` 申请失败,如何回滚 `databaseChangeLog` 内的所有 `changeSet` ?

java - 如何在不使用静态的情况下将信息从一个类传输到另一个类以显示它?

java - 无法调用 java.util.concurrent.CopyOnWriteArrayList.readObject() 此错误意味着什么?

java - 在 ExecutorService 中使用同步块(synchronized block)

java - 从字符串输入中打印偶数单词?

java - java中如何在一个线程完成时结束其他线程的处理

java - 兰特流的高性能缓冲

如果我们只允许局部变量是可变的,我们可以防止共享可变状态吗?

java - 两把锁的守护状态