我正在寻找一个实用程序类或最佳实践模式来处理我的应用程序中大量传入的有状态事件。
想象一个生产者产生许多事件,然后由对这些事件进行操作的应用程序使用这些事件。现在,在某些情况下,生产者生成的事件多于消费者实际可以处理的事件,但由于所有事件都是有状态的,因此是否会错过某些事件并不重要,因为最新的事件包含先前事件传达的所有信息。
我现在已经编写了以下 java 代码来处理这些情况,但我不确定这是否是正确的方法,以及是否没有更简单、更好、更安全的方法。
private static ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1);
private final static Object lock = new Object();
private static List<EventData> lastEventData = null;
static {
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
synchronized(lock) {
while(lastEventData == null && !executorService.isShutdown()) {
try {
lock.wait();
} catch (InterruptedException ex) { ... }
}
try {
actUponEvent(lastEventData);
} catch (Throwable ex) { ... }
lastEventData = null;
}
}
}, 250, 250, TimeUnit.MILLISECONDS);
}
public synchronized update(final List<EventData> data) {
synchronized(lock) {
lastEventData = data;
lock.notifyAll();
}
}
public void dispose() {
executorService.shutdown();
}
换句话说,我希望事件通知一到达就收到,但速率限制为每 250 毫秒一个事件,而且我只对最后一个传入事件感兴趣。
我浏览了 java.util.concurrent 来寻找一些提示/现有的解决方案,但找不到任何适合我的问题的东西。 BlockingQueue 起初似乎非常好,因为如果为空,它就会阻塞,但另一方面,队列本身对我来说并不重要,因为无论如何我只对最新事件感兴趣,并且如果已满则插入时阻塞并不重要我正在寻找什么。
最佳答案
以下模型可以支持非常高的更新率(每秒数千万),但您只需要在内存中保留最新的即可。
<小时/>如果您每 N 毫秒拍摄一次快照,则可以使用此方法。
final AtomicReference<ConcurrentHashMap<Key, Event>> mapRef =
当您有更新时,将其添加到 ConcurrentMap 中。选择键以便替换前一个事件的事件具有相同的键。
Key key = keyFor(event);
mapRef.get().put(key, event);
这种映射方式可以立即获得任何键的最新更新。
有一个每 N 毫秒运行一次的任务。此任务运行时可以将 map 交换为另一个 map (或之前的空 map 以避免创建新 map )
ConcurrentMap<Key, Event> prev = mapRef.set(prevEmptyMap);
for(Event e: prev.values())
process(e);
prev.clear();
this.prevEmptymap = prev;
关于处理大量有状态事件的 Java 最佳实践,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24742040/