java - 以下代码是否线程安全

标签 java multithreading concurrency hashmap concurrenthashmap

我有一个场景,我必须维护一个可以由多个线程填充的映射,每个线程修改相应的列表(唯一标识符/键是线程名称),当线程的列表大小超过固定的批处理大小时,我们必须将记录保存在数据库中。

示例代码如下:

private volatile ConcurrentHashMap<String, List<T>>  instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReadWriteLock lock ;

public void addAll(List<T> entityList, String threadName) {
    try {
        lock.readLock().lock();
        List<T> instrumentList = instrumentMap.get(threadName);
        if(instrumentList == null) {
            instrumentList = new ArrayList<T>(batchSize);
            instrumentMap.put(threadName, instrumentList);
        }

        if(instrumentList.size() >= batchSize -1){
            instrumentList.addAll(entityList);
            recordSaver.persist(instrumentList); 
            instrumentList.clear();
        } else {
            instrumentList.addAll(entityList);  
        }
    } finally {
        lock.readLock().unlock();
    }

}

每 2 分钟后会有一个单独的线程运行,以持久化 Map 中的所有记录(以确保每 2 分钟后我们有一些东西持久化并且 map 大小不会变得太大)并且当它启动时它会阻塞所有其他线程(检查 readLock 和 writeLock us,其中 writeLock 具有更高的优先级)

if(//Some condition) {
                    Thread.sleep(//2 minutes);
                    aggregator.getLock().writeLock().lock();
                    List<T> instrumentList = instrumentMap .values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
                    if(instrumentList.size() > 0) {

                        saver.persist(instrumentList);
                        instrumentMap .values().parallelStream().forEach(x -> x.clear());
                    aggregator.getLock().writeLock().unlock();
                }

这个解决方案几乎适用于我们测试的每个场景,除了有时我们看到一些记录丢失,即根本没有持久化,尽管它们在 Map 中添加得很好

我的问题是这段代码有什么问题? ConcurrentHashMap 不是最好的解决方案吗? 读/写锁的使用在这里有问题吗? 我应该使用顺序处理吗?

最佳答案

不,它不是线程安全的。

问题是您正在使用 ReadWriteLock 的读取 锁。这不保证进行更新的独占访问权限。为此,您需要使用写入 锁。

但您实际上根本不需要使用单独的锁。您可以简单地使用 ConcurrentHashMap.compute 方法:

instrumentMap.compute(threadName, (tn, instrumentList) -> {
  if (instrumentList == null) {
    instrumentList = new ArrayList<>();
  }

  if(instrumentList.size() >= batchSize -1) {
    instrumentList.addAll(entityList); 
    recordSaver.persist(instrumentList); 
    instrumentList.clear();
  } else {
    instrumentList.addAll(entityList);
  }

  return instrumentList;
});

这允许您更新列表中的项目,同时还保证对给定键的列表的独占访问。

我怀疑您可以将 compute 调用拆分为 computeIfAbsent(如果列表不存在则添加列表),然后是 computeIfPresent (更新/持久化列表):这里不需要这两个操作的原子性。但将它们分开并没有实际意义。


此外,instrumentMap 几乎肯定不应该是易变的。除非您真的想重新分配它的值(鉴于此代码,我对此表示怀疑),否则请删除 volatile 并使其成为最终值。

同样,非最终锁也值得怀疑。如果您坚持使用锁,那么也将其设置为 final。

关于java - 以下代码是否线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51720209/

相关文章:

java - 了解在 TableView 中添加 ChangeListener

java - Java中如何匹配反斜杠?

java - 线程和任务问题

python - 如何在多核多线程上运行 TensorFlow

java - 如果我在单核机器上运行应用程序,volatile 关键字是否有用?

集合中 put/get 中的 Java 并发

java - 在 J2me 应用程序中渲染 html/js?

java - Spring +Ehcache : Not able to get the cache back from cacheManager without key

java - hibernate 线程何时在 Java 中继续执行?

java - Java 的 java.util.concurrent 包的 .NET 等价物是什么?