问题描述
我正在尝试使用dapper从sql服务器获取数据。我需要导出存储在Azure sql数据库中的460K记录。我决定分批获取数据,因此每批获得1万条记录。我计划在Parallel中获取记录,因此我将异步方法添加到任务列表中并做了Task.WhenAll
。当我在本地运行时,代码运行良好,但是在部署到k8s集群后,出现了某些记录的数据读取问题。我是多线程技术的新手,我不怎么处理这个问题。我尝试在该方法中进行锁定,但是系统崩溃了,下面是我的代码,该代码可能很笨拙,因为我正在尝试许多解决此问题的解决方案。
for (int i = 0; i < numberOfPages; i++)
{
tableviewWithCondition.startRow = startRow;
resultData.Add(_tableviewRepository.GetTableviewRowsByPagination(tableviewExportCondition.TableviewName,modelMappingGroups,tableviewWithCondition.startRow,builder,pageSize,appName,i));
startRow += tableviewWithCondition.pageSize;
}
foreach(var task in resultData)
{
if (task != null)
{
dataToExport.AddRange(task.Result);
}
}
这是我使用dapper从Azure sql数据库中获取数据的方法。
public async Task<(IEnumerable<int> unprocessedData,IEnumerable<dynamic> rowData)> GetTableviewRowsByPagination(string tableName,IEnumerable<MappingGroup> tableviewAttributeDetails,int startRow,sqlBuilder builder,int pageSize = 100,AppNameEnum appName = AppNameEnum.OptiSoil,int taskNumber = 1)
{
var _unitOfWork = _unitOfWorkServices.Build(appName.ToString());
List<int> unprocessedData = new List<int>();
try
{
var columns = tableviewAttributeDetails.Select(c => { return $"{c.mapping_group_value} [{c.attribute}]"; });
var joinedColumn = string.Join(",",columns);
builder.Select(joinedColumn);
var selector = builder.AddTemplate($"SELECT /**select**/ FROM {tableName} with (nolock) /**innerjoin**/ /**where**/ /**orderby**/ OFFSET {startRow} ROWS FETCH NEXT {(pageSize == 0 ? 100 : pageSize)} ROWS ONLY");
using (var connection = _unitOfWork.Connection)
{
connection.open();
var data = await connection.QueryAsync(selector.Rawsql,selector.Parameters);
Console.WriteLine($"data completed for task{taskNumber}");
return (unprocessedData,data);
}
}
catch(Exception ex)
{
Console.WriteLine($"Exception: {ex.Message}");
if (ex.InnerException != null)
Console.WriteLine($"InnerException: {ex.InnerException.Message}");
Console.WriteLine($"Error in fetching from row {startRow}");
unprocessedData.Add(startRow);
return (unprocessedData,null);
}
finally
{
_unitOfWork.dispose();
}
}
上面的代码在本地工作正常,但是在服务器中出现以下问题。
在并行任务中获取数据时如何避免此问题?
解决方法
您正在使用相同的连接并尝试在其上执行多个命令(由于命名原因,我假设是这样),还应该布置工作单元吗?
而不是:
using (var connection = _unitOfWork.Connection)
{
connection.Open();
var data = await connection.QueryAsync(selector.RawSql,selector.Parameters);
Console.WriteLine($"data completed for task{taskNumber}");
return (unprocessedData,data);
}
为每个项目创建一个新的连接(如果您确实要这样做)。我想,这是有根据的猜测,由于时机原因,它在本地运行。
也可以查看Task.WhenAll,这是一种收集所有结果的更好方法。而不是:
foreach(var task in resultData)
{
if (task != null)
{
dataToExport.AddRange(task.Result);
}
}
在任务上调用结果通常是不好的做法。