问题描述
我有两个方法连接到两个不同的 Foo
源,它们返回两个 IAsyncEnumerable<Foo>
。我需要从两个来源获取所有 Foo
才能处理它们。
问题:我想同时(异步)查询两个源,即。在开始枚举 Source1
之前不等待 Source2
完成枚举。根据我的理解,这就是下面的方法 SequentialSourcesQuery
示例中发生的情况,对吗?
对于常规任务,我会先开始第一个任务,然后是第二个,然后调用 await Task.WhenAll
。但我对如何处理 IAsyncEnumerable
有点困惑。
public class FoosAsync
{
public async IAsyncEnumerable<Foo> Source1() { }
public async IAsyncEnumerable<Foo> Source2() { }
public async Task<List<Foo>> SequentialSourcesQuery()
{
List<Foo> foos = new List<Foo>();
await foreach (Foo foo1 in Source1())
{
foos.Add(foo1);
}
await foreach (Foo foo2 in Source2())
{ //doesn't start until Source1 completed the enumeration?
foos.Add(foo2);
}
return foos;
}
}
解决方法
您可以利用库 System.Linq.Async 和 System.Interactive.Async(由属于 .NET Foundation 的 RxTeam 所有)。它们包含可以轻松解决您的问题的 Merge
和 ToListAsync
等运算符。
// Merges elements from all of the specified async-enumerable sequences
// into a single async-enumerable sequence.
public static IAsyncEnumerable<TSource> Merge<TSource>(
params IAsyncEnumerable<TSource>[] sources);
// Creates a list from an async-enumerable sequence.
public static ValueTask<List<TSource>> ToListAsync<TSource>(
this IAsyncEnumerable<TSource> source,CancellationToken cancellationToken = default);
把所有东西放在一起:
public Task<List<Foo>> SequentialSourcesQuery()
{
return AsyncEnumerableEx.Merge(Source1(),Source2()).ToListAsync().AsTask();
}
意识到这些库专注于提供丰富的功能集,而不是性能或效率。因此,如果一流的性能对您的用例很重要,niki.kante 的 solution 很可能会胜过上述基于运算符的方法。
,如果您有两个 IAsyncEnumerable<T>
作为源并且不关心传入数据的顺序,您可以使用如下方法来交错您的数据。
public static class AsyncEnumerableExt
{
public static async IAsyncEnumerable<T> Interleave<T>(this IAsyncEnumerable<T> first,IAsyncEnumerable<T> second)
{
var enum1 = first.GetAsyncEnumerator();
var enum2 = second.GetAsyncEnumerator();
var nextWait1 = enum1.MoveNextAsync().AsTask();
var nextWait2 = enum2.MoveNextAsync().AsTask();
do
{
var task = await Task.WhenAny(nextWait1,nextWait2).ConfigureAwait(false);
if (task == nextWait1)
{
yield return enum1.Current;
nextWait1 = !await task.ConfigureAwait(false) ? null : enum1.MoveNextAsync().AsTask();
}
else if (task == nextWait2)
{
yield return enum2.Current;
nextWait2 = !await task.ConfigureAwait(false) ? null : enum2.MoveNextAsync().AsTask();
}
} while (nextWait1 != null && nextWait2 != null);
while (nextWait1 != null)
{
if (!await nextWait1.ConfigureAwait(false))
{
nextWait1 = null;
}
else
{
yield return enum1.Current;
nextWait1 = enum1.MoveNextAsync().AsTask();
}
}
while (nextWait2 != null)
{
if (!await nextWait2.ConfigureAwait(false))
{
nextWait2 = null;
}
else
{
yield return enum2.Current;
nextWait2 = enum2.MoveNextAsync().AsTask();
}
}
}
}
然后你可以用一个 await foreach
消费数据并将数据存储在一个列表中。
您可以编写另一个返回 Task 的异步本地方法。
//dev/de
并这样称呼它:
Func<IAsyncEnumerable<Foo>,Task<List<Foo>>> readValues = async (values) => {
List<Foo> foos = new List<Foo>();
await foreach (Foo foo1 in values)
{
foos.Add(foo1);
}
return foos;
};
整个代码将是:
Task<List<Foo>> task1 = readValues(Source1());
Task<List<Foo>> task2 = readValues(Source2());
await Task.WhenAll(task1,task2);