问题描述
class MyThing {}
Task<Try<MyThing>> GetThing(int thingId) {...}
我想从数据存储中查找多个项目,并编写了一个新方法来执行此操作。我还编写了一个辅助方法,该方法将使用多个Try<T>
并将其结果合并为一个Try<IEnumerable<T>>
。
public static class TryExtensions
{
Try<IEnumerable<T>> Collapse<T>(this IEnumerable<Try<T>> items)
{
var failures = items.Fails().ToArray();
return failures.Any() ?
Try<IEnumerable<T>>(new AggregateException(failures)) :
Try(items.Select(i => i.Succ(a => a).Fail(Enumerable.Empty<T>())));
}
}
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var results = new List<Try<Things>>();
foreach (var id in ids)
{
var thing = await GetThing(id);
results.Add(thing);
}
return results.Collapse().Map(p => p.ToArray());
}
另一种方式是这样的
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var tasks = ids.Select(async id => await GetThing(id)).ToArray();
await Task.WhenAll(tasks);
return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}
问题是所有任务都将并行运行,而我不想让大量并行请求重载我的数据存储区。我真正想要的是使用LanguageExt
的monadic原理和功能使我的代码正常运行。有人知道如何实现这一目标吗?
更新
感谢@MatthewWatson的建议,这就是SemaphoreSlim
的样子;
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var mutex = new SemaphoreSlim(1);
var results = ids.Select(async id =>
{
await mutex.WaitAsync();
try { return await GetThing(id); }
finally { mutex.Release(); }
}).ToArray();
await Task.WhenAll(tasks);
return tasks.Select(t => t.Result).Collapse().Map(Enumerable.ToArray);
return results.Collapse().Map(p => p.ToArray());
}
问题是,这仍然不是很单调/实用的功能,并且比起带有foreach
块的原始代码,最终得到的代码行更多。
解决方法
通过“另一种方式”,您打电话时几乎实现了目标:
var tasks = ids.Select(async id => await GetThing(id)).ToArray();
除了Tasks不能按顺序运行,因此您最终将遇到许多查询,这些查询是由.ToArray()
和Task.WhenAll
引起的。调用.ToArray()
后,它就已经分配并启动了任务,因此,如果您可以“容忍”一个foreach
来实现顺序任务的运行,如下所示:
public static class TaskExtensions
{
public static async Task RunSequentially<T>(this IEnumerable<Task<T>> tasks)
{
foreach (var task in tasks) await task;
}
}
尽管运行查询的“循环”不是一个很好的习惯 通常,除非您有一些后台服务和一些 特殊情况,通过以下方式将其用于数据库引擎
WHERE thingId IN (...)
一般是一个更好的选择。连你 有很多的东西,我们可以把它切成10到100的小块。 缩小WHERE IN
的覆盖范围。
回到我们的RunSequentially
,我希望例如使它更具功能:
tasks.ToList().ForEach(async task => await task);
但是遗憾的是,这仍然会运行“并行”任务。
所以最终用法应该是:
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var tasks = ids.Select(id => GetThing(id));// remember don't use .ToArray or ToList...
await tasks.RunSequentially();
return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}
另一个过大的功能性解决方案是递归地将 Lazy 放入队列 !
代替GetThing
,获得一个懒惰的GetLazyThing
,只需包装Lazy<Task<Try<MyThing>>>
就可以返回GetThing
:
new Lazy<Task<Try<MyThing>>>(() => GetThing(id))
现在使用几个扩展名/功能:
public static async Task RecRunSequentially<T>(this IEnumerable<Lazy<Task<T>>> tasks)
{
var queue = tasks.EnqueueAll();
await RunQueue(queue);
}
public static Queue<T> EnqueueAll<T>(this IEnumerable<T> list)
{
var queue = new Queue<T>();
list.ToList().ForEach(m => queue.Enqueue(m));
return queue;
}
public static async Task RunQueue<T>(Queue<Lazy<Task<T>>> queue)
{
if (queue.Count > 0)
{
var task = queue.Dequeue();
await task.Value; // this unwraps the Lazy object content
await RunQueue(queue);
}
}
最后:
var lazyTasks = ids.Select(id => GetLazyThing(id));
await lazyTasks.RecRunSequentially();
// Now collapse and map as you like
更新
但是,如果您不喜欢EnqueueAll
和RunQueue
不是“纯”的事实,我们可以采用以下方法使用相同的Lazy
技巧
public static async Task AwaitSequentially<T>(this Lazy<Task<T>>[] array,int index = 0)
{
if (array == null || index < 0 || index >= array.Length - 1) return;
await array[index].Value;
await AwaitSequentially(array,index + 1); // ++index is not pure :)
}
现在:
var lazyTasks = ids.Select(id => GetLazyThing(id));
await tasks.ToArray().AwaitSequentially();
// Now collapse and map as you like