在C#中重构异步套接字编程

问题描述

我有一堆要通过C#套接字公开的异步方法。MSDN文档中的常规模式具有以下形式:

 // Entry point of the pattern: begins an asynchronous accept on the listener.
 // The `...` lines mark code that is elided from this excerpt.
 public static void StartListening()
 {
   ...
   // The local endpoint uses the hard-coded port 11000.
   IPEndPoint localEndPoint = new IPEndPoint(ipAddress,11000);
   ...
   // AcceptCallback is the completion callback; the listener itself is passed as the async state.
   listener.BeginAccept(new AsyncCallback(AcceptCallback),listener); 
   ... 
 }

 // Completion callback for BeginAccept: starts the first asynchronous receive.
 // The `...` line marks code elided from this excerpt (presumably where `handler`
 // and `state` are obtained -- not shown here).
 public static void AcceptCallback(IAsyncResult ar)
 {
    ...
    // Socket.BeginReceive takes (buffer, offset, size, socketFlags, callback, state).
    // Offset 0 and flags 0 (SocketFlags.None) are restored; the call had lost them.
    handler.BeginReceive(state.buffer,0,StateObject.BufferSize,0,new AsyncCallback(ReadCallback),state);
 }

 // Completion callback for BeginReceive: recovers the per-connection state,
 // processes it with CalculateResult, then starts the next receive.
 // The `...` lines mark code elided from this excerpt.
 public static void ReadCallback(IAsyncResult ar)
 {
    ...
    // The state object passed to BeginReceive comes back through AsyncState.
    StateObject state = (StateObject) ar.AsyncState;
    ...
    CalculateResult(state);
    ...
    // Socket.BeginReceive takes (buffer, offset, size, socketFlags, callback, state).
    // The call had collapsed to (buffer, state), which matches no overload.
    handler.BeginReceive(state.buffer,0,StateObject.BufferSize,0,new AsyncCallback(ReadCallback),state);
    ...
 }

因此,以一种简洁整洁的方式编写所有这些内容而不重复代码是一个挑战。我正在沿着以下思路思考,但无法把各个环节串联起来:

    // Starts one listener per port, each with its own accept callback.
    // The `...` line marks code elided from this excerpt.
    public static void StartListeningMaster()
    {
        string ipAddress = "localhost";
        IPHostEntry ipHost = Dns.GetHostEntry(ipAddress);
        // Uses the first address the resolver returns for the host name.
        IPAddress address = ipHost.AddressList[0];

        // StartListening is declared as (port, ipAddress, acceptCallback);
        // every call must pass the address, not only the first one.
        StartListening(50000,address,AcceptCallback1);
        StartListening(50001,address,AcceptCallback2);
        StartListening(50002,address,AcceptCallback3);
        ...
    }
    
    // Shared listener setup: the caller supplies the port, the address and the accept callback to use.
    // The body is elided in this excerpt.
    public static void StartListening(int port,IPAddress ipAddress,Action<IAsyncResult> acceptCallback) {...}
    // Accept callback for the first port: starts receiving with ReadCallback1 as the completion callback.
    // The `...` line marks code elided from this excerpt.
    public static void AcceptCallback1(IAsyncResult ar)
    {
       ...
       // The delegate type is AsyncCallback (there is no AsyncCallback1 type); only the
       // target method differs per port. Socket.BeginReceive takes
       // (buffer, offset, size, socketFlags, callback, state).
       handler.BeginReceive(state.buffer,0,StateObject.BufferSize,0,new AsyncCallback(ReadCallback1),state);
    }
    ...

到目前为止,此方法仍然有效。但是为了正确地重构它,我希望只有一个AcceptCallback方法,该方法接受一个通用的ReadCallback作为其参数,并将CalculateResult方法作为其参数。这样,我就不会重复任何代码。但是,如果我修改我的AcceptCallback方法,使其接受IAsyncResult之外的更多参数,例如:

  // Attempted variant: the accept callback itself takes a second parameter (the read callback).
  // The body is elided in this excerpt.
  public static void StartListening(int port,Action<IAsyncResult,Action<IAsyncResult>> acceptCallback) {...}

  // Two-parameter accept callback; this shape does not match the one-parameter AsyncCallback delegate.
  public static void AcceptCallback(IAsyncResult ar,Action<IAsyncResult> readCallback) {}
 

那么我就违反了AsyncCallback委托的签名约定:

