C# LanguageExt - 将多个异步调用组合成一个分组调用

标签 c# functional-programming monads language-ext

我有一个从数据存储中异步查找项目的方法;

class MyThing {}
Task<Try<MyThing>> GetThing(int thingId) {...}

我想从数据存储中查找多个项目,并编写了一个新方法来执行此操作。我还写了一个辅助方法,它需要多个 Try<T>并将他们的结果合并成一个 Try<IEnumerable<T>> .


public static class TryExtensions
{
    Try<IEnumerable<T>> Collapse<T>(this IEnumerable<Try<T>> items)
    {
        var failures = items.Fails().ToArray();
        return failures.Any() ?
            Try<IEnumerable<T>>(new AggregateException(failures)) :
            Try(items.Select(i => i.Succ(a => a).Fail(Enumerable.Empty<T>())));
    }
}

async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
    var results = new List<Try<Things>>();

    foreach (var id in ids)
    {
        var thing = await GetThing(id);
        results.Add(thing);
    }

    return results.Collapse().Map(p => p.ToArray());
}

另一种方法是这样的;

async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
    var tasks = ids.Select(async id => await GetThing(id)).ToArray();
    await Task.WhenAll(tasks);
    return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}

问题是所有任务都将并行运行,我不想用大量并行请求来破坏我的数据存储。我真正想要的是使用 monadic 原则和 LanguageExt 的特性使我的代码具有功能性.有谁知道如何实现这一点?


更新

感谢@MatthewWatson 的建议,这就是 SemaphoreSlim 的样子;

async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
    var mutex = new SemaphoreSlim(1);
    var results = ids.Select(async id =>
    {
        await mutex.WaitAsync();
        try { return await GetThing(id); }
        finally { mutex.Release(); }
    }).ToArray();
    await Task.WhenAll(tasks);
    return tasks.Select(t => t.Result).Collapse().Map(Enumerable.ToArray);
    return results.Collapse().Map(p => p.ToArray());
}

问题是,这仍然不是一元化/函数式的,最终代码行数比我原来的代码行数多,带有 foreach。 block 。

最佳答案

在“另一种方式”中,您调用时几乎达到了目标:

var tasks = ids.Select(async id => await GetThing(id)).ToArray();

除了 Tasks 没有顺序运行,所以您最终会遇到许多查询访问您的数据存储,这是由 .ToArray() 引起的和 Task.WhenAll .一旦你调用.ToArray()它已经分配并启动了任务,所以如果你能“容忍”一个foreach实现顺序任务运行,像这样:

public static class TaskExtensions
{
    public static async Task RunSequentially<T>(this IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks) await task;
    }
}

Despite that running a "loop" of queries is not a quite good practice in general, unless you have in some background service and some special scenario, leveraging this to the Database engine through WHERE thingId IN (...) in general is a better option. Even you have big amount of thingIds we can slice it into small 10s, 100s.. to narrow the WHERE IN footprint.

回到我们的RunSequentially ,我让它更像这样的功能,例如:

tasks.ToList().ForEach(async task => await task);

但遗憾的是,这仍然会运行有点“并行”的任务。

所以最终的用法应该是:

async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
    var tasks = ids.Select(id => GetThing(id));// remember don't use .ToArray or ToList...
    await tasks.RunSequentially();
    return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}

另一个矫枉过正的函数式解决方案是在队列递归Lazy!!

改为GetThing , 得到一个懒人 GetLazyThing返回 Lazy<Task<Try<MyThing>>>只需包装 GetThing :

new Lazy<Task<Try<MyThing>>>(() => GetThing(id))

现在使用几个扩展/功能:

public static async Task RecRunSequentially<T>(this IEnumerable<Lazy<Task<T>>> tasks)
{
    var queue = tasks.EnqueueAll();
    await RunQueue(queue);
}

public static Queue<T> EnqueueAll<T>(this IEnumerable<T> list)
{
    var queue = new Queue<T>();
    list.ToList().ForEach(m => queue.Enqueue(m));
    return queue;
}

public static async Task RunQueue<T>(Queue<Lazy<Task<T>>> queue)
{
    if (queue.Count > 0)
    {
        var task = queue.Dequeue();
        await task.Value; // this unwraps the Lazy object content
        await RunQueue(queue);
    }
}

最后:

var lazyTasks = ids.Select(id => GetLazyThing(id));
await lazyTasks.RecRunSequentially();
// Now collapse and map as you like

更新

但是,如果您不喜欢 EnqueueAllRunQueue不是“纯粹的”,我们可以采取以下方法与相同的 Lazy技巧

public static async Task AwaitSequentially<T>(this Lazy<Task<T>>[] array, int index = 0)
{
    if (array == null || index < 0 || index >= array.Length - 1) return;
    await array[index].Value;
    await AwaitSequentially(array, index + 1); // ++index is not pure :)
}

现在:

var lazyTasks = ids.Select(id => GetLazyThing(id));
await tasks.ToArray().AwaitSequentially();
// Now collapse and map as you like

关于C# LanguageExt - 将多个异步调用组合成一个分组调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63844098/

相关文章:

monads - Idris - 惰性评估问题

c# - 单色触摸 : Loading screen

c# - 如何将 Umbraco 项目插入 cshtml 文件中的 url,razor?

java - RxJava : merge() changes the order of the emitted items?

functional-programming - 当中间的 promise 中的 promise 检查授权时,如何管道功能?

haskell - 一个函数中的两个多态类

c# - 如何在 C# 中为任何用户创建 "AppData\Roaming"中的文件文本

c# - Windows 窗体调用 Web 服务但需要模拟不同的 Windows 用户

python - 解决 python 3 与 python 2 中的 map 函数问题

haskell - 在 newtype 中实现 >>=