我想实现以下逻辑:
-将使用以下结构
//Map<String, CopyOnWriteArrayList> keeping the pending updates
//grouped by the id of the updated object
final Map<String, List<Update>> updatesPerId = new ConcurrentHashMap<>();
-n个生产者会向updatesPerId map添加更新(对于同一个id,可以同时添加2个更新)
-one TimerThread 会时不时地运行,并且必须处理接收到的更新。像这样的东西:
final Map<String, List<Update>> toBeProcessed = new HashMap<>(updatesPerId);
updatesPerId.clear();
// iterate over toBeProcessed and process them
有什么方法可以让这个逻辑线程安全,而不需要同步来自生产者的添加逻辑和来自timerThread(consumer)的逻辑?我正在考虑原子清除+获取,但似乎 ConcurrentMap 不提供类似的东西。 另外,我必须提到更新应该由更新的对象 ID 保存,所以我不能用队列或其他东西替换 map 。
有什么想法吗? 谢谢!
最佳答案
您可以利用 ConcurrentHashMap.compute
executes atomically .
你可以像这样放入updatesPerId
:
updatesPerId.compute(k, (k, list) -> {
if (list == null) list = new ArrayList<>();
// ... add to the list
// Return a non-null list, so the key/value pair is stored in the map.
return list;
});
这不是使用 computeIfAbsent
然后添加到返回值,这不是原子的。
然后在你的线程中删除东西:
for (String key : updatesPerId.keySet()) {
List<Update> list = updatesPerId.put(key, null);
updatesPerId.compute(key, (k, list) -> {
// ... Process the contents of the list.
// Removes the key/value pair from the map.
return null;
});
}
因此,如果您碰巧尝试同时在两个地方处理键,则将键添加到列表(或处理该键的所有值)可能会阻塞;否则不会被屏蔽。
编辑:正如@StuartMarks 所指出的,最好先简单地从 map 中取出所有东西,然后再处理它们,以避免阻塞其他试图添加的线程:
Map<String, List<Update>> newMap = new HashMap<>();
for (String key : updatesPerId.keySet()) {
newMap.put(key, updatesPerId.remove(key));
}
// ... Process entries in newMap.
关于Java: get+clear atomic for map,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54091824/