// The callback delegate: it takes exactly one IAsyncResult parameter and returns nothing.
public delegate void AsyncCallback(IAsyncResult ar);

然后,我研究了通过扩展现有接口来实现该功能的方法。我考虑过扩展

public interface IAsyncResult

但这似乎也不是正确的方法。那么,应该如何编写此代码,才能避免到处复制和粘贴几乎相同的代码?

解决方法

所以我解决这个问题的方法是将基本组件移到它们自己的抽象对象中。然后在这些对象上构建。例如,服务器仅需要接受/跟踪连接。因此,我将使服务器对象看起来像这样:

namespace MultiServerExample.Base
{
    /// <summary>
    /// Non-generic view of a server, so servers with different client types
    /// can be stored and driven through one common type.
    /// </summary>
    public interface IAsyncServerBase
    {
        /// <summary>Starts the server if it is not already listening.</summary>
        void StartListening();
        /// <summary>Gets whether the server is currently listening.</summary>
        bool IsListening { get; }
        /// <summary>Stops the server if it is listening.</summary>
        void StopListening();
        /// <summary>Writes <paramref name="data"/> to every connected client.</summary>
        /// <param name="data">The bytes to send.</param>
        void WriteDataToAllClients(byte[] data);
    }

    /// <summary>
    /// Base class for a TCP server that accepts connections asynchronously, wraps each
    /// accepted connection in a new <typeparamref name="TClientBase"/> and tracks it by Id.
    /// Derived servers override <see cref="OnClientConnected"/> and
    /// <see cref="OnClientDisconnected"/> to react to client activity.
    /// </summary>
    /// <typeparam name="TClientBase">
    /// Client type created for each accepted connection; it needs a public parameterless constructor.
    /// </typeparam>
    public abstract class AsyncServerBase<TClientBase> : IAsyncServerBase
        where TClientBase : IAsyncClientBase,new()
    {
        // Subclass of TcpListener whose only purpose is to expose the protected Active property.
        private sealed class ActiveTcpListener : TcpListener
        {
            public ActiveTcpListener(IPAddress localaddr,int port)
                : base(localaddr,port) { }
            public bool IsActive => Active;
        }

        // The listener that accepts incoming connections.
        private ActiveTcpListener Listener { get; }
        // Connected clients, keyed by client Id.
        private ConcurrentDictionary<string,TClientBase> Clients { get; }

        /// <summary>Creates a server bound to all local addresses on <paramref name="port"/>.</summary>
        /// <param name="port">The TCP port to listen on.</param>
        public AsyncServerBase(int port)
        {
            Clients = new ConcurrentDictionary<string,TClientBase>();
            Listener = new ActiveTcpListener(IPAddress.Any,port);
        }

        /// <summary>Called after a new client has been attached and tracked. The default does nothing.</summary>
        /// <param name="client">The newly connected client.</param>
        public virtual void OnClientConnected(TClientBase client) { }

        /// <summary>Called when a client reports an error, before it is detached. The default does nothing.</summary>
        /// <param name="client">The client that is being dropped.</param>
        /// <param name="ex">The exception carried by the client's error event.</param>
        public virtual void OnClientDisconnected(TClientBase client,Exception ex) { }

        /// <summary>Starts the listener and begins accepting clients; does nothing if already listening.</summary>
        public void StartListening()
        {
            if(!IsListening)
            {
                Listener.Start();
                Listener.BeginAcceptTcpClient(OnAcceptedTcpClient,this);
            }
        }

        /// <summary>Gets whether the underlying listener is active.</summary>
        public bool IsListening =>
            Listener.IsActive;

        /// <summary>Stops the listener, detaches every tracked client and clears the client table.</summary>
        public void StopListening()
        {
            if (IsListening)
            {
                Listener.Stop();
                // A null exception is passed to DetachClient: the detach was requested, not caused by an error.
                Parallel.ForEach(Clients,x => x.Value.DetachClient(null));
                Clients.Clear();
            }
        }

        // Async callback invoked when an accept operation completes.
        // The server instance travels through the async state because the callback is static.
        private static void OnAcceptedTcpClient(IAsyncResult res)
        {
            var me = (AsyncServerBase<TClientBase>)res.AsyncState;

            // The server has been stopped; do not touch the listener.
            if (!me.IsListening) { return; }

            try
            {
                TcpClient client = null;
                try
                {
                    client = me.Listener.EndAcceptTcpClient(res);
                }
                catch(Exception ex)
                {
                    // A failed accept must not stop the accept loop; log it and carry on.
                    System.Diagnostics.Debug.WriteLine($"Warning: unable to accept client:\n{ex}");
                }

                if(client != null)
                {
                    // create a new client
                    var t = new TClientBase();
                    // set up error callbacks
                    t.Error += me.OnClientBaseError;
                    // notify client we have attached
                    t.AttachClient(client);
                    // track the client
                    me.Clients[t.Id] = t;
                    // notify we have a new connection
                    me.OnClientConnected(t);
                }
            }
            finally
            {
                // if we are still listening, wait for another connection
                if(me.IsListening)
                {
                    // Must be BeginAcceptTcpClient (not BeginAcceptSocket): this callback completes
                    // the operation with EndAcceptTcpClient, and Begin/End calls have to be paired.
                    me.Listener.BeginAcceptTcpClient(OnAcceptedTcpClient,me);
                }
            }
        }

        // Event callback from a client that an error has occurred:
        // unsubscribe, notify the derived server, detach the client and stop tracking it.
        private void OnClientBaseError(object sender,AsyncClientBaseErrorEventArgs e)
        {
            var client = (TClientBase)sender;
            client.Error -= OnClientBaseError;

            OnClientDisconnected(client,e.Exception);

            client.DetachClient(e.Exception);
            Clients.TryRemove(client.Id,out _);
        }

        /// <summary>Writes <paramref name="data"/> to every tracked client.</summary>
        /// <param name="data">The bytes to send.</param>
        public void WriteDataToAllClients(byte[] data)
        {
            Parallel.ForEach(Clients,x => x.Value.WriteData(data));
        }
    }
}

