问题描述
我在下面创建了一个最小的复制示例:
class Program
{
private static async Task<Stream> GetStream()
{
var text =
@"Multi-line
string";
await Task.Yield();
var bytes = Encoding.UTF8.GetBytes(text);
return new MemoryStream(bytes);
}
private static async Task<T> StreamData<T>(Func<Stream,T> streamAction)
{
await using var stream = await GetStream();
return streamAction(stream);
}
private static async Task StreamData(Func<Stream,Task> streamAction)
{
await using var stream = await GetStream();
await streamAction(stream);
}
private static async IAsyncEnumerable<string> GetTextLinesFromStream(Stream stream)
{
using var reader = new StreamReader(stream);
var line = await reader.ReadLineAsync();
while (line != null)
{
yield return line;
line = await reader.ReadLineAsync();
}
}
private static async Task Test1()
{
async Task GetRecords(Stream str)
{
await foreach(var line in GetTextLinesFromStream(str))
Console.WriteLine(line);
}
await StreamData(GetRecords);
}
private static async Task Test2()
{
await foreach(var line in await StreamData(GetTextLinesFromStream))
Console.WriteLine(line);
}
static async Task Main(string[] args)
{
await Test1();
await Test2();
}
}
在这里,方法 Test1
工作正常,而 Test2
没有,失败了 Stream is not readable
。问题在于,在第二种情况下,当代码开始处理实际流时,流已经被释放了。
大概这两个例子之间的区别在于,对于第一个例子,读取流是在仍处于一次性 stream
的上下文中时执行的,而在第二个例子中,我们已经出局了。
但是,我认为第二种情况也可能有效 - 至少我觉得它非常符合 C# 习惯。我还缺少什么让第二个案例也能正常工作吗?
解决方法
Test2
方法的问题在于 Stream
在创建 IAsyncEnumerable<string>
时被释放,而不是在其枚举完成时释放。
Test2
方法使用第一个 StreamData
重载,即返回 Task<T>
的重载。在这种情况下,T
是一个 IAsyncEnumerable<string>
。因此,StreamData
方法返回一个生成异步序列的任务,然后立即处理流(在生成序列之后)。显然,这不是处理流的合适时机。正确的时刻应该是在 await foreach
循环完成之后。
为了使 Test2
透明地工作,您应该添加 StreamData
方法的第三个重载,该方法返回 Task<IAsyncEnumerable<T>>
(而不是 Task
或 {{1 }})。此重载应返回一个与一次性资源相关联的专用异步序列,并在其枚举完成后处理此资源。下面是这样一个序列的实现:
Task<T>
您可以像这样在 public class AsyncEnumerableDisposable<T> : IAsyncEnumerable<T>
{
private readonly IAsyncEnumerable<T> _source;
private readonly IAsyncDisposable _disposable;
public AsyncEnumerableDisposable(IAsyncEnumerable<T> source,IAsyncDisposable disposable)
{
// Arguments validation omitted
_source = source;
_disposable = disposable;
}
async IAsyncEnumerator<T> IAsyncEnumerable<T>.GetAsyncEnumerator(
CancellationToken cancellationToken)
{
await using (_disposable.ConfigureAwait(false))
await foreach (var item in _source
.WithCancellation(cancellationToken)
.ConfigureAwait(false)) yield return item;
}
}
方法中使用它:
StreamData
请记住,通常一个 private static async Task<IAsyncEnumerable<T>> StreamData<T>(
Func<Stream,IAsyncEnumerable<T>> streamAction)
{
var stream = await GetStream();
return new AsyncEnumerableDisposable<T>(streamAction(stream),stream);
}
在其生命周期内可以被枚举多次,通过将它包装到一个 IAsyncEnumerable<T>
中,它本质上被简化为一个单一的枚举序列(因为资源将在第一次枚举后处理)。
替代方案: System.Interactive.Async 包包含 AsyncEnumerableEx.Using
运算符,可用于代替自定义 AsyncEnumerableDisposable<T>
类:
AsyncEnumerableDisposable
不同之处在于 private static async Task<IAsyncEnumerable<T>> StreamData<T>(
Func<Stream,IAsyncEnumerable<T>> streamAction)
{
var stream = await GetStream();
return AsyncEnumerableEx.Using(() => stream,streamAction);
}
将通过其 Stream
方法同步处理。 AFAICS 不支持在此包中处理 Dispose
。
这是 IAsyncDisposable
方法的签名:
AsyncEnumerableEx.Using