立即捕获异常不适用于Task.WhenAll

问题描述

我有一个类的两个实例,这些实例创建一个UDP套接字以从UDP客户端接收数据。如果其中一个实例引发异常,我想立即在更高的层中处理它。在我的程序中,它们以await Task.WhenAll(recv1.StartAsync(),recv2.StartAsync)开始。但是,这将等待所有任务完成,然后引发第一个异常。关于如何解决此问题的任何想法?

static async Task Main(string[] args)
{
  var udpReceiver1 = new UdpReceiver(localEndpoint1);
  var udpReceiver2 = new UdpReceiver(localEndpoint2);

  var cts = new CancellationTokenSource();

  try
  {
    await Task.WhenAll(udpReceiver1.StartAsync(cts.Token),udpReceiver2.StartAsync(cts.Token));
  }
  catch (Exception e)
  {
    // Handle Exception...

    cts.Cancel();
  }
}

class UdpReceiver
{
  public UdpReceiver(IPEndPoint endpoint)
  {
    udpClient = new UdpClient(endpoint);
  }

  public async Task StartAsync(CancellationToken cancellationToken)
  {
    try
    {
      while (!cancellationToken.IsCancellationRequested)
      {
        var result = await ReceiveAsync(cancellationToken);
        var message = Encoding.UTF8.GetString(result.Buffer);
        Trace.WriteLine($"UdpClient1 received message:{Encoding.UTF8.GetString(result.Buffer)}");
      
        // throw new Exception("UdpClient1 raising exception");
      }
    }
  }

  private async Task<UdpReceiveResult> ReceiveAsync(CancellationToken cancellationToken)
  {
    var tcs = new taskcompletionsource<UdpReceiveResult>();
    using (cancellationToken.Register(() => tcs.TrySetCanceled(),false))
    {
      var task = udpClient.ReceiveAsync();

      var completedTask = await Task.WhenAny(task,tcs.Task);

      var result = await completedTask.ConfigureAwait(false);

      return result;
    }
  }

  private UdpClient udpClient;
}

更新1:任务。何时实施都是可行的解决方案。谢谢@CamiloTerevinto

try
{
  await await Task.WhenAny(udpReceiver1.StartAsync(cts.Token),udpReceiver2.StartAsync(cts.Token));
}
catch (Exception e)
{
  // Handle Exception...

  cts.Cancel();
}

更新2:为了更精细地处理所有任务,我将使用@Servy提出的我自己改编的Task.WhenAll实现。

解决方法

该行为与框架WhenAll实现的差异足够大,您最好只编写自己的适应版本,幸运的是,实现起来并不是特别困难。只需对每个任务附加延续,如果任何任务被取消或出错,则结果任务执行相同的操作,如果成功存储结果,并且如果最后一个任务成功,则用所有存储的结果完成任务

public static Task<IEnumerable<TResult>> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks)
{
    var listOfTasks = tasks.ToList();
    if (listOfTasks.Count == 0)
    {
        return Task.FromResult(Enumerable.Empty<TResult>());
    }
    var tcs = new TaskCompletionSource<IEnumerable<TResult>>();
    var results = new TResult[listOfTasks.Count];
    int completedTasks = 0;
    for (int i = 0; i < listOfTasks.Count; i++)
    {
        int taskIndex = i;
        Task<TResult> task = listOfTasks[i];
        task.ContinueWith(_ =>
        {
            if (task.IsCanceled)
                tcs.TrySetCanceled();
            else if (task.IsFaulted)
                tcs.TrySetException(task.Exception.InnerExceptions);
            else
            {
                results[taskIndex] = task.Result;
                if (Interlocked.Increment(ref completedTasks) == listOfTasks.Count)
                {
                    tcs.TrySetResult(results);
                }
            }
        });
    }
    return tcs.Task;
}

与许多基于任务的通用操作一样,您还需要一个没有结果的版本,并且如果您不想处理显着的开销,则确实需要只复制粘贴基于结果的方法,但是所有的结果都破译了,这并不难,只是微不足道。将所有这些任务转换成具有结果的任务也可以,但是对于这样的操作,开销可能是有问题的。

public static Task WhenAll(IEnumerable<Task> tasks)
{
    var listOfTasks = tasks.ToList();
    if (listOfTasks.Count == 0)
    {
        return Task.CompletedTask;
    }
    var tcs = new TaskCompletionSource<bool>();
    int completedTasks = 0;
    for (int i = 0; i < listOfTasks.Count; i++)
    {
        int taskIndex = i;
        Task task = listOfTasks[i];
        task.ContinueWith(_ =>
        {
            if (task.IsCanceled)
                tcs.TrySetCanceled();
            else if (task.IsFaulted)
                tcs.TrySetException(task.Exception.InnerExceptions);
            else
            {
                if (Interlocked.Increment(ref completedTasks) == listOfTasks.Count)
                {
                    tcs.TrySetResult(true);
                }
            }
        });
    }
    return tcs.Task;
}
,

也许有一种方法可以做到,但是如果不使您的代码变得凌乱,我想不出来。最好在实际任务中处理异常。如果需要使用通用代码进行处理,请使用处理程序委托。

static async Task Main(string[] args)
{
    var cts = new CancellationTokenSource();

    //This is our common error handler
    void HandleException(Exception ex)
    {
        Log("Exception!" + ex.Message);
        cts.Cancel();
    }

    var udpReceiver1 = new UdpReceiver(localEndpoint1);
    var udpReceiver2 = new UdpReceiver(localEndpoint1);

    //We pass the handler as one of the arguments
    await Task.WhenAll(udpReceiver1.StartAsync(cts.Token,HandleException),udpReceiver2.StartAsync(cts.Token,HandleException));
}

class UdpReceiver
{
  public async Task StartAsync(CancellationToken cancellationToken,Action<Exception> errorHandler)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              //Main logic goes here
          }
      }
      catch(Exception ex)
      {
          errorHandler(ex);  //Call common error handling code
      }
  }
,

您可以分两步等待任务。第一步,等待其中的任何人完成,并在失败的情况下启动取消。在此步骤中不要处理异常。等待所有任务完成后,将异常处理延迟到第二步。这两个任务可能都失败了,所以您可能要分别处理每个例外。

Task task1 = udpReceiver1.StartAsync(cts.Token);
Task task2 = udpReceiver2.StartAsync(cts.Token);

// Await any task to complete
Task firstCompletedTask = await Task.WhenAny(task1,task2);
if (firstCompletedTask.IsFaulted) cts.Cancel();

try
{
    // Await them all to complete
    await Task.WhenAll(task1,task2);
}
catch
{
    if (task1.IsFaulted) HandleException(task1.Exception.InnerException);
    if (task2.IsFaulted) HandleException(task2.Exception.InnerException);
}