处理大量有状态事件的 Java 最佳实践

标签 java events concurrency event-handling threadpool

我正在寻找一个实用程序类或最佳实践模式来处理我的应用程序中大量传入的有状态事件。

想象一个生产者产生许多事件,然后由对这些事件进行操作的应用程序使用这些事件。现在,在某些情况下,生产者生成的事件多于消费者实际可以处理的事件,但由于所有事件都是有状态的,因此是否会错过某些事件并不重要,因为最新的事件包含先前事件传达的所有信息。

我现在已经编写了以下 java 代码来处理这些情况,但我不确定这是否是正确的方法,以及是否没有更简单、更好、更安全的方法。

private static ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1);
private final static Object lock = new Object();
private static List<EventData> lastEventData = null;

static {
    executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            synchronized(lock) {
                while(lastEventData == null && !executorService.isShutdown()) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ex) { ... }
                }
                try {
                    actUponEvent(lastEventData);
                } catch (Throwable ex) { ... }
                lastEventData = null;
            }
        }
    }, 250, 250, TimeUnit.MILLISECONDS);
}


public synchronized update(final List<EventData> data) {
    synchronized(lock) {
        lastEventData = data;
        lock.notifyAll();
    }
}

public void dispose() {
    executorService.shutdown();
}

换句话说,我希望事件通知一到达就收到,但速率限制为每 250 毫秒一个事件,而且我只对最后一个传入事件感兴趣。

我浏览了 java.util.concurrent 来寻找一些提示/现有的解决方案,但找不到任何适合我的问题的东西。 BlockingQueue 起初似乎非常好,因为如果为空,它就会阻塞,但另一方面,队列本身对我来说并不重要,因为无论如何我只对最新事件感兴趣,并且如果已满则插入时阻塞并不重要我正在寻找什么。

最佳答案

以下模型可以支持非常高的更新率(每秒数千万),但您只需要在内存中保留最新的即可。

<小时/>

如果您每 N 毫秒拍摄一次快照,则可以使用此方法。

final AtomicReference<ConcurrentHashMap<Key, Event>> mapRef =

当您有更新时,将其添加到 ConcurrentMap 中。选择键以便替换前一个事件的事件具有相同的键。

Key key = keyFor(event);
mapRef.get().put(key, event);

这种映射方式可以立即获得任何键的最新更新。

有一个每 N 毫秒运行一次的任务。此任务运行时可以将 map 交换为另一个 map (或之前的空 map 以避免创建新 map )

ConcurrentMap<Key, Event> prev = mapRef.set(prevEmptyMap);

for(Event e: prev.values())
    process(e);
prev.clear();
this.prevEmptymap = prev;

关于处理大量有状态事件的 Java 最佳实践,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24742040/

相关文章:

java - 无法单击 MacBook Pro 上 Safari 浏览器中的下拉菜单

检查输入是否是有效的 shell 命令,Linux

java - 为什么ThreadLocal的initialValue不会增加我的变量?

wpf - 为什么不触发用户控件加载事件

javascript - Promise 未在 process.exit 中运行

database - 如何使用 actor 进行数据库访问和 DDD?

java - Dijkstra算法中的无限循环?

java - 无法在 spock 中的空对象上调用方法 leftshift()

java - 什么是 NullPointerException,我该如何解决?

javascript - 谷歌地图javascript事件问题