我正在使用 Reactive Extensions (Rx) 和存储库模式来促进从相对较慢的数据源获取数据。我有以下(简化的)界面:
public interface IStorage
{
IObservable<INode> Fetch(IObservable<Guid> ids);
}
创建 IStorage
的实现实例很慢——考虑创建一个网络服务或数据库连接。每个Guid
在ids
一对一的可观察结果 INode
(或 null
)在 return observable 中,每个结果都很昂贵。因此,对我来说只实例化 IStorage
才有意义仅当我至少有一个值要获取然后使用 IStorage
时为每个 Guid
只获取一次值.
将调用限制为IStorage
我将结果缓存在我的 Repository
中看起来像这样的类:
public class Repository
{
private Dictionary<Guid, INode> NodeCache { get; set; }
private Func<IStorage> StorageFactory { get; set; }
public IObservable<INode> Fetch(IObservable<Guid> ids)
{
var lazyStorage = new Lazy<IStorage>(this.StorageFactory);
// from id in ids
// if NodeCache contains id select NodeCache[id]
// else select node from lazyStorage.Value.Fetch(...)
}
}
在Repository.Fetch(...)
方法我已经包含了说明我正在尝试做什么的注释。
但本质上,如果 NodeCache
包含所有正在获取的 ID IStorage
永远不会实例化,并且几乎没有延迟地返回结果。但是,如果任何一个 id 不在缓存中,则 IStorage
被实例化,所有未知 ID 都通过 IStorage.Fetch(...)
传递方法。
需要维护一对一映射,包括顺序保存。
有什么想法吗?
最佳答案
我花了一段时间才弄明白,但我终于找到了自己的解决方案。
我定义了两个名为 FromCacheOrFetch
的扩展方法带有这些签名:
IObservable<R> FromCacheOrFetch<T, R>(
this IObservable<T> source,
Func<T, R> cache,
Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
where R : class
IObservable<R> FromCacheOrFetch<T, R>(
this IObservable<T> source,
Func<T, Maybe<R>> cache,
Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
第一个使用标准的 CLR/Rx 类型,第二个使用 Maybe
monad(可空类型不限于值类型)。
第一个刚转Func<T, R>
进入Func<T, Maybe<R>>
并调用第二个方法。
背后的基本思想是,当要查询源时,检查缓存中的每个值以查看结果是否已经存在,如果存在,则立即返回结果。但是,如果缺少任何结果,那么并且只有在那时才会通过传入 Subject<T>
来调用 fetch 函数。现在所有缓存未命中都通过获取函数传递。调用代码负责将结果添加到缓存中。代码通过获取函数异步处理所有值,并将结果与缓存的结果重新组合成正确的顺序。
像款待一样工作。 :-)
代码如下:
public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
where R : class
{
return source
.FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
}
public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
{
var results = new Subject<R>();
var disposables = new CompositeDisposable();
var loop = new EventLoopScheduler();
disposables.Add(loop);
var sourceDone = false;
var pairsDone = true;
var exception = (Exception)null;
var fetchIn = new Subject<T>();
var fetchOut = (IObservable<R>)null;
var pairs = (IObservable<KeyValuePair<int, R>>)null;
var lookup = new Dictionary<T, int>();
var list = new List<Maybe<R>>();
var cursor = 0;
Action checkCleanup = () =>
{
if (sourceDone && pairsDone)
{
if (exception == null)
{
results.OnCompleted();
}
else
{
results.OnError(exception);
}
loop.Schedule(() => disposables.Dispose());
}
};
Action dequeue = () =>
{
while (cursor != list.Count)
{
var mr = list[cursor];
if (mr.HasValue)
{
results.OnNext(mr.Value);
cursor++;
}
else
{
break;
}
}
};
Action<KeyValuePair<int, R>> nextPairs = kvp =>
{
list[kvp.Key] = Maybe<R>.Something(kvp.Value);
dequeue();
};
Action<Exception> errorPairs = ex =>
{
fetchIn.OnCompleted();
pairsDone = true;
exception = ex;
checkCleanup();
};
Action completedPairs = () =>
{
pairsDone = true;
checkCleanup();
};
Action<T> sourceNext = t =>
{
var mr = cache(t);
list.Add(mr);
if (mr.IsNothing)
{
lookup[t] = list.Count - 1;
if (fetchOut == null)
{
pairsDone = false;
fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
pairs = fetchIn
.Select(x => lookup[x])
.Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
disposables.Add(pairs
.ObserveOn(loop)
.Subscribe(nextPairs, errorPairs, completedPairs));
}
fetchIn.OnNext(t);
}
else
{
dequeue();
}
};
Action<Exception> errorSource = ex =>
{
sourceDone = true;
exception = ex;
fetchIn.OnCompleted();
checkCleanup();
};
Action completedSource = () =>
{
sourceDone = true;
fetchIn.OnCompleted();
checkCleanup();
};
disposables.Add(source
.ObserveOn(loop)
.Subscribe(sourceNext, errorSource, completedSource));
return results.ObserveOn(scheduler);
}
关于c# - 仅当值不在本地缓存中时调用昂贵的 Reactive Extensions `IObservable` 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5738664/