c# - 仅当值不在本地缓存中时调用昂贵的 Reactive Extensions `IObservable` 函数

标签 c# .net system.reactive

我正在使用 Reactive Extensions (Rx) 和存储库模式来促进从相对较慢的数据源获取数据。我有以下(简化的)界面:

public interface IStorage
{
    IObservable<INode> Fetch(IObservable<Guid> ids);
}

创建 IStorage 的实现实例很慢——考虑创建一个网络服务或数据库连接。每个Guidids一对一的可观察结果 INode (或 null )在 return observable 中,每个结果都很昂贵。因此,对我来说只实例化 IStorage 才有意义仅当我至少有一个值要获取然后使用 IStorage 时为每个 Guid 只获取一次值.

将调用限制为IStorage我将结果缓存在我的 Repository 中看起来像这样的类:

public class Repository
{
    private Dictionary<Guid, INode> NodeCache { get; set; }

    private Func<IStorage> StorageFactory { get; set; }

    public IObservable<INode> Fetch(IObservable<Guid> ids)
    {
        var lazyStorage = new Lazy<IStorage>(this.StorageFactory);

        // from id in ids
        // if NodeCache contains id select NodeCache[id]
        // else select node from lazyStorage.Value.Fetch(...)
    }
}

Repository.Fetch(...)方法我已经包含了说明我正在尝试做什么的注释。

但本质上,如果 NodeCache包含所有正在获取的 ID IStorage永远不会实例化,并且几乎没有延迟地返回结果。但是,如果任何一个 id 不在缓存中,则 IStorage被实例化,所有未知 ID 都通过 IStorage.Fetch(...) 传递方法。

需要维护一对一映射,包括顺序保存。

有什么想法吗?

最佳答案

我花了一段时间才弄明白,但我终于找到了自己的解决方案。

我定义了两个名为 FromCacheOrFetch 的扩展方法带有这些签名:

IObservable<R> FromCacheOrFetch<T, R>(
    this IObservable<T> source,
    Func<T, R> cache,
    Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
        where R : class

IObservable<R> FromCacheOrFetch<T, R>(
    this IObservable<T> source,
    Func<T, Maybe<R>> cache,
    Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)

第一个使用标准的 CLR/Rx 类型,第二个使用 Maybe monad(可空类型不限于值类型)。

第一个刚转Func<T, R>进入Func<T, Maybe<R>>并调用第二个方法。

背后的基本思想是,当要查询源时,检查缓存中的每个值以查看结果是否已经存在,如果存在,则立即返回结果。但是,如果缺少任何结果,那么并且只有在那时才会通过传入 Subject<T> 来调用 fetch 函数。现在所有缓存未命中都通过获取函数传递。调用代码负责将结果添加到缓存中。代码通过获取函数异步处理所有值,并将结果与​​缓存的结果重新组合成正确的顺序。

像款待一样工作。 :-)

代码如下:

public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
    Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
        where R : class
{
    return source
        .FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
}

public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
    Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
{
    var results = new Subject<R>();

    var disposables = new CompositeDisposable();

    var loop = new EventLoopScheduler();
    disposables.Add(loop);

    var sourceDone = false;
    var pairsDone = true;
    var exception = (Exception)null;

    var fetchIn = new Subject<T>();
    var fetchOut = (IObservable<R>)null;
    var pairs = (IObservable<KeyValuePair<int, R>>)null;

    var lookup = new Dictionary<T, int>();
    var list = new List<Maybe<R>>();
    var cursor = 0;

    Action checkCleanup = () =>
    {
        if (sourceDone && pairsDone)
        {
            if (exception == null)
            {
                results.OnCompleted();
            }
            else
            {
                results.OnError(exception);
            }
            loop.Schedule(() => disposables.Dispose());
        }
    };

    Action dequeue = () =>
    {
        while (cursor != list.Count)
        {
            var mr = list[cursor];
            if (mr.HasValue)
            {
                results.OnNext(mr.Value);
                cursor++;
            }
            else
            {
                break;
            }
        }
    };

    Action<KeyValuePair<int, R>> nextPairs = kvp =>
    {
        list[kvp.Key] = Maybe<R>.Something(kvp.Value);
        dequeue();
    };

    Action<Exception> errorPairs = ex =>
    {
        fetchIn.OnCompleted();
        pairsDone = true;
        exception = ex;
        checkCleanup();
    };

    Action completedPairs = () =>
    {
        pairsDone = true;
        checkCleanup();
    };

    Action<T> sourceNext = t =>
    {
        var mr = cache(t);
        list.Add(mr);
        if (mr.IsNothing)
        {
            lookup[t] = list.Count - 1;
            if (fetchOut == null)
            {
                pairsDone = false;
                fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
                pairs = fetchIn
                    .Select(x => lookup[x])
                    .Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
                disposables.Add(pairs
                    .ObserveOn(loop)
                    .Subscribe(nextPairs, errorPairs, completedPairs));
            }
            fetchIn.OnNext(t);
        }
        else
        {
            dequeue();
        }
    };

    Action<Exception> errorSource = ex =>
    {
        sourceDone = true;
        exception = ex;
        fetchIn.OnCompleted();
        checkCleanup();
    };

    Action completedSource = () =>
    {
        sourceDone = true;
        fetchIn.OnCompleted();
        checkCleanup();
    };

    disposables.Add(source
        .ObserveOn(loop)
        .Subscribe(sourceNext, errorSource, completedSource));

    return results.ObserveOn(scheduler);
}

关于c# - 仅当值不在本地缓存中时调用昂贵的 Reactive Extensions `IObservable` 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5738664/

相关文章:

c# - LINQPad - 将辅助 DataContext 记录到 "SQL"选项卡

.net - 面向 Silverlight 人员的 WPF

c# - 将依赖的可观察值映射到子列表

c# - Windows 搜索 - 在 C# 中进行全文搜索

c# - http ://schemas. microsoft.com/winfx/2006/xaml/presentation 定义

c# - 如何将以下类转换为 IObservable?

C# Rx 如何在创建的 Observable 中正确处理源 Enumerable

C# 为什么我可以在常量上调用对象扩展方法?

c# - 为什么C#中没有const成员方法和const参数?

c# - LINQ 查询返回列表列表