给定一个包含很多项目的结果流,我想存储它们并处理潜在的并发冲突:
public void onTriggerEvent(/* params */) {
Stream<Result> results = customThreadPool.submit(/*...complex parallel computation on multiple servers...*/).get();
List<Result> conflicts = store(results);
resolveConflictsInNewTransaction(conflicts);
}
关于如何有效地实现
store(...)
,我在 上卡住了 。 Result
由两个不可变且分离的对象组成,这些对象描述了需要在各自的数据库表中更新的数据。@Value
public static class Result {
A a; // describes update for row in table a
B b; // describes update for row in table b
}
A
和 B
分别引用了两个用户,其中 (u1, u2)
是各自 DB 表上的键。@Value
public static class A {
long u1;
long u2;
// ... computed data fields ...
}
// B accordingly
流计算本身可能会同时触发(多个
onTriggerEvent
并行调用),这通常很好,但有时可能会导致某些结果发生冲突(大约 0.1% 发生冲突,例如,流的结果为 (53,21)
和另一个同时调用也更新了 (53,21)
)。 A
和/或 B
的冲突由它们的 updatedAt
字段指示,该字段与操作的开始相比会有所不同。当然,在这里,我们不想丢弃所有结果而只是重试,而只想解决冲突的行。所以我想知道(1)存储所有不冲突的
Result.a
和 Result.b
以及(2)获得 List
的 Result
冲突并需要特殊处理的好方法是什么。public List<Result> store(Stream<Result> results) {
// store all a
// store all b (ideally without using results * 2 RAM)
// do update other stuff if a and b are not in conflict and do it in the same ACID transaction as the update of the related a and b.
// return those in Conflict
}
如何在不解包每个结果的情况下实现它,在自己的事务中将其发送到数据库等?理想情况下,我需要一次全部发送到数据库并获取尚未存储的冲突列表(另一个应该被持久化)。我也对不同的方法持开放态度。
如果相关,我们使用 JPA/Hibernate。
最佳答案
最简单的方法是将持久性简化为 FIFO 队列(存在很多技术,但通常这会变成“每个事务单个条目”的方式,这是不希望的方法)。
所以对于第二个选项,我会将并发冲突定义的逻辑从数据库持久化操作中移到单独的服务中。
您可以实现类似 UserId 到可重入锁的内存映射(与同步块(synchronized block)相比,这些操作非常快)。
在第一次调用持久化锁期间,锁被锁定;成功坚持后,锁被释放。与此同时(在一个单独的线程中),您可以检查锁的状态,然后通过它过滤掉,或者等到锁被释放。注意等待状态:您有流,因此处理流的整个线程将进入等待状态。
就个人而言,我会坚持第一个“每个事务单个条目”,中间有一些(持久的)消息队列,并提供单独的锁定检查服务。首先,这可以让我们轻松配置写操作的并发性;第二个很容易在编写器中使用等待状态,因为只有一个条目会被锁定。
关于java - 通过每个项目的乐观锁定有效地将结果流存储在多个表中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61758277/