java - 将 ConcurrentHashMap 安全发布到类字段中

标签 java multithreading thread-safety safe-publication

我正在尝试编写一个测试,证明在多线程环境中为类字段分配新引用不是线程安全的,更具体地说,如果该字段未声明为 volatile ,则会出现可见性问题。或AtomicReference

我使用的场景是PropertiesLoader类(如下所示),它应该加载存储在 Map<String, String> 中的一组属性(当前仅使用一个属性)并且还尝试支持重新加载。因此,有许多线程读取属性,并且在某个时间点另一个线程正在重新加载读取线程需要可见的新值。

测试的目的如下:

  • 它调用读取器线程,这些线程正在旋转等待,直到它们“看到” 属性值变化
  • 在某个时刻,编写器线程会使用属性的新值创建一个新映射,并将该映射分配给相关字段 ( PropertyLoader.propertiesMap )
  • 如果所有读取器线程都看到新值,则测试完成,否则它将永远挂起。

现在我知道,严格来说,没有测试可以证明某些代码的线程安全性(或缺乏线程安全性),但在这种情况下,我觉得它应该相对容易至少从经验上证明这个问题。

我尝试过使用 HashMap实现来存储属性,在这种情况下,即使我只使用一个读取线程,测试也会按预期挂起。

但是,如果 ConcurrentHashMap使用实现时,无论使用多少个读取线程,测试都不会挂起(我也尝试在读取器线程中随机等待,但没有成功)。

据我了解,事实上 ConcurrentHashMap是线程安全的不应该影响它所分配到的字段的可见性。所以volatile/AtomicReference该字段仍然需要。然而,上述测试似乎与此相矛盾,因为它的行为就好像 map 始终安全发布而无需额外同步。

我的理解有误吗?也许ConcurrentHashMap做了一些我不知道的额外同步 promise ?

任何帮助将不胜感激。

附注下面的代码应该可以作为 Junit 测试执行。我在一台装有 AMD Ryzen 5、Windows 10、JDK 1.8.0_201 的机器上运行它,在第二台机器上运行 i7 Intel、Fedora 30、JDK 1.8.xx(不记得 JDK 的确切版本),结果相同。

