假设我有一些项目流:
Observable<Item> stream = ...
我想达到什么目的?
- Stream 有任意数量的操作。开始事务之前的所有操作都应该在事务之外运行
- 以某种方式在流的中间开始事务
db.beginTransaction()
- 开始事务后的所有操作符都应该在事务内部运行
- 交易必须在成功操作的情况下完成
db.setTransactionSuccessful
- 事务必须始终结束
db.endTransaction
- 最好有两个片段:为下游操作中的所有项目打开一个交易;为流中的每个项目打开和关闭事务
//some upstream operators
stream.doOnNext(i -> ...)
.map(i -> ...)
//somehow start transaction here
//operator inside transaction. All database changes will be reverted in case error
.doOnNext(i -> /*database ops*/)
.subscribe()
PS: db 是可写SQLiteDatabase
我现在有一个解决方案。但也许你对更清洁的方式有什么建议吗?
最佳答案
1) 对于在单个事务中处理所有项目的情况:
stream
.doOnSubscribe(d -> database.beginTransaction())
. ...
.subscribe(v -> {...},
e -> database.endTransaction(),
() -> { database.setTransactionSuccessful(); database.endTransaction(); })
2) 对于每件商品单独交易的情况:
class ItemWithTransaction {
Item item;
Connection conn; // connection associated with this item
boolean rollback;
}
stream
.map(i -> new ItemWithTransaction(i, openTransaction()))
.map(i -> i.conn.executeSql(..., i.item.prop1))
. ...
.map(i -> {
if (...) i.rollback = true; // error related to this item
return i;
})
. ...
.subscribe(i -> {
...
if (i.rollback) i.conn.rollback();
else i.conn.commit();
i.conn.close();
},
e -> rollbackAndCloseAllOpenConnections(),
() -> {...})
通常我不会采用第二种方法,因为它可能需要太多(不受控制的)并发数据库连接。
3) 你最好重组你的代码,这样你就可以先收集所有需要的信息,然后在短事务中一次性更新数据库。我会这样做:
stream
. ... // processing
.buffer(...) // collect updates all or in batches
.subscribe(Collection<ItemUpdate> batch -> {
database.beginTransaction();
try {
... // update multiple items
database.setTransactionSuccessful();
} finally {
database.endTransaction();
}
},
e -> {...},
() -> {...});
关于java - 使用 rxJava 的下游运算符的数据库(SQLite)事务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41738535/