至此,已经考虑了运行服务器的所有基础知识。现在,对于在服务器上运行的客户端:

namespace MultiServerExample.Base
{
    /// <summary>
    /// Contract the server uses to drive one connected client.
    /// </summary>
    public interface IAsyncClientBase
    {
        /// <summary>Raised when an operation on the client's connection fails.</summary>
        event EventHandler<AsyncClientBaseErrorEventArgs> Error;
        /// <summary>Hands the accepted connection to this client.</summary>
        /// <param name="client">The accepted TCP connection.</param>
        void AttachClient(TcpClient client);
        /// <summary>Writes <paramref name="data"/> to the connection.</summary>
        /// <param name="data">The bytes to send.</param>
        void WriteData(byte[] data);
        /// <summary>Releases the connection.</summary>
        /// <param name="ex">The error that caused the detach; the server passes null when it is stopping.</param>
        void DetachClient(Exception ex);
        /// <summary>Gets the identifier the server uses as the key for this client.</summary>
        string Id { get; }
    }

    /// <summary>
    /// Base class for the server-side representation of one connected client.
    /// It owns the <see cref="TcpClient"/>, performs the asynchronous read/write calls and
    /// reports failures through <see cref="Error"/>. Derived clients override the
    /// On* hooks to implement their behavior.
    /// </summary>
    public abstract class AsyncClientBase : IAsyncClientBase
    {
        /// <summary>Size, in bytes, of the receive buffer allocated on attach. Defaults to 1024.</summary>
        protected virtual int ReceiveBufferSize { get; } = 1024;
        // The attached connection; null while detached.
        private TcpClient Client { get; set; }
        // Buffer handed to BeginRead; null while detached.
        private byte[] ReceiveBuffer { get; set; }
        /// <summary>Raised when a Begin/End read or write call throws.</summary>
        public event EventHandler<AsyncClientBaseErrorEventArgs> Error;
        /// <summary>Gets the unique identifier of this client (a GUID string).</summary>
        public string Id { get; }

        /// <summary>Creates a client with a freshly generated Id.</summary>
        public AsyncClientBase()
        {
            Id = Guid.NewGuid().ToString();
        }

        /// <summary>
        /// Takes ownership of <paramref name="client"/>, starts an asynchronous read and
        /// then calls <see cref="OnAttachedToServer"/>.
        /// </summary>
        /// <param name="client">The accepted TCP connection.</param>
        /// <exception cref="InvalidOperationException">This instance is already attached.</exception>
        public void AttachClient(TcpClient client)
        {
            if(ReceiveBuffer != null) { throw new InvalidOperationException(); }

            ReceiveBuffer = new byte[ReceiveBufferSize];
            Client = client;

            try
            {
                // Stream.BeginRead takes (buffer, offset, count, callback, state); read from offset 0.
                Client.GetStream().
                    BeginRead(ReceiveBuffer,0,ReceiveBufferSize,OnDataReceived,this);
                OnAttachedToServer();
            }
            catch (Exception ex)
            {
                Error?.Invoke(this,new AsyncClientBaseErrorEventArgs(ex,"BeginRead"));
            }
        }

