C#LanguageExt-将多个异步调用合并为一个分组的调用

问题描述

我有一种方法可以从数据存储中异步查找项目;

class MyThing {}
Task<Try<MyThing>> GetThing(int thingId) {...}

我想从数据存储中查找多个项目,并编写了一个方法来执行此操作。我还编写了一个辅助方法,该方法将使用多个Try<T>并将其结果合并为一个Try<IEnumerable<T>>


public static class TryExtensions
{
    Try<IEnumerable<T>> Collapse<T>(this IEnumerable<Try<T>> items)
    {
        var failures = items.Fails().ToArray();
        return failures.Any() ?
            Try<IEnumerable<T>>(new AggregateException(failures)) :
            Try(items.Select(i => i.Succ(a => a).Fail(Enumerable.Empty<T>())));
    }
}

async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
    var results = new List<Try<Things>>();

    foreach (var id in ids)
    {
        var thing = await GetThing(id);
        results.Add(thing);
    }

    return results.Collapse().Map(p => p.ToArray());
}

另一种方式是这样的

async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
    var tasks = ids.Select(async id => await GetThing(id)).ToArray();
    await Task.WhenAll(tasks);
    return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}

问题是所有任务都将并行运行,而我不想让大量并行请求重载我的数据存储区。我真正想要的是使用LanguageExt的monadic原理和功能使我的代码正常运行。有人知道如何实现这一目标吗?


更新

感谢@MatthewWatson的建议,这就是SemaphoreSlim的样子;

async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
    var mutex = new SemaphoreSlim(1);
    var results = ids.Select(async id =>
    {
        await mutex.WaitAsync();
        try { return await GetThing(id); }
        finally { mutex.Release(); }
    }).ToArray();
    await Task.WhenAll(tasks);
    return tasks.Select(t => t.Result).Collapse().Map(Enumerable.ToArray);
    return results.Collapse().Map(p => p.ToArray());
}

问题是,这仍然不是很单调/实用的功能,并且比起带有foreach块的原始代码,最终得到的代码行更多。

解决方法

通过“另一种方式”,您打电话时几乎实现了目标:

var tasks = ids.Select(async id => await GetThing(id)).ToArray();

除了Tasks不能按顺序运行,因此您最终将遇到许多查询,这些查询是由.ToArray()Task.WhenAll引起的。调用.ToArray()后,它就已经分配并启动了任务,因此,如果您可以“容忍”一个foreach来实现顺序任务的运行,如下所示:

public static class TaskExtensions
{
    public static async Task RunSequentially<T>(this IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks) await task;
    }
}

尽管运行查询的“循环”不是一个很好的习惯 通常,除非您有一些后台服务和一些 特殊情况,通过以下方式将其用于数据库引擎 WHERE thingId IN (...) 一般是一个更好的选择。连你 有很多的东西,我们可以把它切成10到100的小块。 缩小WHERE IN的覆盖范围。

回到我们的RunSequentially,我希望例如使它更具功能:

tasks.ToList().ForEach(async task => await task);

但是遗憾的是,这仍然会运行“并行”任务。

所以最终用法应该是:

async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
    var tasks = ids.Select(id => GetThing(id));// remember don't use .ToArray or ToList...
    await tasks.RunSequentially();
    return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}

另一个过大的功能性解决方案是递归地将 Lazy 放入队列

代替GetThing,获得一个懒惰的GetLazyThing,只需包装Lazy<Task<Try<MyThing>>>就可以返回GetThing

new Lazy<Task<Try<MyThing>>>(() => GetThing(id))

现在使用几个扩展名/功能:

public static async Task RecRunSequentially<T>(this IEnumerable<Lazy<Task<T>>> tasks)
{
    var queue = tasks.EnqueueAll();
    await RunQueue(queue);
}

public static Queue<T> EnqueueAll<T>(this IEnumerable<T> list)
{
    var queue = new Queue<T>();
    list.ToList().ForEach(m => queue.Enqueue(m));
    return queue;
}

public static async Task RunQueue<T>(Queue<Lazy<Task<T>>> queue)
{
    if (queue.Count > 0)
    {
        var task = queue.Dequeue();
        await task.Value; // this unwraps the Lazy object content
        await RunQueue(queue);
    }
}

最后:

var lazyTasks = ids.Select(id => GetLazyThing(id));
await lazyTasks.RecRunSequentially();
// Now collapse and map as you like

更新

但是,如果您不喜欢EnqueueAllRunQueue不是“纯”的事实,我们可以采用以下方法使用相同的Lazy技巧

public static async Task AwaitSequentially<T>(this Lazy<Task<T>>[] array,int index = 0)
{
    if (array == null || index < 0 || index >= array.Length - 1) return;
    await array[index].Value;
    await AwaitSequentially(array,index + 1); // ++index is not pure :)
}

现在:

var lazyTasks = ids.Select(id => GetLazyThing(id));
await tasks.ToArray().AwaitSequentially();
// Now collapse and map as you like