状态机和网络套接字 - 如何处理竞争条件

问题描述

我在我的 C# 网络项目中使用 Stateless(状态机库),主要是因为它便于添加各种功能,例如套接字连接后的线路级(wire-level)授权、重新连接前的延迟等。

话虽如此,我还是遇到了一些竞争条件和死锁,想就如何用以下状态处理这个问题的最佳方法寻求建议:

/// <summary>States of the connection state machine.</summary>
enum State
{
    Stopped,
    disconnected,
    Connecting,
    Connected,
    Resetting,
}

/// <summary>Triggers that drive transitions between <see cref="State"/> values.</summary>
enum Trigger
{
    Start,
    Stop,
    Connect,
    SetConnectComplete,
    Reset,
    SetResetComplete,
}

/// <summary>
/// Connection state machine built on <c>StateMachine&lt;State, Trigger&gt;</c>
/// (presumably the Stateless library's base type). It starts in
/// <see cref="State.Stopped"/> and wires one entry callback to each of the
/// other four states.
/// </summary>
class StateMachine : StateMachine<State, Trigger>
{
    /// <summary>Configures every state's entry action and permitted transitions.</summary>
    /// <param name="Ondisconnected">Entry action for <see cref="State.disconnected"/>.</param>
    /// <param name="OnConnecting">Entry action for <see cref="State.Connecting"/>.</param>
    /// <param name="OnConnected">Entry action for <see cref="State.Connected"/>.</param>
    /// <param name="OnResetting">Entry action for <see cref="State.Resetting"/>.</param>
    public StateMachine(Action Ondisconnected, Action OnConnecting, Action OnConnected, Action OnResetting)
        : base(State.Stopped)
    {
        // Stopped: no entry action; Start moves to disconnected.
        var stopped = Configure(State.Stopped);
        stopped.Permit(Trigger.Start, State.disconnected);

        // disconnected: Connect moves to Connecting.
        var disconnected = Configure(State.disconnected);
        disconnected.OnEntry(Ondisconnected);
        disconnected.Permit(Trigger.Connect, State.Connecting);

        // Connecting: either completes into Connected or bails out via Reset.
        var connecting = Configure(State.Connecting);
        connecting.OnEntry(OnConnecting);
        connecting.Permit(Trigger.SetConnectComplete, State.Connected);
        connecting.Permit(Trigger.Reset, State.Resetting);

        // Connected: the only way out is Reset.
        var connected = Configure(State.Connected);
        connected.OnEntry(OnConnected);
        connected.Permit(Trigger.Reset, State.Resetting);

        // Resetting: SetResetComplete loops back to disconnected.
        var resetting = Configure(State.Resetting);
        resetting.OnEntry(OnResetting);
        resetting.Permit(Trigger.SetResetComplete, State.disconnected);
    }
}

它的功能是:套接字会自动重新连接,并在连接成功后启动接收循环。如果发生套接字错误,它应该转入重置状态以释放资源,然后再循环回去重新开始连接。

但是,当我释放(Dispose)该对象时,已连接的套接字会中止,这同样会触发资源释放,并试图等待它自身。

我相信这与线程等待自身有关,所以我的设计/状态结构肯定从根本上就有问题;如果有人能指点一种可以完全避免死锁的更好结构,我将不胜感激。

/// <summary>
/// Wrapper around <see cref="ClientWebSocket"/> whose lifecycle is driven by
/// <see cref="StateMachine"/>: the private <c>On*</c> methods below are passed to it
/// as the entry actions for the disconnected, Connecting, Connected and Resetting states.
/// </summary>
public class ManagedWebSocket : IDisposable
{
    readonly StateMachine stateMachine;
    Task backgroundReaderTask;

    private readonly Uri uri;
    private ClientWebSocket webSocket;
    private readonly ITargetBlock<byte[]> target;
    private readonly ILogger<ManagedWebSocket> logger;
    private CancellationTokenSource cancellationTokenSource;
    bool isDisposing;

    /// <summary>Creates the wrapper; no connection is attempted until <see cref="Start"/> is called.</summary>
    /// <param name="uri">Endpoint passed to <c>ConnectAsync</c> on every connection attempt.</param>
    /// <param name="target">Block whose <c>SendAsync</c> is handed to the receive loop.</param>
    /// <param name="logger">Logger for connection and send/receive errors.</param>
    public ManagedWebSocket(string uri, ITargetBlock<byte[]> target, ILogger<ManagedWebSocket> logger)
    {
        this.stateMachine = new StateMachine(OnDisconnected, OnConnecting, OnConnected, OnResetting);
        // The original snippet read this.uri without ever storing the constructor argument.
        this.uri = new Uri(uri);
        this.target = target;
        this.logger = logger;
    }