import org.junit.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class PropertiesLoaderTest {

    private static final String NEW_VALUE = "newValue";
    private static final String OLD_VALUE = "oldValue";
    private static final String PROPERTY = "property";

    /**
     *  Controls if the reference we are testing for visibility issues ({@link PropertiesLoader#propertyMap} will
     *  be assigned a HashMap or ConcurrentHashMap implementation during {@link PropertiesLoader#load(boolean)}
     */
    private static boolean USE_SIMPLE_MAP = false;

    @Test
    public void testReload() throws Exception {
        PropertiesLoader loader = new PropertiesLoader();
        Random random = new Random();

        int readerThreads = 5;
        int totalThreads = readerThreads + 1;

        final CountDownLatch startLatch = new CountDownLatch(1);
        final CountDownLatch finishLatch = new CountDownLatch(totalThreads);

        // start reader threads that read the property trying to see the new property value
        for (int i = 0; i < readerThreads; i++) {
            startThread("reader-thread-" + i, startLatch, finishLatch, () -> {
                while (true) {
                    String value = loader.getProperty(PROPERTY);
                    if (NEW_VALUE.equals(value)) {
                        log("Saw new value: " + value + " for property: " + PROPERTY);
                        break;
                    }
                }
            });
        }

        // start writer thread (i.e. the thread that reloads the properties)
        startThread("writer-thread", startLatch, finishLatch, () -> {
            Thread.sleep(random.nextInt(500));

            log("starting reload...");
            loader.reloadProperties();
            log("finished reload...");
        });

        log("Firing " + readerThreads + " threads and 1 writer thread...");
        startLatch.countDown();

        log("Waiting for all threads to finish...");
        finishLatch.await();
        log("All threads finished. Test successful");
    }

    static class PropertiesLoader {
        // The reference in question: this is assigned in the constructor and again when calling reloadProperties()
        // It is not volatile nor AtomicReference so there are visibility concerns
        Map<String, String> propertyMap;

        PropertiesLoader() {
            this.propertyMap = load(false);
        }

        public void reloadProperties() {
            this.propertyMap = load(true);
        }

        public String getProperty(String propertyName) {
            return propertyMap.get(propertyName);
        }

        private static Map<String, String> load(boolean isReload) {
            // using a simple HashMap always hang the test as expected: the new reference cannot be
            // seen by the reader thread

            // using a ConcurrentHashMap always allow the test to finish no matter how many reader
            // threads are used
            Map<String, String> newMap = USE_SIMPLE_MAP ? new HashMap<>() : new ConcurrentHashMap<>();
            newMap.put(PROPERTY, isReload ? NEW_VALUE : OLD_VALUE);
            return newMap;
        }
    }

    static void log(String msg) {
        //System.out.println(Thread.currentThread().getName() + " - " + msg);
    }

    static void startThread(String name, CountDownLatch start, CountDownLatch finish, ThreadTask task) {
        Thread t = new Thread(new ThreadTaskRunner(name, start, finish, task));
        t.start();
    }

    @FunctionalInterface
    interface ThreadTask {
        void execute() throws Exception;
    }

    static class ThreadTaskRunner implements Runnable {
        final CountDownLatch start;
        final CountDownLatch finish;
        final ThreadTask task;
        final String name;

        protected ThreadTaskRunner(String name, CountDownLatch start, CountDownLatch finish, ThreadTask task) {
            this.start = start;
            this.finish = finish;
            this.task = task;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                Thread.currentThread().setName(name);
                start.await();
                log("thread started");
                task.execute();
                log("thread finished successfully");
            } catch (Exception e) {
                log("Error: " + e.getMessage());
            }
            finish.countDown();
        }
    }
}

最佳答案

这比你想象的要糟糕一些,但也有一个可取之处。

更糟糕的是:构造函数不同步。在这种情况下,这意味着在构造函数中创建的 PropertiesLoader.propertyMap 不能保证对其他线程(读取器或写入器)可见。这里的可取之处是您使用的 CountDownLatch(它们建立了发生之前的关系)以及 Thread.start(这也建立发生之前关系)。此外,在实践中,“构造函数不同步”很少成为问题并且难以重现(另请参见下面的测试代码)。有关此事的更多信息,请阅读this question 。结论是 PropertiesLoader.propertyMap 必须是 volatile/AtomicReferencefinal (final) code> 可以与 ConcurrentHashMap 结合使用)。

无法使用 ConcurrentHashMap 重现同步问题的原因与难以重现“构造函数不同步”问题的原因相同:ConcurrentHashMap 使用同步在内部(请参阅 this answer )触发内存刷新,不仅使映射中的新值对其他线程可见,而且还使新的 PropertiesLoader.propertyMap 值可见。

请注意, volatile PropertiesLoader.propertyMap 将保证(而不仅仅是使其成为可能)新值对其他线程可见(ConcurrentHashMap 不是必需的,另请参阅this answer)。我通常将此类映射设置为只读映射(借助Collections.unmodifyingMap()),以向其他程序员广播这不是一个可以在以下位置更新或更改的普通映射:会的。

下面是一些更多的测试代码,试图消除尽可能多的同步。测试的最终结果完全相同,但它也显示了循环中存在 volatile boolean 值的副作用,并且 propertyMap 的非空赋值总是被其他线程看到.

