问题描述
我想在 C# 中并行运行 7 个线程\任务。在 7 个线程中,我希望在特定超时内列表中的第一个线程的结果可以说 200 毫秒 (第一个线程没有必要先完成,只是如果它失败或花费的时间超过所需的时间,我不需要等待任何其他任务完成)。如果我在 200 毫秒内没有得到列表中第一个线程的结果,或者如果结果为空\无效,我想中止或取消所有任务。如果第一个线程的结果在超时范围内并且有效,我想查看是否有任何其他任务已完成并给出有效结果。如果是,那么我想获取结果并取消\终止所有剩余的任务。我想出的使用 Task 类的代码如下:
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private async void btnStart_Click(object sender,EventArgs e)
{
TaskWrapper _taskWrapper = new TaskWrapper();
Stopwatch _st = new Stopwatch();
_st.Start();
Threadobject t = await _taskWrapper.DoTasks();
_st.Stop();
if (t != null)
{
txtResult.Text += t._id.ToString() + " and " + t._idSec
+ " is completed in " + _st.ElapsedMilliseconds + " ms."
+ Environment.NewLine;
}
else
{
txtResult.Text += "All Failed in " + _st.ElapsedMilliseconds + " ms."
+ Environment.NewLine;
}
//_taskWrapper = null;
//_taskWrapper.dispose();
}
}
public class TaskWrapper
{
Task<Threadobject> _t1;
Task<Threadobject> _t2;
Task<Threadobject> _t3;
Task<Threadobject> _t4;
Task<Threadobject> _t5;
Task<Threadobject> _t6;
Task<Threadobject> _t7;
Threadobject _tO1;
Threadobject _tO2;
Threadobject _tO3;
Threadobject _tO4;
Threadobject _tO5;
Threadobject _tO6;
Threadobject _tO7;
public async Task<Threadobject> DoTasks()
{
var _cancellationTokenSource = new CancellationTokenSource();
CancellationToken _cancellationToken = _cancellationTokenSource.Token;
//_cancellationToken.Register(() => { this.dispose(); });
_tO1 = new Threadobject(1);
_t1 = new Task<Threadobject>(() => _tO1.DoAction(50),_cancellationToken);
_tO2 = new Threadobject(2);
_t2 = new Task<Threadobject>(() => _tO2.DoAction(10),_cancellationToken);
_tO3 = new Threadobject(3);
_t3 = new Task<Threadobject>(() => _tO3.DoAction(30),_cancellationToken);
_tO4 = new Threadobject(4);
_t4 = new Task<Threadobject>(() => _tO4.DoAction(40),_cancellationToken);
_tO5 = new Threadobject(5);
_t5 = new Task<Threadobject>(() => _tO5.DoAction(60),_cancellationToken);
_tO6 = new Threadobject(6);
_t6 = new Task<Threadobject>(() => _tO6.DoAction(70),_cancellationToken);
_tO7 = new Threadobject(7);
_t7 = new Task<Threadobject>(() => _tO7.DoAction(200),_cancellationToken);
var _tasks = new List<Task<Threadobject>> { _t1,_t2,_t3,_t4,_t5,_t6,_t7,new Task<Threadobject>(() => { Thread.Sleep(1); return null; }) };
var _completedTasks = new List<Task<Threadobject>>();
var _mainTask = new List<Task<Threadobject>>();
_tasks.AsParallel().WithCancellation(_cancellationToken).ForAll(o => o.Start());
while (_tasks.Count > 0)
{
var completed = await Task.WhenAny(_tasks);
if (completed.Result._isComplete)
{
if (completed.Result._id == 1)
{
_mainTask.Add(completed);
}
else
{
_completedTasks.Add(completed);
}
if (_completedTasks.Count > 0 && _mainTask.Count > 0)
{
_mainTask[0].Result._idSec = _completedTasks[0].Result._id;
_cancellationTokenSource.Cancel();
return await _mainTask[0];
}
}
else
{
_tasks.Remove(completed);
}
}
return null;
}
}
public class Threadobject
{
public bool _isComplete = false;
public int _id;
public int _idSec;
private Thread thread;
//private static readonly object lockObj;
public Threadobject(int id)
{
_id = id;
}
public Threadobject DoAction(int _milliseconds)
{
Thread.Sleep(_milliseconds);
_isComplete = true;
File.AppendAllText("E:\\ThreadingDemo2_" + _id + ".txt",DateTime.Now.ToString("dd-MM-yyyy HH:mm:ss.fff") + " " + _id
+ " Processing finished." + Environment.NewLine);
return this;
}
}
我不确定如何在上述方法中实现超时逻辑。因此,我使用 Thread
类实现了另一个逻辑来实现如下相同:
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private void btnStart_Click(object sender,EventArgs e)
{
ThreadWrapper threadWrapper = new ThreadWrapper();
Stopwatch _st = new Stopwatch();
_st.Start();
Threadobject TO = threadWrapper.DoTasks();
_st.Stop();
if (TO != null)
{
if (TO._idSec == 0)
txtResult.Text += TO._id.ToString() + " is completed in "
+ _st.ElapsedMilliseconds + " ms." + Environment.NewLine;
else
txtResult.Text += TO._id.ToString() + " and " + TO._idSec
+ " is completed in " + _st.ElapsedMilliseconds + " ms."
+ Environment.NewLine;
}
else
{
txtResult.Text += "All Failed in " + _st.ElapsedMilliseconds + " ms."
+ Environment.NewLine;
}
}
}
public class ThreadWrapper
{
Thread _t1;
Thread _t2;
Thread _t3;
Thread _t4;
Thread _t5;
Thread _t6;
Thread _t7;
Threadobject _tO1;
Threadobject _tO2;
Threadobject _tO3;
Threadobject _tO4;
Threadobject _tO5;
Threadobject _tO6;
Threadobject _tO7;
Threadobject mainThread;
int SecId = 0;
public Threadobject DoTasks()
{
_tO1 = new Threadobject(1);
//_tO1.TaskCompleted += TaskCompleted;
_t1 = new Thread(() => mainThread = _tO1.DoAction(50));
_tO2 = new Threadobject(2);
_tO2.TaskCompleted += TaskCompleted;
_t2 = new Thread(() => _tO2.DoAction(10));
_tO3 = new Threadobject(3);
_tO3.TaskCompleted += TaskCompleted;
_t3 = new Thread(() => _tO3.DoAction(30));
_tO4 = new Threadobject(4);
_tO4.TaskCompleted += TaskCompleted;
_t4 = new Thread(() => _tO4.DoAction(40));
_tO5 = new Threadobject(5);
_tO5.TaskCompleted += TaskCompleted;
_t5 = new Thread(() => _tO5.DoAction(60));
_tO6 = new Threadobject(6);
_tO6.TaskCompleted += TaskCompleted;
_t6 = new Thread(() => _tO6.DoAction(70));
_tO7 = new Threadobject(7);
_tO7.TaskCompleted += TaskCompleted;
_t7 = new Thread(() => _tO7.DoAction(20));
List<Thread> threads = new List<Thread> { _t1,_t7 };
threads.AsParallel<Thread>().ForAll<Thread>(o => o.Start());
_t1.Join(200);
Task.Run(() =>
threads.AsParallel<Thread>().ForAll<Thread>(o => o.Abort()));
if (mainThread != null)
{
mainThread._idSec = SecId;
}
return mainThread;
}
private void TaskCompleted(int id,int idSec,bool isCompleted)
{
if (SecId == 0)
SecId = id;
}
}
public class Threadobject
{
public bool _isComplete = false;
public int _id;
public int _idSec;
public delegate void NotifyTaskCompletion(int id,bool isCompleted);
public event NotifyTaskCompletion TaskCompleted;
private static readonly object lockObj = 1;
//private static readonly object lockObj;
public Threadobject(int id)
{
_id = id;
}
public Threadobject DoAction(int _milliseconds)
{
Thread.Sleep(_milliseconds);
_isComplete = true;
//lock (lockObj)
//{
// File.AppendAllText("E:\\ThreadingDemo3_" + _id + ".txt"
//,DateTime.Now.ToString("dd-MM-yyyy HH:mm:ss.fff") + " " + _id
// + " Processing finished." + Environment.NewLine);
//}
TaskCompleted?.Invoke(_id,_idSec,_isComplete);
return this;
}
}
我之前使用 Parallel.Invoke
方法并行执行任务,但最近我注意到 Parallel.Invoke
实际上并没有并行处理任务,因此最终占用了不必要的时间。
现在到我的实际问题,是的,有多个,如下:
- 有没有办法在我的并行任务中添加超时?我已经知道一种方法,我可以在并行运行的任务集合中添加一个任务,该任务将暂停 200 毫秒或我想要的任何超时,然后返回 null。 (因为我使用
WhenAny
来检查完成,控制将返回给调用 UI 线程)。但这对我来说就像是在作弊,所以我想知道是否还有其他方法。 - 在这种情况下,当我从类创建
Task
时,TaskWrapper
会将TaskWrapper
对象处理或设置为 null 中止或销毁使用该对象创建/运行的任务吗? - 同样,对于线程,如果我在这种情况下从类内部创建了一个线程\线程
ThreadWrapper
,会将ThreadWrapper
对象处理或设置为空中止或销毁创建的线程\运行使用那个对象?
编辑:清除评论中提到的混淆。
解决方法
简单的方法是使用 CancelAfter 方法在 200 毫秒后取消取消令牌。另一种选择是运行 await Task.Delay(200)
。我还建议使用 Parallel.For
,因为这将是更紧凑的代码:
public static void CancelParallel()
{
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(200));
var po = new ParallelOptions()
{
CancellationToken = cts.Token
};
void ParallelMethod(int iterationIndex,ParallelLoopState state)
{
// Do whatever
// if the iteration is long running you can check if it should exit
if (state.ShouldExitCurrentIteration)
{
return;
}
}
var loopResult = Parallel.For(0,7,po,ParallelMethod);
if (loopResult.IsCompleted)
{
// Do something if all threads completed
}
else
{
// Do something else
}
}
如果你只关心第一次迭代的成功,如果成功,设置一些标志并检查它而不是 loopResult。
请注意,Parallel.For 将在令牌取消时停止运行新的迭代,但会让现有迭代运行完成,因此您可能希望也可能不想检查迭代中的令牌