如何最好地将 TaskCompletionSource 与 SemaphoreSlim 结合使用

问题描述

这是 my previous question 的演变。回顾一下,我有第 3 方 WebSocket,我需要使用它来在一种方法中发送请求并在另一种方法中给出响应。我正在尝试将其转换为 SemaphoreSlim

我阅读了 this MSDN Magazine article,其中显示一个使用 SemaphoreSlim.Release() 来控制条目的示例,但我的情况并不完全相同,我不确定最好的做法是什么。

>

我可以将 finally 放在 GetPositionsAsync()positionEnd() 块中,如文章中所示,但如果出现错误,则 finally 可以在 GetPositionsAsync() 之前被调用如果有另一个线程在等待信号量,则会在 SemaphoreSlim.Release() 中导致错误。另一方面,如果我将 positionEnd() 放在 GetPositionsAsync() 中,我能否可靠地假设 Positions.Clear() 会返回正确的结果?我担心如果释放发生,等待信号量的线程可能会在另一个线程将 Positions 的值保存到 result 之前调用 private taskcompletionsource<List<Position>> PositionsTaskSource { get; set; } private readonly SemaphoreSlim PositionsMutex = new(1,1); public async Task<List<Position>> GetPositionsAsync() { try { await PositionsMutex.WaitAsync().ConfigureAwait(false); Positions.Clear(); PositionsTaskSource = new(); IbWebSocket.reqPositions(); var result = await PositionsTaskSource.Task; //Write to Trace Log here return result; } finally { //I Could put the Release here as shown in the article //PositionsMutex.Release(); } } /// <summary> /// Provides a position to the reqPositions() method. When the last position has been received positionEnd() is called. /// </summary> /// <param name="account"></param> /// <param name="contract"></param> /// <param name="value"></param> public void position(string account,Contract contract,double value) { Positions.Add(new Position(account,contract,value)); } /// <summary> /// Indicates all the positions have been transmitted. /// </summary> public void positionEnd() { PositionsTaskSource.TrySetResult(Positions)) PositionsMutex.Release(); } 。因此,无论哪种方式,都存在同样的担忧。哪个更好还是有其他方法可以防止这个问题同时发生?

这是我目前所拥有的...

User#presence#activities

解决方法

无需在 PositionsMutex.Release(); 中调用 positionEnd()。一旦 positionEnd() 设置了任务结果,GetPositionsAsync 将继续并最终释放信号量,从而允许下一个调用者重新进入。

,

正如 MSDN 文章和 Nick 所述,在 ReleaseGetPositionsAsync 块内调用 finally 就足够了。

为了更好地表达您的意图,我建议您对代码稍作更改:

  • 而不是使用 TaskCompletionSource<List<Position>>
  • 我建议使用 TaskCompletionSource<object>
private TaskCompletionSource<object> PositionsHasBeenPopulated;
private readonly SemaphoreSlim GetPositionAsyncMutex = new(1,1);

public async Task<List<Position>> GetPositionsAsync()
{
    try
    {
        await GetPositionAsyncMutex.WaitAsync().ConfigureAwait(false);
        Positions.Clear();
        PositionsHasBeenPopulated = new();
        IbWebSocket.reqPositions();
        
        await PositionsHasBeenPopulated.Task;
        return Positions;
    }
    finally
    {
        GetPositionAsyncMutex.Release();
    }
}


public void position(string account,Contract contract,double value)
    => Positions.Add(new Position(account,contract,value));

public void positionEnd()
    => PositionsHasBeenPopulated.TrySetResult(null);
  • GetPositionAsyncMutex 将并发调用限制为一次
  • PositionsHasBeenPopulated 仅用于信令
    • 您可以将 TaskCompletionSource<object> 视为 ManualResetEvent 的异步版本

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...