发送到 IAsyncEnumerable<bytes> 的 `Stream` 的实现

问题描述

在我的 .Net Core 应用程序中,第三方库中有一个方法可以写入 forumla1 <- as.formula(Sepal.Length ~ Petal.Length) compute_cooks_models <- function(x,eq){ cooks.distance(lm(eq,data = x$train,na.action = na.exclude)) } result <- sapply(datasets,compute_cooks_models,eq=forumla1) 接口(它将流接口作为参数并写入它),但我希望该数据转到我的数据源期望数据为 System.IO.Stream 流。我开始编写实现 IAsyncEnumerable<bytes> 接口的代码,因此当调用 Stream 时它会写入 Write(),然后认为“这之前一定已经完成了” - 似乎是通用的目的使用。

那么在 3rd 方库中是否有标准实现,或者我缺少任何“巧妙的技巧”?

解决方法

这是一个自定义的 Stream 实现,用于异步生产者-消费者场景。它是一个可写流,只能通过特殊的 GetConsumingEnumerable 方法读取(消耗)它。

public class ProducerConsumerStream : Stream
{
    private readonly Channel<byte> _channel;

    public ProducerConsumerStream(bool singleReader = true,bool singleWriter = true)
    {
        _channel = Channel.CreateUnbounded<byte>(new UnboundedChannelOptions()
        {
            SingleReader = singleReader,SingleWriter = singleWriter
        });
    }

    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override void Flush() { }

    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override long Seek(long offset,SeekOrigin origin)
        => throw new NotSupportedException();

    public override void SetLength(long value)
        => throw new NotSupportedException();

    public override int Read(byte[] buffer,int offset,int count)
        => throw new NotSupportedException();

    public override void Write(byte[] buffer,int count)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        if (offset + count > buffer.Length)
            throw new ArgumentOutOfRangeException(nameof(count));

        for (int i = offset; i < offset + count; i++)
            _channel.Writer.TryWrite(buffer[i]);
    }

    public override void WriteByte(byte value)
    {
        _channel.Writer.TryWrite(value);
    }

    public override void Close()
    {
        base.Close();
        _channel.Writer.Complete();
    }

    public IAsyncEnumerable<byte> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        return _channel.Reader.ReadAllAsync(cancellationToken);
    }
}

此实现基于 Channel<byte>。如果您不熟悉频道,这里有教程 here

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...