获取由多线程应用程序产生的当前活动任务的计数

问题描述

您好,我是一名实习生,几乎没有 C# 经验,我遇到了接管使用 taskcompletionsourceBlockingCollection 实现多线程的 Windows 服务的情况。我从来没有做过 C#。我正在尝试优化服务处理其处理日志文件任务的方式。

我的问题是,使用 BlockingCollection 创建一个执行 WorkiItem 的线程队列,如何获取队列中活动线程的数量?这意味着 EnqueueTask() 命令调用的项目有多少仍处于运行状态?我不想要 _taskQ.Count 返回的队列积压计数。我想要活动线程数。我想将线程数保持在 4,并且只在前一个项目完成后将一个项目排入队列。我不希望我的队列中出现大量项目。

public class ProducerConsumerQueue
{
    public CancellationTokenSource Token { get; set; }
    private BlockingCollection<WorkItem> _taskQ;

    public ProducerConsumerQueue(int workerCount)
    {
        _taskQ = new BlockingCollection<WorkItem>();
        for(int i = 0; i <workerCount; i++)
        {
            Task.Factory.StartNew(Consume);
        }
    }

    
    public Task EnqueueTask(Action action,CancellationToken? cancelToken)
    {
        var tcs = new taskcompletionsource<object>();
        _taskQ.Add(new WorkItem(tcs,action,cancelToken));
        return tcs.Task;
    }

    public void Consume()
    {
        foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
        {
            if (workItem.CancelToken.HasValue &&
                workItem.CancelToken.Value.IsCancellationRequested)
            {
                workItem.TaskSource.SetCanceled();
            }
            else
            {
                try
                {
                    workItem.Action();
                    workItem.TaskSource.SetResult(null);
                }
                catch (OperationCanceledException ex)
                {
                    if (ex.CancellationToken == workItem.CancelToken)
                    {
                        workItem.TaskSource.SetCanceled();
                    }
                    else
                    {
                        workItem.TaskSource.SetException(ex);
                    }
                }
                catch (Exception ex)
                {
                    workItem.TaskSource.SetException(ex);
                }
            }
        }
    }
}

这个 ProducerCOnsumer 队列在服务启动时被调用,它的队列在每个服务轮询间隔重新加载。我想通过将其设置为线程安全数据库表中的文件限制来限制产生的这些线程的数量。因此,如果线程数为 4,则 db 表中的文件数将为 4。在 1 个文件完成之前,队列不应产生额外的线程或将文件排入队列。为此,我认为一个简单的解决方案是计算活动线程的数量(这意味着活动文件数量正在处理,并且在线程减少 1 之前不要添加任何新文件

protected override void OnStart(string[] args)
{
    ProducerConsumerQueue = new ProducerConsumerQueue(Constants.THREAD_COUNT);
    InitializeLogging();
    PollOnServiceStart();

    _timer.Elapsed += Onelapsedtime;
    _timer.Enabled = true;
    _timer.Interval = _interval;
}

public void Onelapsedtime(object source,ElapsedEventArgs args)
{
    try
    {
        //InitializeLogging();
        Poll();
    }
    catch (Exception ex)
    {
        Logger.Error(ex.Message.ToString());
    }
}


public void PollOnServiceStart()
{
    foreach (var handler in handlers)
    {
        ProducerConsumerQueue.EnqueueTask(handler.Execute,CancellationTokenSource.Token);
    }
}

解决方法

如果您想要一个线程安全计数器来检查活动任务的数量,您可以使用 int _counter 字段和 Interlocked.Increment(ref _counter)/Interlocked.Decrement(ref _counter)

请记住在 try 块中的 finally 块减量中的第一行递增,以便在引发异常时不会丢失任何调用。

https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked.increment?view=net-5.0

https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked.decrement?view=net-5.0

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...