问题描述
在我的 .Net Core 应用程序中,第三方库中有一个方法可以写入 forumla1 <- as.formula(Sepal.Length ~ Petal.Length)
compute_cooks_models <- function(x,eq){
cooks.distance(lm(eq,data = x$train,na.action = na.exclude))
}
result <- sapply(datasets,compute_cooks_models,eq=forumla1)
接口(它将流接口作为参数并写入它),但我希望该数据转到我的数据源期望数据为 System.IO.Stream
流。我开始编写实现 IAsyncEnumerable<bytes>
接口的代码,因此当调用 Stream
时它会写入 Write()
,然后认为“这之前一定已经完成了” - 似乎是通用的目的使用。
那么在 3rd 方库中是否有标准实现,或者我缺少任何“巧妙的技巧”?
解决方法
这是一个自定义的 Stream
实现,用于异步生产者-消费者场景。它是一个可写流,只能通过特殊的 GetConsumingEnumerable
方法读取(消耗)它。
public class ProducerConsumerStream : Stream
{
private readonly Channel<byte> _channel;
public ProducerConsumerStream(bool singleReader = true,bool singleWriter = true)
{
_channel = Channel.CreateUnbounded<byte>(new UnboundedChannelOptions()
{
SingleReader = singleReader,SingleWriter = singleWriter
});
}
public override bool CanRead { get { return false; } }
public override bool CanSeek { get { return false; } }
public override bool CanWrite { get { return true; } }
public override long Length { get { throw new NotSupportedException(); } }
public override void Flush() { }
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
public override long Seek(long offset,SeekOrigin origin)
=> throw new NotSupportedException();
public override void SetLength(long value)
=> throw new NotSupportedException();
public override int Read(byte[] buffer,int offset,int count)
=> throw new NotSupportedException();
public override void Write(byte[] buffer,int count)
{
if (buffer == null) throw new ArgumentNullException(nameof(buffer));
if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
if (offset + count > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(count));
for (int i = offset; i < offset + count; i++)
_channel.Writer.TryWrite(buffer[i]);
}
public override void WriteByte(byte value)
{
_channel.Writer.TryWrite(value);
}
public override void Close()
{
base.Close();
_channel.Writer.Complete();
}
public IAsyncEnumerable<byte> GetConsumingEnumerable(
CancellationToken cancellationToken = default)
{
return _channel.Reader.ReadAllAsync(cancellationToken);
}
}
此实现基于 Channel<byte>
。如果您不熟悉频道,这里有教程 here。