package so;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class MapVisibility {

    static int readerThreadsAmount = 2;

    public static void main(String[] args) {

        ExecutorService executors = Executors.newFixedThreadPool(readerThreadsAmount);
        try {
            new MapVisibility().run(executors);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executors.shutdownNow(); // Does not work on FAIL, manually kill reader-task from task-manager.
        }
    }

    //final boolean useConcurrentMap = false;
    // When ConcurrentHashMap is used, test is always a success.
    final boolean useConcurrentMap = true;

    final boolean useStopBoolean = false;
    // When volatile stop boolean is used, test is always a success.
    //final boolean useStopBoolean = true;

    //final boolean writeToConsole = false;
    // Writing to System.out is synchronized, this can make a test succeed that would otherwise fail.
    final boolean writeToConsole = true;

    Map<String, String> propertyMap;
    // When the map is volatile, test is always a success.
    //volatile Map<String, String> propertyMap;

    final String oldValue = "oldValue";
    final String newValue = "newValue";
    final String key = "key";
    volatile boolean stop;

    void run(ExecutorService executors) throws Exception {

        IntStream.range(0,  readerThreadsAmount).forEach(i -> {
            executors.execute(new MapReader());
        });
        sleep(500); // give readers a chance to start
        setMap(oldValue);
        sleep(100); // give readers a chance to read map
        setMap(newValue);
        sleep(100); // give readers a chance to read new value in new map
        executors.shutdown();
        if (!executors.awaitTermination(100L, TimeUnit.MILLISECONDS)) {
            System.out.println("FAIL");
            stop = true;
        } else {
            System.out.println("Success");
        }
    }

    void setMap(String value) {

        Map<String, String> newMap = (useConcurrentMap ? new ConcurrentHashMap<>() : new HashMap<>());
        newMap.put(key, value);
        propertyMap = newMap;
    }

    class MapReader implements Runnable {

        @Override
        public void run() {
            print("Reader started.");
            final long startTime = System.currentTimeMillis();
            while (propertyMap == null) {
                // In worse case, this loop should never exit but it always does.
                // No idea why.
                sleep(1);
            }
            print((System.currentTimeMillis() - startTime) + " Reader got map.");
            if (useStopBoolean) {
                while (!stop) {
                    if (newValue.equals(propertyMap.get(key))) {
                        break;
                    }
                }
            } else {
                while (true) {
                    if (newValue.equals(propertyMap.get(key))) {
                        break;
                    }
                }
            }
            print((System.currentTimeMillis() - startTime) + " Reader got new value.");
        }
    }

    void print(String msg) {
        if (writeToConsole) {
            System.out.println(msg);
        }
    }

    void sleep(int timeout) {

        // instead of using Thread.sleep, do some busy-work instead.
        final long startTime = System.currentTimeMillis();
        Random r = new Random();
        @SuppressWarnings("unused")
        long loopCount = 0;
        while (System.currentTimeMillis() - startTime < timeout) {
            for (int i = 0; i < 100_000; i++) {
                double d = r.nextDouble();
                double v = r.nextDouble();
                @SuppressWarnings("unused")
                double dummy = d / v;
            }
            loopCount++;
        }
        //print("Loops: " + loopCount);
    }

}

关于java - 将 ConcurrentHashMap 安全发布到类字段中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59919450/

相关文章:

java - Spring REST Web 服务每天的第一个请求很慢

C++: 跨线程异常处理问题与 boost::exception

java - 使用多线程读取单个文件 : should speed up?

java - 当将 `openjdk:11` 或 `openjdk:8` 与 CGroup 的实验标志一起使用时,是否仍需要 `-Xmx`

java - 使用 JSON 和 PHP 从 Mysql DB 获取数据时出错

c++ - 为什么 boost.log 在 Windows XP 上会崩溃?

ios - 解读iOS崩溃日志与-[UIApplication _cachedSystemAnimationFenceCreatingIfNecessary :]

java - 如果一个方法只是调用 Java 中的另一个线程安全方法,它是否是线程安全的?

multithreading - selenium 线程可以安全地使用 Python 进行抓取吗?

java - BigDecimal 中 Divide 方法的 Scale()