C#-为高度并发的方法设置CancellationTokens的正确方法

问题描述

我有一个函数,每次对象引发事件时都会刷新UI。但是,任何单个对象都可以在1秒内引发多个事件,并且集合中可能有10,000+个对象在执行此操作。我的想法是捕获最后一个事件,并在1秒间隔后丢弃所有未决事件。

每次任何对象引发任何事件时,都会调用以下RefreshCollection()函数。

SemaphoreSlim _semaphoreUpdatingList = new SemaphoreSlim(1);
SemaphoreSlim _semaphoreRefreshingView = new SemaphoreSlim(1);
CancellationTokenSource _ctsRefreshView = null;

internal void RefreshCollection()
{
    // if we're in the process of changing the collection,return
    if (_semaphoreUpdatingList.CurrentCount == 0)
    {
        return;
    }
    if (_ctsRefreshView != null)
    {
        _ctsRefreshView.Cancel();
    }
    Task.Run(async () =>
    {
        if (_ctsRefreshView == null)
        {
            _ctsRefreshView = new CancellationTokenSource();
        }
        var ct = _ctsRefreshView.Token;
        try
        {
            await _semaphoreRefreshingView.WaitAsync(ct);
            var stopWatch = new Stopwatch();
            stopWatch.Start();
            Application.Current?.Dispatcher?.Invoke(() =>
            {
                CollectionView.Refresh();
            });
            stopWatch.Stop();

            // Only refresh every 1 sec
            if (stopWatch.ElapsedMilliseconds < 1000)
            {
                await Task.Delay(1000 - (int)stopWatch.ElapsedMilliseconds);
            }
            _semaphoreRefreshingView.Release();
        }
        catch (OperationCanceledException)
        {
            return;
        }
        finally
        {
            _ctsRefreshView = null;
        }
    });
}

问题是,当我将其称为_ctsRefreshView is null时,很少在任务内出现var ct = _ctsRefreshView.Token;错误。我为这发生的原因scratch之以鼻。

非常感谢您的帮助。

解决方法

您应该使用Microsoft的Reactive Framework(又名Rx)-NuGet System.Reactive.Windows.Threading(用于WPF位)并添加using System.Reactive.Linq;

如果您有collection个此类:

public class MyObject
{
    public event EventHandler Ping;
}

然后您可以执行以下操作:

IObservable<EventPattern<EventArgs>> query =
    collection
        .ToObservable()
        .SelectMany(x =>
            Observable
                .FromEventPattern<EventHandler,EventArgs>(
                    h => x.Ping += h,h => x.Ping -= h))
        .Sample(TimeSpan.FromSeconds(0.1))
        .ObserveOnDispatcher();
        
IDisposable subscription = query.Subscribe(x => CollectionView.Refresh());

CollectionView.Refresh()秒最多可以给您打电话一次0.1

这比捣毁取消令牌来源要容易得多。

只需调用subscription.Dispose();即可停止所有操作。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...