java - 增加和删除 ConcurrentHashMap 的元素

标签 java multithreading concurrency concurrenthashmap

有一个类Counter,它包含一组键并允许递增每个键的值并获取所有值。所以,我试图解决的任务与Atomically incrementing counters stored in ConcurrentHashMap中的任务相同。 。不同之处在于键的集合是无界的,因此经常添加新键。

为了减少内存消耗,我在读取值后清除它们,这发生在 Counter.getAndClear() 中。 key 也被移除,这似乎破坏了一切。

一个线程递增随机键,另一个线程获取所有值的快照并清除它们。

代码如下:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.Map;
import java.util.HashMap;
import java.lang.Thread;

class HashMapTest {
    private final static int hashMapInitSize = 170;
    private final static int maxKeys = 100;
    private final static int nIterations = 10_000_000;
    private final static int sleepMs = 100;

    private static class Counter {
        private ConcurrentMap<String, Long> map;

        public Counter() {
            map = new ConcurrentHashMap<String, Long>(hashMapInitSize);
        }

        public void increment(String key) {
            Long value;
            do {
                value = map.computeIfAbsent(key, k -> 0L);
            } while (!map.replace(key, value, value + 1L));
        }

        public Map<String, Long> getAndClear() {
            Map<String, Long> mapCopy = new HashMap<String, Long>();
            for (String key : map.keySet()) {
                Long removedValue = map.remove(key);
                if (removedValue != null)
                    mapCopy.put(key, removedValue);
            }
            return mapCopy;
        }
    }

    // The code below is used for testing
    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();
        Thread thread = new Thread(new Runnable() {
            public void run() {
                for (int j = 0; j < nIterations; j++) {
                    int index = ThreadLocalRandom.current().nextInt(maxKeys);
                    counter.increment(Integer.toString(index));
                }
            }
        }, "incrementThread");
        Thread readerThread = new Thread(new Runnable() {
            public void run() {
                long sum = 0;
                boolean isDone = false;
                while (!isDone) {
                    try {
                        Thread.sleep(sleepMs);
                    }
                    catch (InterruptedException e) {
                        isDone = true;
                    }
                    Map<String, Long> map = counter.getAndClear();
                    for (Map.Entry<String, Long> entry : map.entrySet()) {
                        Long value = entry.getValue();
                        sum += value;
                    }
                    System.out.println("mapSize: " + map.size());
                }
                System.out.println("sum: " + sum);
                System.out.println("expected: " + nIterations);
            }
        }, "readerThread");
        thread.start();
        readerThread.start();
        thread.join();
        readerThread.interrupt();
        readerThread.join();
        // Ensure that counter is empty
        System.out.println("elements left in map: " + counter.getAndClear().size());
    }
}

在测试时我注意到一些增量丢失了。我得到以下结果:

sum: 9993354
expected: 10000000
elements left in map: 0

如果您无法重现此错误(该总和小于预期),您可以尝试将 maxKeys 增加几个数量级或减少 hashMapInitSize 或增加 nIterations(后者也会增加运行时间)。我还包含了测试代码(主要方法),以防出现任何错误。

我怀疑是在运行时增加ConcurrentHashMap的容量时发生的错误。在我的计算机上,当 hashMapInitSize 为 170 时,代码似乎可以正常工作,但当 hashMapInitSize 为 171 时,代码会失败。我相信 171 的大小会触发容量的增加 (128/0.75 = = 170.66,其中0.75是 HashMap 的默认负载因子)。

所以,问题是:我是否正确使用了 removereplacecomputeIfAbsent 操作?根据 Use of ConcurrentHashMap eliminates data-visibility troubles? 的答案,我假设它们是 ConcurrentHashMap 上的原子操作。 。如果是这样,为什么会丢失一些增量?

编辑:

我认为我在这里错过了一个重要的细节,即 increment() 应该比 getAndClear() 更频繁地调用,因此我尽量避免任何increment() 中显式锁定。不过,我稍后会测试不同版本的性能,看看这是否真的是一个问题。

最佳答案

我猜问题是在迭代keySet时使用了remove。这就是 JavaDoc 对于 Map#keySet() 的说法(我的重点):

Returns a Set view of the keys contained in this map. The set is backed by the map, so changes to the map are reflected in the set, and vice-versa. If the map is modified while an iteration over the set is in progress (except through the iterator's own remove operation), the results of the iteration are undefined.

ConcurrentHashMap 的 JavaDoc 提供了进一步的线索:

Similarly, Iterators, Spliterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration.

结论是,在迭代键时改变映射是不可预测的。

一种解决方案是为 getAndClear() 操作创建一个新映射并仅返回旧映射。开关必须受到保护,在下面的示例中,我使用了 ReentrantReadWriteLock:

class HashMapTest {
private final static int hashMapInitSize = 170;
private final static int maxKeys = 100;
private final static int nIterations = 10_000_000;
private final static int sleepMs = 100;

private static class Counter {
    private ConcurrentMap<String, Long> map;
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    ReadLock readLock = lock.readLock();
    WriteLock writeLock = lock.writeLock();

    public Counter() {
        map = new ConcurrentHashMap<>(hashMapInitSize);
    }

    public void increment(String key) {
        readLock.lock();
        try {
            map.merge(key, 1L, Long::sum);
        } finally {
            readLock.unlock();
        }
    }

    public Map<String, Long> getAndClear() {
        ConcurrentMap<String, Long> oldMap;
        writeLock.lock();
        try {
            oldMap = map;
            map = new ConcurrentHashMap<>(hashMapInitSize);
        } finally {
            writeLock.unlock();
        }

        return oldMap;
    }
}

// The code below is used for testing
public static void main(String[] args) throws InterruptedException {
    final AtomicBoolean ready = new AtomicBoolean(false);

    Counter counter = new Counter();
    Thread thread = new Thread(new Runnable() {
        public void run() {
            for (int j = 0; j < nIterations; j++) {
                int index = ThreadLocalRandom.current().nextInt(maxKeys);
                counter.increment(Integer.toString(index));
            }
        }
    }, "incrementThread");

    Thread readerThread = new Thread(new Runnable() {
        public void run() {
            long sum = 0;
            while (!ready.get()) {
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    //
                }
                Map<String, Long> map = counter.getAndClear();
                for (Map.Entry<String, Long> entry : map.entrySet()) {
                    Long value = entry.getValue();
                    sum += value;
                }
                System.out.println("mapSize: " + map.size());
            }
            System.out.println("sum: " + sum);
            System.out.println("expected: " + nIterations);
        }
    }, "readerThread");
    thread.start();
    readerThread.start();
    thread.join();
    ready.set(true);
    readerThread.join();
    // Ensure that counter is empty
    System.out.println("elements left in map: " + counter.getAndClear().size());
}
}

关于java - 增加和删除 ConcurrentHashMap 的元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41229766/

相关文章:

java - Netty 5.0.0 closeFuture().sync() 从未完成

java - Netbeans 和 JDK 10

java - 始终位于顶部但允许在下方单击的透明 JFrame

java - CountdownLatch 任务完成后的 countDown 方法行为

java - Object.wait() 无需在 JVM 中旋转

java - 与 ConcurrentSkipListSet 一起正常工作的即时比较器

java - Gradle 在 FreeBSD Jail 中无法正常工作

java - 变量是否应该在 2 个正在运行的线程之间是 volatile 的?

c++ - 如何从调用的函数获取返回值,该函数在 TBB 的另一个线程中执行?

java - Future.cancel() 没有取消 ScheduledExecutorService 的计划执行