问题描述
我是发送到 api 的对象列表。我在下面使用了并行线程。
代码
List<object> data ;//contain data
result =new Dictionary<decimal,object>();
var threadCount = 4;
if (data.Count < threadCount)
{
threadCount = data.Count;
}
var pageSize = threadCount > 0 ? Convert.ToInt32(Math.Ceiling((Convert.ToDecimal(data.Count) / threadCount))) : 0;
var pageCount = threadCount;
for (int j = 0; j < threadCount; j++)
{
var temp = data.Skip(j * pageSize).Take(pageSize).ToList();
var tempLength = temp?.Count;
Parallel.ForEach(temp,item =>
{
result.Add(item.ID,null);
//call Api and get resultApi
if (resultApi != null && resultApi.Result != null)
{
result[item.ID] = resultApi.Result.id;
}
else if (resultApi != null && resultApi .Message != null)
{
result[item.ID] = null;
}
else
{
result[item.ID] = null;
}
});
}
问题
在顶部的结束操作中,当检查结果时,我看到一些项目与其 ID 无关并且已被移动。如果退出并行模式时没有移动,则所有标识符都设置正确
如何解决问题?
解决方法
我的建议是使用 PLINQ 而不是 Parallel
类。对于入门级多线程来说,它是一种更安全的工具。 PLINQ 类似于 LINQ,但它以 .AsParallel()
开头。它包括几乎所有熟悉的 LINQ 运算符,如 Select
、Where
、Take
、ToList
等。
Dictionary<decimal,object> dictionary = data
.AsParallel()
.WithDegreeOfParallelism(4)
.Cast<Item>()
.Select(item => (item.ID,CallAPI(item).Result))
.Where(entry => entry.Result != null)
.ToDictionary(entry => entry.ID,entry => (object)entry.Result.Message);
假设 CallAPI
方法具有以下签名:Task<APIResult> CallAPI(Item item);
这个 PLINQ 查询将以 4 的并发级别处理您的数据。这意味着 4 个操作将同时进行,当一项完成后,下一项将自动启动。
ID
应该是唯一的,否则 ToDictionary
运算符将抛出异常。
建议使用这种方法是因为它的简单性,而不是它的效率。 PLINQ 并不是真正用于处理 I/O 绑定的工作负载,并且在这样做时会不必要地阻塞 ThreadPool
线程。您可以查看 here 以了解更有效的方法来限制异步 I/O 绑定操作。
我已经找到了答案。如下。
代码
List<object> data ;//contain data
result =new Dictionary<decimal,object>();
var threadCount = 4;
if (data.Count < threadCount)
{
threadCount = data.Count;
}
var pageSize = threadCount > 0 ? Convert.ToInt32(Math.Ceiling((Convert.ToDecimal(data.Count) / threadCount))) : 0;
var pageCount = threadCount;
for (int j = 0; j < threadCount; j++)
{
var temp = data.Skip(j * pageSize).Take(pageSize).ToList();
var tempLength = temp?.Count;
Parallel.ForEach(temp,item =>
{
lock (result)
{
var temp = callapi(item.ID);
result.Add(temp.Item1,temp.Item2);
}
});
}
private (decimal,object) callapi(decimal id){
//call Api and get resultApi
if (resultApi != null && resultApi.Result != null)
{
result[item.ID] = resultApi.Result.id;
return (id,result);
}
else if (resultApi != null && resultApi .Message != null)
{
result[item.ID] = null;
return (id,result);
}
else
{
result[item.ID] = null;
return (id,result);
}
}