        /// <summary>
        /// Closes the connection, calls <see cref="OnDetachedFromServer"/> and clears the
        /// connection and buffer references.
        /// </summary>
        /// <param name="ex">The error that caused the detach, or null.</param>
        public void DetachClient(Exception ex)
        {
            try
            {
                Client.Close();
                OnDetachedFromServer(ex);
            }
            catch { /* intentionally swallow */ }
        
            Client = null;
            ReceiveBuffer = null;
        }

        /// <summary>Called with a copy of the bytes that were just read. The default does nothing.</summary>
        public virtual void OnDataReceived(byte[] buffer) { }
        /// <summary>Called after the initial read has been started. The default does nothing.</summary>
        public virtual void OnAttachedToServer() { }
        /// <summary>Called after the connection has been closed. The default does nothing.</summary>
        public virtual void OnDetachedFromServer(Exception ex) { }

        /// <summary>Starts an asynchronous write of the whole <paramref name="data"/> array.</summary>
        /// <param name="data">The bytes to send.</param>
        public void WriteData(byte[] data)
        {
            try
            {
                // Stream.BeginWrite takes (buffer, offset, count, callback, state); write from offset 0.
                Client.GetStream().BeginWrite(data,0,data.Length,OnDataWrote,this);
            }
            catch(Exception ex)
            {
                Error?.Invoke(this,new AsyncClientBaseErrorEventArgs(ex,"BeginWrite"));
            }
        }

        // Completion callback for BeginRead. The instance travels through the async state
        // because the callback is static.
        private static void OnDataReceived(IAsyncResult iar)
        {
            var me = (AsyncClientBase)iar.AsyncState;

            // Already detached; nothing to complete.
            if(me.Client == null) { return; }

            try
            {
                var bytesRead = me.Client.GetStream().EndRead(iar);
                // Hand the subclass a right-sized copy so the shared receive buffer is never exposed.
                var buf = new byte[bytesRead];
                Array.Copy(me.ReceiveBuffer,buf,bytesRead);

                me.OnDataReceived(buf);
                // NOTE(review): no further BeginRead is issued here, so as written only one read
                // completes per attach -- confirm whether re-arming the read is intended.
            }
            catch (Exception ex)
            {
                me.Error?.Invoke(me,new AsyncClientBaseErrorEventArgs(ex,"EndRead"));
            }
        }

        // Completion callback for BeginWrite.
        private static void OnDataWrote(IAsyncResult iar)
        {
            var me = (AsyncClientBase)iar.AsyncState;
            try
            {
                me.Client.GetStream().EndWrite(iar);
            }
            catch(Exception ex)
            {
                me.Error?.Invoke(me,new AsyncClientBaseErrorEventArgs(ex,"EndWrite"));
            }
        }
    }
}

现在,所有基本代码都已编写。您无需进行任何更改。您只需实现自己的客户端和服务器即可做出相应响应。例如,这是一个基本的服务器实现:

/// <summary>
/// Sample server for <see cref="MyClient"/> connections that logs each
/// connect and disconnect to the console.
/// </summary>
public class MyServer : AsyncServerBase<MyClient>
{
    /// <summary>Creates the server on the given port.</summary>
    public MyServer(int port)
        : base(port) { }

    /// <summary>Logs the new client's Id, then runs the base handler.</summary>
    public override void OnClientConnected(MyClient client)
    {
        var line = $"* MyClient connected with Id: {client.Id}";
        Console.WriteLine(line);

        base.OnClientConnected(client);
    }

    /// <summary>Logs the client's Id and the exception message, then runs the base handler.</summary>
    public override void OnClientDisconnected(MyClient client,Exception ex)
    {
        var line = $"***** MyClient disconnected with Id: {client.Id} ({ex.Message})";
        Console.WriteLine(line);

        base.OnClientDisconnected(client,ex);
    }
}

这是上面的服务器用于通信的客户端:

