如何异步查询两个 IAsyncEnumerables

问题描述

我有两个方法连接到两个不同的 Foo 源,它们返回两个 IAsyncEnumerable<Foo>。我需要从两个来源获取所有 Foo 才能处理它们。

问题:我想同时(异步)查询两个源,即。在开始枚举 Source1 之前不等待 Source2 完成枚举。根据我的理解,这就是下面的方法 SequentialSourcesQuery 示例中发生的情况,对吗?

对于常规任务,我会先开始第一个任务,然后是第二个,然后调用 await Task.WhenAll。但我对如何处理 IAsyncEnumerable 有点困惑。

public class FoosAsync
{
    public async IAsyncEnumerable<Foo> Source1() { }

    public async IAsyncEnumerable<Foo> Source2() { }

    public async Task<List<Foo>> SequentialSourcesQuery()
    {
        List<Foo> foos = new List<Foo>();

        await foreach (Foo foo1 in Source1())
        {
            foos.Add(foo1);
        }

        await foreach (Foo foo2 in Source2())
        { //doesn't start until Source1 completed the enumeration? 
            foos.Add(foo2);
        }

        return foos;
    }
}

解决方法

您可以利用库 System.Linq.AsyncSystem.Interactive.Async(由属于 .NET Foundation 的 RxTeam 所有)。它们包含可以轻松解决您的问题的 MergeToListAsync 等运算符。

// Merges elements from all of the specified async-enumerable sequences
// into a single async-enumerable sequence.
public static IAsyncEnumerable<TSource> Merge<TSource>(
    params IAsyncEnumerable<TSource>[] sources);

// Creates a list from an async-enumerable sequence.
public static ValueTask<List<TSource>> ToListAsync<TSource>(
    this IAsyncEnumerable<TSource> source,CancellationToken cancellationToken = default);

把所有东西放在一起:

public Task<List<Foo>> SequentialSourcesQuery()
{
    return AsyncEnumerableEx.Merge(Source1(),Source2()).ToListAsync().AsTask();
}

意识到这些库专注于提供丰富的功能集,而不是性能或效率。因此,如果一流的性能对您的用例很重要,niki.kante 的 solution 很可能会胜过上述基于运算符的方法。

,

如果您有两个 IAsyncEnumerable<T> 作为源并且不关心传入数据的顺序,您可以使用如下方法来交错您的数据。

public static class AsyncEnumerableExt
{
    public static async IAsyncEnumerable<T> Interleave<T>(this IAsyncEnumerable<T> first,IAsyncEnumerable<T> second)
    {
        var enum1 = first.GetAsyncEnumerator();
        var enum2 = second.GetAsyncEnumerator();

        var nextWait1 = enum1.MoveNextAsync().AsTask();
        var nextWait2 = enum2.MoveNextAsync().AsTask();

        do
        {
            var task = await Task.WhenAny(nextWait1,nextWait2).ConfigureAwait(false);

            if (task == nextWait1)
            {
                yield return enum1.Current;

                nextWait1 = !await task.ConfigureAwait(false) ? null : enum1.MoveNextAsync().AsTask();
            }
            else if (task == nextWait2)
            {
                yield return enum2.Current;

                nextWait2 = !await task.ConfigureAwait(false) ? null : enum2.MoveNextAsync().AsTask();
            }
        } while (nextWait1 != null && nextWait2 != null);

        while (nextWait1 != null)
        {
            if (!await nextWait1.ConfigureAwait(false))
            {
                nextWait1 = null;
            }
            else
            {
                yield return enum1.Current;
                nextWait1 = enum1.MoveNextAsync().AsTask();
            }
        }

        while (nextWait2 != null)
        {
            if (!await nextWait2.ConfigureAwait(false))
            {
                nextWait2 = null;
            }
            else
            {
                yield return enum2.Current;
                nextWait2 = enum2.MoveNextAsync().AsTask();
            }
        }
    }
}

然后你可以用一个 await foreach 消费数据并将数据存储在一个列表中。

,

您可以编写另一个返回 Task 的异步本地方法。

//dev/de

并这样称呼它:

Func<IAsyncEnumerable<Foo>,Task<List<Foo>>> readValues = async (values) => {
        List<Foo> foos = new List<Foo>();
        await foreach (Foo foo1 in values)
        {
            foos.Add(foo1);
        }        
        return foos;
};

整个代码将是:

Task<List<Foo>> task1 = readValues(Source1());
Task<List<Foo>> task2 = readValues(Source2());

await Task.WhenAll(task1,task2);

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...