我正在尝试公开一个可观察序列,为观察者提供数据库表中所有现有记录以及任何 future 项目。为了争论起见,假设它是日志条目。因此,我会有这样的事情:
public class LogService
{
private readonly Subject<LogEntry> entries;
public LogService()
{
this.entries = new Subject<LogEntry>();
this.entries
.Buffer(...)
.Subscribe(async x => WriteLogEntriesToDatabaseAsync(x));
}
public IObservable<LogEntry> Entries
{
get { return this.entries; }
}
public IObservable<LogEntry> AllLogEntries
{
get
{
// how the heck?
}
}
public void Log(string message)
{
this.entries.OnNext(new LogEntry(message));
}
private async Task<IEnumerable<LogEntry>> GetLogEntriesAsync()
{
// reads existing entries from DB table and returns them
}
private async Task WriteLogEntriesToDatabaseAsync(IList<LogEntry> entries)
{
// writes entries to the database
}
}
我最初对 AllLogEntries
的实现的想法是这样的:
return Observable.Create<LogEntry>(
async observer =>
{
var existingEntries = await this.GetLogEntriesAsync();
foreach (var existingEntry in existingEntries)
{
observer.OnNext(existingEntry);
}
return this.entries.Subscribe(observer);
});
但这样做的问题是可能存在已缓冲但尚未写入数据库的日志条目。因此,这些条目将被遗漏,因为它们不在数据库中并且已经通过 entries
observable。
我的下一个想法是将缓冲条目与非缓冲条目分开,并在实现 AllLogEntries
时使用缓冲条目:
return Observable.Create<LogEntry>(
async observer =>
{
var existingEntries = await this.GetLogEntriesAsync();
foreach (var existingEntry in existingEntries)
{
observer.OnNext(existingEntry);
}
return this.bufferedEntries
.SelectMany(x => x)
.Subscribe(observer);
});
这有两个问题:
- 这意味着
AllLogEntries
的客户端在接收日志条目之前也必须等待缓冲区时间跨度过去。我希望他们立即看到日志条目。 - 在我完成读取现有日志条目和返回 future 条目之间的时间点之间,日志条目可能会写入数据库,这仍然存在竞争条件。
所以我的问题是:我实际上如何在没有竞争条件的情况下实现我的要求,并避免任何重大的性能损失?
最佳答案
要通过客户端代码执行此操作,您可能必须使用轮询来实现解决方案,然后查找调用之间的差异。可能将解决方案与
- Observable.Interval() : http://rxwiki.wikidot.com/101samples#toc28 , 和
- Observable.DistinctUntilChanged()
会给你足够的解决方案。
或者,我建议您尝试找到一种解决方案,在数据库/表更新时通知客户端。在 Web 应用程序中,您可以使用 SignalR 之类的工具来执行此操作。
例如:http://techbrij.com/database-change-notifications-asp-net-signalr-sqldependency
如果它不是网络应用程序,则可以通过套接字使用类似的更新机制。
查看这些链接(这些链接来自 SignalR polling database for updates 的已接受答案):
关于c# - 如何从数据库中播种可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22655537/