问题描述
我有一个 IAsyncEnumerable<string>
流,其中包含从 Web 下载的数据,我想将每条数据异步保存在 sql 数据库中。所以我使用了 ForEachAwaitAsync
库中的 System.Linq.Async 扩展方法。我的问题是下载和保存每条数据是按顺序进行的,而我更希望它同时发生。
澄清一下,我不想同时下载多条数据,也不想同时保存多条数据。我想要的是,当我在数据库中保存一条数据时,应该同时从网络上下载下一条数据。
以下是我当前解决方案的最小(人为)示例。下载五个项目,然后保存在数据库中。下载每个项目需要 1 秒,保存又需要 1 秒:
async IAsyncEnumerable<string> GetDataFromWeb()
{
foreach (var item in Enumerable.Range(1,5))
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Downloading #{item}");
await Task.Delay(1000); // Simulate an I/O-bound operation
yield return item.ToString();
}
}
var stopwatch = Stopwatch.StartNew();
await GetDataFromWeb().ForEachAwaitAsync(async item =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Saving #{item}");
await Task.Delay(1000); // Simulate an I/O-bound operation
});
Console.WriteLine($"Duration: {stopwatch.ElapsedMilliseconds:#,0} msec");
代码正在运行,但不是我想要的方式。总持续时间约为 10 秒,而不是理想的约 6 秒。
实际不良输出:
04:55:50.526 > Downloading #1
04:55:51.595 > Saving #1
04:55:52.598 > Downloading #2
04:55:53.609 > Saving #2
04:55:54.615 > Downloading #3
04:55:55.616 > Saving #3
04:55:56.617 > Downloading #4
04:55:57.619 > Saving #4
04:55:58.621 > Downloading #5
04:55:59.622 > Saving #5
Duration: 10,115 msec
假设的理想输出:
04:55:50.000 > Downloading #1
04:55:51.000 > Saving #1
04:55:51.000 > Downloading #2
04:55:52.000 > Saving #2
04:55:52.000 > Downloading #3
04:55:53.000 > Saving #3
04:55:53.000 > Downloading #4
04:55:54.000 > Saving #4
04:55:54.000 > Downloading #5
04:55:55.000 > Saving #5
Duration: 6,000 msec
我正在考虑实现一个名为 ForEachConcurrentAsync
的自定义扩展方法,该方法与上述 ForEachAwaitAsync
方法具有相同的签名,但其行为允许同时发生枚举和操作项目。下面是这个方法的一个存根:
/// <summary>
/// Invokes and awaits an asynchronous action on each element in the source sequence.
/// Each action is awaited concurrently with fetching the sequence's next element.
/// </summary>
public static Task ForEachConcurrentAsync<T>(
this IAsyncEnumerable<T> source,Func<T,Task> action,CancellationToken cancellationToken = default)
{
// What to do?
}
如何实现此功能?
其他要求:
- 在取消或失败的情况下泄漏正在运行的任务是不可接受的。当方法完成时,所有启动的任务都应该完成。
- 在枚举和操作都失败的极端情况下,应该只传播两个异常中的一个,并且任何一个都可以。
- 该方法应该是真正的异步方法,并且不应阻塞当前线程(除非
action
参数包含阻塞代码,但调用者有责任防止)。
说明:
-
如果保存数据的时间比从网上下载的时间长,则该方法应该不要提前下载更多项目。最多只能提前下载一份数据,保存前一份。
-
带有网络数据的
IAsyncEnumerable<string>
是这个问题的起点。我不想更改IAsyncEnumerable<string>
的生成器方法。我想对它的元素采取行动(通过将它们保存到数据库中),同时枚举可枚举。
解决方法
听起来您只需要跟踪上一个操作的 Task 并在下一个操作 Task 之前等待它。
public static async Task ForEachConcurrentAsync<T>(
this IAsyncEnumerable<T> source,Func<T,Task> action,CancellationToken cancellationToken = default)
{
Task previous = null;
try
{
await source.ForEachAwaitAsync(async item =>
{
if(previous != null)
{
await previous;
}
previous = action(item);
});
}
finally
{
if(previous != null)
{
await previous;
}
}
}
剩下的就是撒上取消代码。
,这是我的解决方案。
我必须将序列更改为数组才能访问下一个元素。
不确定填充数组是否符合您的要求。
这个想法是在返回当前项目之前开始下载下一个项目。
private static async Task Main(string[] args)
{
var stopwatch = Stopwatch.StartNew();
await foreach (var item in GetDataFromWebAsync())
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Saving #{item}");
await Task.Delay(1000); // Simulate an I/O-bound operation
}
Console.WriteLine($"Duration: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
private static async IAsyncEnumerable<string> GetDataFromWebAsync()
{
var items = Enumerable
.Range(1,5)
.Select(x => x.ToString())
.ToArray();
Task<string> next = null;
for (var i = 0; i < items.Length; i++)
{
var current = next is null
? await DownloadItemAsync(items[i])
: await next;
var nextIndex = i + 1;
next = StarNextDownloadAsync(items,nextIndex);
yield return current;
}
}
private static async Task<string> StarNextDownloadAsync(IReadOnlyList<string> items,int nextIndex)
{
return nextIndex < items.Count
? await DownloadItemAsync(items[nextIndex])
: null;
}
private static async Task<string> DownloadItemAsync(string item)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Downloading #{item}");
await Task.Delay(1000);
return item;
}
控制台输出:
15:57:26.226 > Downloading #1
15:57:27.301 > Downloading #2
15:57:27.302 > Saving #1
15:57:28.306 > Downloading #3
15:57:28.307 > Saving #2
15:57:29.312 > Downloading #4
15:57:29.340 > Saving #3
15:57:30.344 > Downloading #5
15:57:30.347 > Saving #4
15:57:31.359 > Saving #5
Duration: 6 174 msec