java - 在更新共享变量时消除过度同步并改进错误处理

标签 java multithreading caching exception synchronization

我有一个缓存数据库查询结果的共享对象,其接口(interface)是“获取缓存结果”和“使缓存结果无效”。返回稍微陈旧的数据是可以接受的。

我当前的解决方案粘贴在这个问题的底部。每个缓存的 getclear方法可通过 CacheService 中的公共(public)方法访问.内 Cache , lastUpdated包含最近的查询结果; isValid指示是否应更新结果; updateGuard用于确保只有一个线程更新结果;和 updateWait让线程等待另一个线程更新结果。为确保进度并因为返回稍微陈旧的数据是可以接受的,在 lastUpdated 之后更新后我立即从更新线程和等待更新的所有线程返回结果 - 我不检查是否 isValid已设置为 false再次。


主要问题:如果lastUpdated = getUpdate()抛出异常(可能是尝试与数据库通信时网络故障的结果)然后我现在只是返回 lastUpdated - 返回稍微陈旧的数据是可以接受的,但在 getUpdate() 期间重复出现 transient 故障可能导致极度陈旧的数据。我想包括一些逻辑

final int maxRetries = 5;
...
try {
  updateWait.drainPermits();
  int retryCount = 0;
  while(true) {
    try {
      lastUpdated = getUpdate();
      break;
    } catch(Exception e) {
      retryCount++;
      if(retryCount == maxRetries) {
        throw Exception e in all threads waiting on semaphore
      }
    }
  }
  isValid = true;
}

但是我不确定是否有一种好的方法来实现“在所有等待信号量的线程中抛出异常 e”,或者是否有更好的选择。我考虑过的一种选择是使用 Scala Try ,即 Try<ImmutableList<T>> lastUpdated ,但我尽可能不混合使用 Scala 和 Java 对象,以使代码维护更容易。


不太重要:现在我有三个同步变量(isValid、updateGuard、updateWait),这似乎过多 - 我正在寻找一种方法来安全地消除其中的一个或两个。


public class CacheService {
  private final Cache<Foo> fooCache;
  private final Cache<Bar> barCache;
  // and so on

  private abstract class Cache<T> {
    private final AtomicBoolean updateGuard = new AtomicBoolean(false);
    private final Semaphore updateWait = new Semaphore(Integer.MAX_VALUE);

    private volatile boolean isValid = true;
    private volatile ImmutableList<T> lastUpdated = getUpdate();

    protected abstract ImmutableList<T> getUpdate();

    public void clear() {
      isValid = false;
    }

    public ImmutableList<T> get() {
      if(isValid) {
        return lastUpdated;
      } else {
        if(updateGuard.compareAndSet(false, true)) {
          try {
            updateWait.drainPermits();
            lastUpdated = getUpdate();
            isValid = true;
          } finally {
            updateGuard.set(false);
            updateWait.release(Integer.MAX_VALUE);
          }
        } else {
          while(updateGuard.get()) {
            try {
              updateWait.acquire();
            } catch(InterruptedException e) {
              break;
            }
          }
        }
        return lastUpdated;
      }
    }
  }

  public CacheService() {
    fooCache = new Cache<Foo>() {
      @Override
      protected ImmutableList<Foo> getUpdate() {
        return // database query
      }
    };
    // Likewise when initializing barCache etc
  }
}

最佳答案

一种方法是使用 CompletableFuturecompleteExceptionally

private abstract static class Cache<T> {
    private final AtomicReference<CompletableFuture<ImmutableList<T>>> value = 
        new AtomicReference<>();
    private static final int MAX_TRIES = 5;

    protected abstract ImmutableList<T> getUpdate();

    public void clear() {
        value.getAndUpdate(f -> f != null && f.isDone() ? null : f);
        // or value.set(null); if you want the cache to be invalidated while it is being updated.
    }

    public ImmutableList<T> get() {
        CompletableFuture<ImmutableList<T>> f = value.get();
        if (f != null) {
            try {
                return f.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
        f = new CompletableFuture<>();
        if (!value.compareAndSet(null, f)) {
            return get();
        }
        for(int tries = 0; ; ){
            try {
                ImmutableList<T> update = getUpdate();
                f.complete(update);
                return update;
            } catch (Exception e){
                if(++tries == MAX_TRIES){
                    f.completeExceptionally(e);
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

您可能希望以不同的方式处理异常,如果您想再次尝试获取更新,则需要在抛出异常后清除它。

关于java - 在更新共享变量时消除过度同步并改进错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31249704/

相关文章:

java - 父类(super class)对象数组。如何将它们作为子类进行管理?

java - Java线程和操作系统线程的区别?

HTTP缓存困惑

java - 在客户端使用 Axis 1.4 + Web 服务响应缓存进行缓存

caching - IE6 和缓存

java - JMS 在 jboss 5.x 中是否需要 hsqldb 文件?

java - 空格和整数的正则表达式,但没有 'double negative' 符号

java - 主线程中的异常 - java.util.InputMismatchException

python - 套接字和线程python

multithreading - Rust 2021 与 2018 : trait `std::marker::Send` is not implemented - what is the correct way to do this in 2021 edition?