我有一个缓存数据库查询结果的共享对象,其接口(interface)是“获取缓存结果”和“使缓存结果无效”。返回稍微陈旧的数据是可以接受的。
我当前的解决方案粘贴在这个问题的底部。每个缓存的 get
和 clear
方法可通过 CacheService
中的公共(public)方法访问.内 Cache
, lastUpdated
包含最近的查询结果; isValid
指示是否应更新结果; updateGuard
用于确保只有一个线程更新结果;和 updateWait
让线程等待另一个线程更新结果。为确保进度并因为返回稍微陈旧的数据是可以接受的,在 lastUpdated
之后更新后我立即从更新线程和等待更新的所有线程返回结果 - 我不检查是否 isValid
已设置为 false
再次。
主要问题:如果lastUpdated = getUpdate()
抛出异常(可能是尝试与数据库通信时网络故障的结果)然后我现在只是返回 lastUpdated
- 返回稍微陈旧的数据是可以接受的,但在 getUpdate()
期间重复出现 transient 故障可能导致极度陈旧的数据。我想包括一些逻辑
final int maxRetries = 5;
...
try {
updateWait.drainPermits();
int retryCount = 0;
while(true) {
try {
lastUpdated = getUpdate();
break;
} catch(Exception e) {
retryCount++;
if(retryCount == maxRetries) {
throw Exception e in all threads waiting on semaphore
}
}
}
isValid = true;
}
但是我不确定是否有一种好的方法来实现“在所有等待信号量的线程中抛出异常 e”,或者是否有更好的选择。我考虑过的一种选择是使用 Scala Try
,即 Try<ImmutableList<T>> lastUpdated
,但我尽可能不混合使用 Scala 和 Java 对象,以使代码维护更容易。
不太重要:现在我有三个同步变量(isValid、updateGuard、updateWait),这似乎过多 - 我正在寻找一种方法来安全地消除其中的一个或两个。
public class CacheService {
private final Cache<Foo> fooCache;
private final Cache<Bar> barCache;
// and so on
private abstract class Cache<T> {
private final AtomicBoolean updateGuard = new AtomicBoolean(false);
private final Semaphore updateWait = new Semaphore(Integer.MAX_VALUE);
private volatile boolean isValid = true;
private volatile ImmutableList<T> lastUpdated = getUpdate();
protected abstract ImmutableList<T> getUpdate();
public void clear() {
isValid = false;
}
public ImmutableList<T> get() {
if(isValid) {
return lastUpdated;
} else {
if(updateGuard.compareAndSet(false, true)) {
try {
updateWait.drainPermits();
lastUpdated = getUpdate();
isValid = true;
} finally {
updateGuard.set(false);
updateWait.release(Integer.MAX_VALUE);
}
} else {
while(updateGuard.get()) {
try {
updateWait.acquire();
} catch(InterruptedException e) {
break;
}
}
}
return lastUpdated;
}
}
}
public CacheService() {
fooCache = new Cache<Foo>() {
@Override
protected ImmutableList<Foo> getUpdate() {
return // database query
}
};
// Likewise when initializing barCache etc
}
}
最佳答案
一种方法是使用 CompletableFuture
和 completeExceptionally
private abstract static class Cache<T> {
private final AtomicReference<CompletableFuture<ImmutableList<T>>> value =
new AtomicReference<>();
private static final int MAX_TRIES = 5;
protected abstract ImmutableList<T> getUpdate();
public void clear() {
value.getAndUpdate(f -> f != null && f.isDone() ? null : f);
// or value.set(null); if you want the cache to be invalidated while it is being updated.
}
public ImmutableList<T> get() {
CompletableFuture<ImmutableList<T>> f = value.get();
if (f != null) {
try {
return f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
f = new CompletableFuture<>();
if (!value.compareAndSet(null, f)) {
return get();
}
for(int tries = 0; ; ){
try {
ImmutableList<T> update = getUpdate();
f.complete(update);
return update;
} catch (Exception e){
if(++tries == MAX_TRIES){
f.completeExceptionally(e);
throw new RuntimeException(e);
}
}
}
}
}
您可能希望以不同的方式处理异常,如果您想再次尝试获取更新,则需要在抛出异常后清除它。
关于java - 在更新共享变量时消除过度同步并改进错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31249704/