问题描述
我具有以下函数,该函数以运行System.Diagnostics.Process
的结果作为异步流返回标准输出数据。该方法中当前的所有操作均按预期进行。我可以在await foreach()
循环中调用它,并获得由外部exe生成的每一行输出。
private static async IAsyncEnumerable<string> ProcessAsyncStream (
processstartinfo processstartinfo)
{
// Ensure that process is destroyed when this method exits
using var process = new Process() { StartInfo = processstartinfo };
// Buffer used to pass data from event-handler back to this method
BufferBlock<string> dataBuffer = new BufferBlock<string>();
process.OutputDataReceived += (s,e) =>
{
if (e.Data is null)
{
dataBuffer.Complete();
}
else
{
dataBuffer.Post(e.Data);
}
};
// Start process and redirect output streams
process.Start();
process.BeginoutputReadLine();
// Return data line by line
while (await dataBuffer.OutputAvailableAsync())
yield return dataBuffer.Receive();
}
我的问题是,现在我需要它同时返回标准输出和标准错误结果。我做了这个类来保存每个流中的数据。
public class ProcessData
{
public string Error { get; set; } = "";
public string Output { get; set; } = "";
}
并将ProcessAsyncStream()
更改为如下形式
private static async IAsyncEnumerable<ProcessData> ProcessAsyncStream (
processstartinfo processstartinfo)
{
// Ensure that process is destroyed when this method exits
using var process = new Process() { StartInfo = processstartinfo };
// Buffer used to pass data from event-handlers back to this method
BufferBlock<string> outputDataBuffer = new BufferBlock<string>();
BufferBlock<string> errorDataBuffer = new BufferBlock<string>();
process.OutputDataReceived += (s,e) =>
{
if (e.Data is null)
{
outputDataBuffer.Complete();
}
else
{
outputDataBuffer.Post(e.Data);
}
};
process.ErrorDataReceived += (s,e) =>
{
if (e.Data is null)
{
errorDataBuffer.Complete();
}
else
{
errorDataBuffer.Post(e.Data);
}
};
// Start process and redirect output streams
process.Start();
process.BeginoutputReadLine();
// Return data line by line
while (await outputDataBuffer.OutputAvailableAsync()
|| await errorDataBuffer.OutputAvailableAsync())
yield return new ProcessData()
{
Error = errorDataBuffer.Receive(),Output = outputDataBuffer.Receive()
}
}
问题在于,如果一个缓冲区在方法挂起之前就完成了,因为该缓冲区的.Receive()
没有任何数据要接收。如果将while
条件更改为&&
,则不会从另一个缓冲区获取所有数据。
有什么建议吗?
解决方法
关于实际问题,读取块的过程流程存在问题。最简单的解决方案是只使用带有多个生产者的单个缓冲区和单个消费者以及消息包
您尝试使用 DataFlow块解决的概念性问题是事件和异步流的基本本质。推送事件,并提取异步流。
有几种解决方案可以将它们映射在一起,尽管我认为最优雅的方法是使用 Unbounded Channel 作为 buffer 。
与DataFlow相比,渠道是更现代的方法,与BufferBlock
相比,其自由度更低,更笨拙,并且非常轻巧且经过高度优化。另外,我只需要为不同的响应类型传递包装器。
忽略任何其他问题(概念性或其他问题)。
给予
public enum MessageType
{
Output,Error
}
public class Message
{
public MessageType MessageType { get; set; }
public string Data { get; set; }
public Message(string data,MessageType messageType )
{
Data = data;
MessageType = messageType;
}
}
用法
private async IAsyncEnumerable<Message> ProcessAsyncStreamAsync(
ProcessStartInfo processStartInfo,CancellationToken cancellationToken)
{
using var process = new Process() { StartInfo = processStartInfo };
var ch = Channel.CreateUnbounded<Message>();
var completeCount = 0;
void OnReceived(string data,MessageType type)
{
// The Interlocked memory barrier is likely overkill here
if (data is null && Interlocked.Increment(ref completeCount) == 2)
ch?.Writer.Complete();
else
ch?.Writer.WriteAsync(new Message(data,type),cancellationToken);
}
process.OutputDataReceived += (_,args) => OnReceived(args.Data,MessageType.Output);
process.ErrorDataReceived += (_,MessageType.Error);
// start the process
// ...
await foreach (var message in ch.Reader
.ReadAllAsync(cancellationToken)
.ConfigureAwait(false))
yield return message;
// cleanup
// ...
}
注意:完全未经测试
,改为在退出时完成。
void HandleData(object sender,DataReceivedEventArgs e)
{
if (e.Data != null) dataBuffer.Post(e.Data);
}
process.OutputDataReceived += HandleData;
process.ErrorDataReceived += HandleData;
process.Exited += (s,e) =>
{
process.WaitForExit();
dataBuffer.Complete();
};
,
您可以使用ProcessData
个项目的单个缓冲区:
var buffer = new BufferBlock<ProcessData>();
然后,当两个事件都传播了Complete
值时,使用自定义null
机制来完成缓冲区:
process.OutputDataReceived += (s,e) =>
{
if (e.Data is null) Complete(1);
else buffer.Post(new ProcessData() { Output = e.Data });
};
process.ErrorDataReceived += (s,e) =>
{
if (e.Data is null) Complete(2);
else buffer.Post(new ProcessData() { Error = e.Data });
};
这是Complete
方法的实现:
bool[] completeState = new bool[2];
void Complete(int index)
{
bool completed;
lock (completeState.SyncRoot)
{
completeState[index - 1] = true;
completed = completeState.All(v => v);
}
if (completed) buffer.Complete();
}