TCP 服务器的 IDuplexPipe 的示例实现

问题描述

我最近发现了 system.io.pipelines 命名空间,它看起来很有趣。我一直在尝试在一个简单的 TCP 服务器的上下文中实现 IDuplexPipe 接口,该服务器接受连接,然后与连接的客户端来回通信。

但是,我正在努力使其稳定。感觉好像我误解了一些基本的东西。我还一直在谷歌上搜索界面的参考实现,以指导我朝着正确的方向前进。

据我所知,这是关于 system.io.pipelines最完整的文档。我在下面提供的例子大量借鉴了它。

我的问题:在 TCP 服务器的上下文中,IDuplexPipe 接口的典型实现是什么样的?

顺便说一句,这就是我目前所拥有的。这个想法是通过提供一个已建立的 SslStream 来建立一个新的“双工通信”:

    public class DuplexCommunication : IDuplexPipe
    {
        public PipeReader Input => _receivePipe.Reader;
        public PipeWriter Output => _transmitPipe.Writer;

        private readonly SslStream _stream;

        // Data received from the SslStream will end up on this pipe
        private readonly Pipe _receivePipe = new Pipe();

        // Data that is to be transmitted over the SslStream ends up on this pipe
        private readonly Pipe _transmitPipe = new Pipe();

        private readonly CancellationToken _cts;
        private Task _receive;
        private Task _transmit;

        public DuplexCommunication(SslStream stream,CancellationToken cts)
        {
            _stream = stream;
            _cts = cts;

            _receive = Receive();
            _transmit = Transmit();
        }

        private async Task Receive()
        {
            Exception error = null;
            try
            {
                while (!_cts.IsCancellationRequested)
                {
                    var buffer = _receivePipe.Writer.GetMemory(1);
                    var bytes = await _stream.ReadAsync(buffer,_cts);
                    _receivePipe.Writer.Advance(bytes);
                    
                    if (bytes == 0) {
                        break;
                    }
                    
                    var flush = await _receivePipe.Writer.FlushAsync(_cts);
                    if (flush.IsCompleted || flush.IsCanceled)
                    {
                        break;
                    }
                }
            }
            catch (Exception ex)
            {
                // This might be "stream is closed" or similar,from when trying to read from the stream
                Console.WriteLine($"DuplexPipe ReceiveTask caugth an exception: {ex.Message}");
                error = ex;
            }
            finally
            {
                await _receivePipe.Writer.CompleteAsync(error);
            }
        }

        private async Task Transmit()
        {
            Exception error = null;
            try
            {
                while (!_cts.IsCancellationRequested)
                {
                    var read = await _transmitPipe.Reader.ReadAsync(_cts);
                    var buffer = read.Buffer;

                    if (buffer.IsEmpty && read.IsCompleted)
                    {
                        break;
                    }

                    foreach (var segment in buffer)
                    {
                        await _stream.WriteAsync(segment,_cts);
                    }

                    _transmitPipe.Reader.Advanceto(buffer.End);
                    await _stream.FlushAsync(_cts);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine($"DuplexPipe Transmit caught an exception: {e.Message}");
                error = e;
            }
            finally
            {
                await _transmitPipe.Reader.CompleteAsync(error);
            }
        }
    }

解决方法

所以,我花了更多时间在互联网上搜索并找到了一些可以解决我的问题的东西。我发现,除其他外,这个问题:How to handle incoming TCP messages with a Kestrel ConnectionHandler?

在这里,ConnectionHandler 类似乎提供了我所需要的,包括一些非常方便的管道,用于处理 SSL 证书、端口侦听等,这些都是在构建 ASP.NET 应用程序时免费提供的。