在C#8 IAsyncEnumerable <T>中并行化收益率返回

问题描述

我有一个返回异步枚举器的方法

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        foreach (var item in listofWorkItems)
        {
            yield return DoWork(item);
        }
    }

呼叫者:

    public async Task LogResultsAsync()
    {
        await foreach (var result in DoWorkAsync())
        {
            Console.WriteLine(result);
        }
    }

由于DoWork是一项昂贵的操作,我希望以某种方式对其进行并行化,因此它的工作方式类似于:

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        Parallel.ForEach(listofWorkItems,item =>
        {
            yield return DoWork(item);
        });
    }

但是我无法从Parallel.Foreach内部获得收益收益,所以想知道最好的方法是什么?

返回结果的顺序无关紧要。

谢谢。

编辑:抱歉,我在DoWorkAsync中遗漏了一些代码,确实是在等待我没有将其放在上面代码中的某些内容,因为这与问题无关。现在更新

Edit2 DoWork在我的情况下主要是I / O绑定,它是从数据库中读取数据。

解决方法

根据canton7的建议,您可以使用AsParallel代替Parallel.ForEach

这可以在标准foreach循环中使用,您可以在其中产生结果:

public async IAsyncEnumerable<IResult> DoWorkAsync()
{
    await Something();
    foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
    {
        yield return result;
    }
}

正如西奥多·祖利亚斯(Theodor Zoulias)所述,返回的可枚举实际上根本不是异步的。

如果您只需要使用await foreach来使用它就不成问题,但是更明确地说,您可以返回IEnumerable并让调用者对其进行并行化:

public async Task<IEnumerable<Item>> DoWorkAsync()
{
    await Something();
    return ListOfWorkItems;
}

// Caller...
Parallel.ForEach(await DoWorkAsync(),item => 
{
    var result = DoWork(item);
    //...
});

尽管如果需要在多个位置调用它,则可能难以维护

,

这是一个使用TransformBlock库中的TPL Dataflow的基本实现:

public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
{
    // Define the dataflow block
    var block = new TransformBlock<IWorkItem,IResult>(async item =>
    {
        return await TransformAsync(item);
    },new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 10,// the default is 1
        EnsureOrdered = false // the default is true
    });

    // Feed the block with input data
    foreach (var item in workItems)
    {
        block.Post(item);
    }
    block.Complete();

    // Stream the block's output as IAsyncEnumerable
    while (await block.OutputAvailableAsync())
    {
        while (block.TryReceive(out var result))
        {
            yield return result;
        }
    }

    // Propagate possible exceptions
    await block.Completion;
}

此实现不是完美的,因为如果IAsyncEnumerable的使用者过早放弃枚举,则TransformBlock将继续在后台工作,直到处理完所有工作项为止。此外,它也不支持取消,所有受人尊敬的IAsyncEnumerable生成方法都应支持取消。这些缺少的功能可以相对容易地添加。如果您有兴趣添加它们,请查看this问题。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...