使用Parallel.Invoke运行两个任务并添加超时,以防一个任务花费更长的时间

问题描述

我正在调用两个依赖于某些外部Web服务的函数。目前,它们并行运行,并且当它们都完成时,恢复执行。但是,如果外部服务器花费太多时间来处理请求,则可能会锁定我的代码一段时间。

我想添加一个超时时间,以便如果服务器需要10秒钟以上的时间来响应,则只需继续操作即可。这就是我所拥有的,如何添加超时时间?

Parallel.Invoke(

    () => FunctionThatCallsServer1(TheParameter),() => FunctionThatCallsServer2(TheParameter)            
);

RunThisFunctionNoMatterWhatAfter10Seconds();

解决方法

您将需要创建CancellationTokenSource的实例,并在创建时立即配置超时时间,例如

var cts = new CancellationTokenSource(timeout);

然后,您将需要创建ParallelOptions的实例,在其中将ParallelOptions.CancellationToken设置为CancellationTokenSource的令牌,例如

var options = new ParallelOptions {
    CancellationToken = cts.Token,};

然后,您可以使用选项和操作致电Parallel.Invoke

try 
{
    Parallel.Invoke(
        options,() => FunctionThatCallsServer1(token),() => FunctionThatCallsServer2(token)            
    );
}
catch (OperationCanceledException ex)
{
    // timeout reached
    Console.WriteLine("Timeout");
    throw;
}

但是您还需要将令牌交给被调用的Server函数,并且还要处理这些操作中的超时。

这是因为Parallel.Invoke仅在之前进行检查,如果它获得的令牌被取消,它将开始执行操作。这意味着,如果所有操作在超时发生之前开始,则Parallel.Invoke调用将在需要完成的操作被阻止。

更新

测试取消的一种好方法是定义FunctionThatCallsServer1

static void FunctionThatCallsServer1(CancellationToken token) {
    var endTime = DateTime.Now.AddSeconds(5);

    while (DateTime.Now < endTime) {
        token.ThrowIfCancellationRequested();

        Thread.Sleep(1);
    }
}
,

我认为没有一种简单的方法可以使Parallel超时,一旦功能启动,就调用它,很明显,它们将在十秒钟后完成。即使您取消,Parallel.Invoke也会等待功能完成,因此您必须找到一种尽早完成功能的方法。

但是,在幕后,Parallel.Invoke使用任务,如果直接使用任务而不是Parallel.Invoke,则可以提供超时。下面的代码显示了如何:

Task task1 = Task.Run(() => FunctionThatCallsServer1(TheParameter));
Task task2 = Task.Run(() => FunctionThatCallsServer2(TheParameter));
// 10000 is timeout in ms,allTasksCompleted is true if they completed,false if timed out
bool allTasksCompleted = Task.WaitAll(new[] { task1,task2 },10000);
RunThisFunctionNoMatterWhatAfter10Seconds();

此代码与Parallel.Invoke的一个细微差别是,如果您有非常大量的函数,那么Parallel.Invoke将更好地管理Task的创建,而不仅仅是像此处那样为每个函数盲目的创建Task。 Parallel.Invoke将创建有限数量的任务,并在功能完成时重新使用它们。只需如上所述几个函数调用就不会有问题。

,

下面是代码:

using System;
using System.Threading.Tasks;
namespace Algorithums
{
public class Program
{
    public static void Main(string[] args)
    {
        ParelleTasks();
        Console.WriteLine("Main");
        Console.ReadLine();
    }

    private static void ParelleTasks()
    {
        Task t = Task.Run(() => {
            FunctionThatCallsServers();
            Console.WriteLine("Task ended after 20 Seconds");
        });

        try
        {
            Console.WriteLine("About to wait for 10 sec completion of task {0}",t.Id);
            bool result = t.Wait(10000);
            Console.WriteLine("Wait completed normally: {0}",result);
            Console.WriteLine("The task status:  {0:G}",t.Status);
        }
        catch (OperationCanceledException e)
        {
            Console.WriteLine("Error: " + e.ToString());
           
        }

        RunThisFunctionNoMatterWhatAfter10Seconds();
    }


    private static bool FunctionThatCallsServers()
    {
        Parallel.Invoke(
                         () => FunctionThatCallsServer1(),() => FunctionThatCallsServer2()
                    );
        return true;
    }

    private static void FunctionThatCallsServer1()
    {
        System.Threading.Thread.Sleep(20000);
        Console.WriteLine("FunctionThatCallsServer1");
    }

    private static void FunctionThatCallsServer2()
    {
        System.Threading.Thread.Sleep(20000);
        Console.WriteLine("FunctionThatCallsServer2");
    }

    private static void RunThisFunctionNoMatterWhatAfter10Seconds()
    {
        Console.WriteLine("RunThisFunctionNoMatterWhatAfter10Seconds");
    }
}


}

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...