/// <summary>
/// Sample client: logs its lifecycle to the console and answers every
/// received buffer with a fixed 5-byte reply.
/// </summary>
public class MyClient : AsyncClientBase
{
    /// <summary>Runs the base hook, then logs that the client is attached.</summary>
    public override void OnAttachedToServer()
    {
        base.OnAttachedToServer();

        var typeName = GetType().Name;
        Console.WriteLine($"{Id}: {typeName} attached. Waiting for data...");
    }

    /// <summary>Runs the base hook, logs the number of received bytes and writes the reply.</summary>
    public override void OnDataReceived(byte[] buffer)
    {
        base.OnDataReceived(buffer);

        var typeName = GetType().Name;
        Console.WriteLine($"{Id}: {typeName} recieved {buffer.Length} bytes. Writing 5 bytes back.");

        var reply = new byte[] { 1,2,3,4,5 };
        WriteData(reply);
    }

    /// <summary>Runs the base hook, then logs that the client is detached.</summary>
    public override void OnDetachedFromServer(Exception ex)
    {
        base.OnDetachedFromServer(ex);

        var typeName = GetType().Name;
        Console.WriteLine($"{Id}: {typeName} detached.");
    }
}

为了更好地说明这一点,这是另一个客户端,只需将其接入相同的服务器实现中,即可提供不同的行为:

/// <summary>
/// Second sample client: it speaks first, writing 4 bytes as soon as it is attached,
/// and only logs whatever it receives afterwards.
/// </summary>
public class MyOtherClient : AsyncClientBase
{
    /// <summary>Runs the base hook, logs the attach and writes the 4-byte greeting.</summary>
    public override void OnAttachedToServer()
    {
        base.OnAttachedToServer();

        Console.WriteLine($"{Id}: {GetType().Name} attached. Writing 4 bytes back.");
        // The payload must be 4 bytes long to match the log line above (it was { 1,4 }, i.e. 2 bytes).
        WriteData(new byte[] { 1,2,3,4 });
    }

    /// <summary>Runs the base hook and logs the number of received bytes.</summary>
    public override void OnDataReceived(byte[] buffer)
    {
        base.OnDataReceived(buffer);

        Console.WriteLine($"{Id}: {GetType().Name} recieved {buffer.Length} bytes.");
    }

    /// <summary>Runs the base hook and logs the detach.</summary>
    public override void OnDetachedFromServer(Exception ex)
    {
        base.OnDetachedFromServer(ex);

        Console.WriteLine($"{Id}: {GetType().Name} detached.");
    }
}

至于如何使用,这是一个小的测试程序,可以对它进行压力测试:

    /// <summary>
    /// Small stress test: starts three servers, connects many clients to each in parallel,
    /// then broadcasts a final message and stops the servers after a key press.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            // NOTE(review): MyOtherServer is not defined in this excerpt -- presumably an
            // AsyncServerBase<MyOtherClient>; confirm against the full source.
            var servers = new IAsyncServerBase[]
            {
                new MyServer(50000),new MyServer(50001),new MyOtherServer(50002)
            };

            foreach (var s in servers)
            {
                s.StartListening();
            }

            RunTestUsingMyServer("1",89,50000);
            RunTestUsingMyServer("2",127,50001);
            RunTestUsingMyOtherServer("3",88,50002);

            Console.Write("Press any key to exit... ");
            Console.ReadKey(true);

            foreach (var s in servers)
            {
                s.WriteDataToAllClients(new byte[] { 1,5 });
                s.StopListening();
            }
        }

        // Connects clientCount clients in parallel to a MyServer on the given port.
        // Each client writes 5 bytes first and then reads the server's reply.
        private static void RunTestUsingMyServer(string name,int clientCount,int port)
        {
            Parallel.For(0,clientCount,x =>
            {
                using (var t = new TcpClient())
                {
                    t.Connect(IPAddress.Loopback,port);
                    // Stream.Write/Read take (buffer, offset, count); the buffer has to hold `count` bytes.
                    t.GetStream().Write(new byte[] { 1,2,3,4,5 },0,5);
                    t.GetStream().Read(new byte[512],0,512);
                    t.Close();
                }
                Console.WriteLine($"FINISHED PASS {name} #{x}");
            });
        }

        // Connects clientCount clients in parallel to a MyOtherServer on the given port.
        // Here each client reads first and then writes 6 bytes.
        private static void RunTestUsingMyOtherServer(string name,int clientCount,int port)
        {
            Parallel.For(0,clientCount,x =>
            {
                using (var t = new TcpClient())
                {
                    t.Connect(IPAddress.Loopback,port);
                    t.GetStream().Read(new byte[512],0,512);
                    t.GetStream().Write(new byte[] { 1,2,3,4,5,6 },0,6);
                    t.Close();
                }
                Console.WriteLine($"FINISHED PASS {name} #{x}");
            });
        }
    }