    /// <summary>Entry action for Connecting: creates a fresh socket and connects on a background task.</summary>
    private void OnConnecting()
    {
        this.backgroundReaderTask = Task.Run(async () =>
        {
            this.cancellationTokenSource = new CancellationTokenSource();
            this.webSocket = new ClientWebSocket();
            // NOTE(review): KeepAliveInterval is not declared in this class as shown — presumably defined elsewhere.
            webSocket.Options.KeepAliveInterval = KeepAliveInterval;

            try
            {
                await this.webSocket.ConnectAsync(this.uri, cancellationTokenSource.Token);
            }
            catch (WebSocketException ex)
            {
                // Exception goes first so the logger records it as the exception, not as a format argument.
                this.logger.LogError(ex, ex.Message);
                // NOTE(review): Reset leads to OnResetting -> FreeResources, which waits on
                // backgroundReaderTask — i.e. on the very task running this lambda.
                await this.stateMachine.FireAsync(Trigger.Reset);
            }

            // NOTE(review): this is fired even when the connect attempt failed above;
            // it likely belongs inside the try block.
            this.stateMachine.Fire(Trigger.SetConnectComplete);
        });
    }

    /// <summary>Entry action for disconnected: immediately requests a (re)connect unless disposal has begun.</summary>
    private void OnDisconnected()
    {
        if (isDisposing == false)
            this.stateMachine.Fire(Trigger.Connect);
    }

    /// <summary>Entry action for Resetting: releases per-connection resources, then signals completion.</summary>
    private void OnResetting()
    {
        FreeResources();
        this.stateMachine.Fire(Trigger.SetResetComplete);
    }

    /// <summary>Entry action for Connected: runs the receive loop on a background task and fires Reset when it ends.</summary>
    private void OnConnected()
    {
        this.backgroundReaderTask = Task.Run(async () =>
        {
            try
            {
                // returns when the internal frame loop completes with websocket close, or by throwing an exception
                await this.webSocket.ReceiveFramesLoopAsync(target.SendAsync, 2048, this.cancellationTokenSource.Token);
            }
            catch (Exception ex)
            {
                // Exception goes first so the logger records it as the exception, not as a format argument.
                this.logger.LogError(ex, ex.Message);
            }

            // NOTE(review): fired from inside backgroundReaderTask; OnResetting -> FreeResources
            // then waits on this same task, which looks like the source of the reported deadlock.
            await this.stateMachine.FireAsync(Trigger.Reset);
        });
    }

    /// <summary>Sends <paramref name="data"/> over the socket; a send failure is logged and triggers Reset.</summary>
    /// <param name="data">Payload to send.</param>
    /// <param name="webSocketMessageType">Message type forwarded to the chunked send helper.</param>
    /// <exception cref="Exception">Thrown when the state machine is not in the Connected state.</exception>
    public async Task SendAsync(byte[] data, WebSocketMessageType webSocketMessageType)
    {
        if (this.stateMachine.State != State.Connected)
            throw new Exception($"{nameof(ManagedWebSocket)} is not yet connected.");

        try
        {
            await webSocket
                    .SendAsChunksAsync(data, webSocketMessageType, this.cancellationTokenSource.Token)
                    .ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            this.logger.LogError(ex, ex.Message);
            await this.stateMachine.FireAsync(Trigger.Reset);
        }
    }

    /// <summary>Fires the Start trigger, moving the state machine out of Stopped.</summary>
    public void Start()
    {
        this.stateMachine.Fire(Trigger.Start);
    }

    /// <summary>Cancels the current connection's token, waits for the background task, then disposes both.</summary>
    public void FreeResources()
    {
        this.logger.LogDebug($"{nameof(ManagedWebSocket.FreeResources)}");
        this.cancellationTokenSource?.Cancel();
        // NOTE(review): blocking wait; when called (via OnResetting) from within
        // backgroundReaderTask itself, the task ends up waiting for its own completion.
        this.backgroundReaderTask?.Wait();
        this.cancellationTokenSource?.Dispose();
        this.backgroundReaderTask?.Dispose();
    }

    /// <summary>Marks the instance as disposing (which stops automatic reconnects) and frees resources once.</summary>
    public void Dispose()
    {
        if (isDisposing)
            return;

        isDisposing = true;
        FreeResources();
    }
}

解决方法

我猜死锁是由在 OnResetting() 中调用 FreeResources(); 引起的:FreeResources(); 正在等待 backgroundReaderTask,而在 backgroundReaderTask 内部,您又通过 await this.stateMachine.FireAsync(Trigger.Reset); 等待 OnResetting() 完成。

作为一种变通办法,您可以去掉触发重置时的 “await” 关键字,因为它无论如何都会释放整个对象。

另请注意,如果之前 await this.webSocket.ConnectAsync(...) 抛出了异常,并且已经执行了 await this.stateMachine.FireAsync(Trigger.Reset);,似乎就没有理由再调用 this.stateMachine.Fire(Trigger.SetConnectComplete); ——只需将这一调用移动到 try 块中即可。

此外,作为某种最佳实践和旁注,请尝试遵循 recommended dispose pattern