问题描述
这是 my previous question 的演变。回顾一下,我有第 3 方 WebSocket,我需要使用它来在一种方法中发送请求并在另一种方法中给出响应。我正在尝试将其转换为 SemaphoreSlim
。
我阅读了 this MSDN Magazine article,其中显示了一个使用 SemaphoreSlim.Release()
来控制条目的示例,但我的情况并不完全相同,我不确定最好的做法是什么。
我可以将 finally
放在 GetPositionsAsync()
的 positionEnd()
块中,如文章中所示,但如果出现错误,则 finally 可以在 GetPositionsAsync()
之前被调用如果有另一个线程在等待信号量,则会在 SemaphoreSlim.Release()
中导致错误。另一方面,如果我将 positionEnd()
放在 GetPositionsAsync()
中,我能否可靠地假设 Positions.Clear()
会返回正确的结果?我担心如果释放发生,等待信号量的线程可能会在另一个线程将 Positions
的值保存到 result
之前调用 private taskcompletionsource<List<Position>> PositionsTaskSource { get; set; }
private readonly SemaphoreSlim PositionsMutex = new(1,1);
public async Task<List<Position>> GetPositionsAsync()
{
try
{
await PositionsMutex.WaitAsync().ConfigureAwait(false);
Positions.Clear();
PositionsTaskSource = new();
IbWebSocket.reqPositions();
var result = await PositionsTaskSource.Task;
//Write to Trace Log here
return result;
}
finally
{
//I Could put the Release here as shown in the article
//PositionsMutex.Release();
}
}
/// <summary>
/// Provides a position to the reqPositions() method. When the last position has been received positionEnd() is called.
/// </summary>
/// <param name="account"></param>
/// <param name="contract"></param>
/// <param name="value"></param>
public void position(string account,Contract contract,double value)
{
Positions.Add(new Position(account,contract,value));
}
/// <summary>
/// Indicates all the positions have been transmitted.
/// </summary>
public void positionEnd()
{
PositionsTaskSource.TrySetResult(Positions))
PositionsMutex.Release();
}
。因此,无论哪种方式,都存在同样的担忧。哪个更好还是有其他方法可以防止这个问题同时发生?
这是我目前所拥有的...
User#presence#activities
解决方法
无需在 PositionsMutex.Release();
中调用 positionEnd()
。一旦 positionEnd()
设置了任务结果,GetPositionsAsync
将继续并最终释放信号量,从而允许下一个调用者重新进入。
正如 MSDN 文章和 Nick 所述,在 Release
的 GetPositionsAsync
块内调用 finally
就足够了。
为了更好地表达您的意图,我建议您对代码稍作更改:
- 而不是使用
TaskCompletionSource<List<Position>>
- 我建议使用
TaskCompletionSource<object>
:
private TaskCompletionSource<object> PositionsHasBeenPopulated;
private readonly SemaphoreSlim GetPositionAsyncMutex = new(1,1);
public async Task<List<Position>> GetPositionsAsync()
{
try
{
await GetPositionAsyncMutex.WaitAsync().ConfigureAwait(false);
Positions.Clear();
PositionsHasBeenPopulated = new();
IbWebSocket.reqPositions();
await PositionsHasBeenPopulated.Task;
return Positions;
}
finally
{
GetPositionAsyncMutex.Release();
}
}
public void position(string account,Contract contract,double value)
=> Positions.Add(new Position(account,contract,value));
public void positionEnd()
=> PositionsHasBeenPopulated.TrySetResult(null);
-
GetPositionAsyncMutex
将并发调用限制为一次- 我建议使用另一个重载,您可以在其中指定 timeout in milliseconds 或者您也可以提供 cancellationToken
-
PositionsHasBeenPopulated
仅用于信令- 您可以将
TaskCompletionSource<object>
视为ManualResetEvent
的异步版本
- 您可以将