如果有兴趣,可以查看并签出完整的源代码(full source code)。希望这能帮助您实现代码重用的目标。

,

我不知道这是否可以帮助您。您可以使用与每个端口有关的所有信息来定义状态对象:

/// <summary>
/// Per-port state passed through the asynchronous callbacks: the listening
/// socket, the endpoint it is bound to, and a display name.
/// </summary>
public class StateObject
{
    // Display name for this port.
    public string Name;
    // The socket that listens on this port.
    public Socket Listener;
    // The local endpoint (address and port) of the listener.
    public IPEndPoint LocalEndPoint;
    //...

    /// <summary>Stores the three values in the corresponding public fields.</summary>
    /// <param name="listener">The listening socket.</param>
    /// <param name="endPoint">The local endpoint of the listener.</param>
    /// <param name="name">The display name for this port.</param>
    public StateObject(Socket listener,IPEndPoint endPoint,string name)
    {
        this.Name = name;
        this.LocalEndPoint = endPoint;
        this.Listener = listener;
    }
}

然后,您可以根据需要使用它:

    // Starts one listener per port; each port gets a name that ends up in its StateObject.
    public static void StartListeningMaster()
    {
        string ipAddress = "localhost";
        IPHostEntry ipHost = Dns.GetHostEntry(ipAddress);
        // Uses the first address the resolver returns for the host name.
        IPAddress address = ipHost.AddressList[0];

        // StartListening is declared as (port, ipAddress, name); the address is not optional,
        // so every call has to pass it.
        StartListening(50000,address,"Main port");
        StartListening(50001,address,"Service port");
        StartListening(50002,address,"Extra");
        //...
    }

    // Binds a listener for one port and starts accepting, passing a StateObject that
    // describes the port as the async state. The `//...` lines mark elided code
    // (the creation of `listener` is not shown in this excerpt).
    public static void StartListening(int port,IPAddress ipAddress,string name = "")
    {
        //...
        IPEndPoint localEndPoint = new IPEndPoint(ipAddress,port);
        //...
        // StateObject's constructor takes (listener, endPoint, name); pass the name through
        // so the callbacks can tell the ports apart.
        StateObject state = new StateObject(listener,localEndPoint,name);
        listener.BeginAccept(AcceptCallback,state);
        //...
    }

    // Single accept callback shared by all ports: recovers the per-port StateObject and
    // starts receiving, forwarding the same state to ReadCallback.
    // The `//...` line marks elided code (`handler` and `client` are not shown in this excerpt).
    public static void AcceptCallback(IAsyncResult ar)
    {
        StateObject state = (StateObject)ar.AsyncState;
        //...
        // Socket.BeginReceive takes (buffer, offset, size, socketFlags, callback, state);
        // offset 0 and flags 0 (SocketFlags.None) are restored.
        // NOTE(review): StateObject.BufferSize is not declared in the StateObject shown in
        // this file -- presumably part of its elided members; confirm.
        handler.BeginReceive(client.buffer,0,StateObject.BufferSize,0,new AsyncCallback(ReadCallback),state);
    }

    // Single read callback shared by all ports: the StateObject identifies which port the
    // data arrived on. The `//...` lines mark elided code (`handler` and `client` are not
    // shown in this excerpt).
    public static void ReadCallback(IAsyncResult ar)
    {
        StateObject state = (StateObject)ar.AsyncState;

        // Always have the info related to every socket
        Socket listener = state.Listener;
        string address = state.LocalEndPoint.Address.ToString();
        int port = state.LocalEndPoint.Port;
        string name = state.Name;

        //...
        // `state` is already declared at the top of the method; declaring it a second
        // time here would not compile.
        //...
        CalculateResult(state);
        //...
        // Socket.BeginReceive takes (buffer, offset, size, socketFlags, callback, state).
        handler.BeginReceive(client.buffer,0,StateObject.BufferSize,0,new AsyncCallback(ReadCallback),state);
        //...
    }

CalculateResult(state)方法将具有执行所有操作所需的所有必要信息。这样,您要管理的所有端口只有一个StartListening(),一个AcceptCallback()和一个ReadCallback()