如何在并行/多线程中使用dapper从sql数据库中获取数据?

问题描述

我正在尝试使用dapper从sql服务器获取数据。我需要导出存储在Azure sql数据库中的460K记录。我决定分批获取数据,因此每批获得1万条记录。我计划在Parallel中获取记录,因此我将异步方法添加到任务列表中并做了Task.WhenAll。当我在本地运行时,代码运行良好,但是在部署到k8s集群后,出现了某些记录的数据读取问题。我是多线程技术的新手,我不怎么处理这个问题。我尝试在该方法中进行锁定,但是系统崩溃了,下面是我的代码,该代码可能很笨拙,因为我正在尝试许多解决此问题的解决方案。

for (int i = 0; i < numberOfPages; i++)
{
  tableviewWithCondition.startRow = startRow;
  resultData.Add(_tableviewRepository.GetTableviewRowsByPagination(tableviewExportCondition.TableviewName,modelMappingGroups,tableviewWithCondition.startRow,builder,pageSize,appName,i));
  startRow += tableviewWithCondition.pageSize;
}

foreach(var task in resultData)
{ 
   if (task != null)
   {
      dataToExport.AddRange(task.Result);
   }
}

这是我使用dapper从Azure sql数据库获取数据的方法

public async Task<(IEnumerable<int> unprocessedData,IEnumerable<dynamic> rowData)> GetTableviewRowsByPagination(string tableName,IEnumerable<MappingGroup> tableviewAttributeDetails,int startRow,sqlBuilder builder,int pageSize = 100,AppNameEnum appName = AppNameEnum.OptiSoil,int taskNumber = 1)
        {
            var _unitOfWork = _unitOfWorkServices.Build(appName.ToString());
            List<int> unprocessedData = new List<int>();
            try
            {
                var columns = tableviewAttributeDetails.Select(c => { return $"{c.mapping_group_value} [{c.attribute}]"; });
                var joinedColumn = string.Join(",",columns);
                builder.Select(joinedColumn);
                var selector = builder.AddTemplate($"SELECT /**select**/ FROM {tableName} with (nolock) /**innerjoin**/ /**where**/ /**orderby**/ OFFSET {startRow} ROWS FETCH NEXT {(pageSize == 0 ? 100 : pageSize)} ROWS ONLY");

                using (var connection = _unitOfWork.Connection)
                {
                    connection.open();
                    var data = await connection.QueryAsync(selector.Rawsql,selector.Parameters);
                    Console.WriteLine($"data completed for task{taskNumber}");
                    return (unprocessedData,data);
                }
            }
            catch(Exception ex)
            {
                Console.WriteLine($"Exception: {ex.Message}");
                if (ex.InnerException != null)
                    Console.WriteLine($"InnerException: {ex.InnerException.Message}");

                Console.WriteLine($"Error in fetching from row {startRow}");
                unprocessedData.Add(startRow);
                return (unprocessedData,null);
            }
            finally
            {
                _unitOfWork.dispose();
            }
        }

上面的代码在本地工作正常,但是在服务器中出现以下问题。

异常:将请求发送到服务器时,发生了传输级错误。 (提供者:TCP提供程序,错误:35-捕获到内部异常)。

InnerException:当另一个写操作挂起时,无法调用WriteAsync方法

在并行任务中获取数据时如何避免此问题?

解决方法

您正在使用相同的连接并尝试在其上执行多个命令(由于命名原因,我假设是这样),还应该布置工作单元吗?

而不是:

 using (var connection = _unitOfWork.Connection)
                {
                    connection.Open();
                    var data = await connection.QueryAsync(selector.RawSql,selector.Parameters);
                    Console.WriteLine($"data completed for task{taskNumber}");
                    return (unprocessedData,data);
                }

为每个项目创建一个新的连接(如果您确实要这样做)。我想,这是有根据的猜测,由于时机原因,它在本地运行。

也可以查看Task.WhenAll,这是一种收集所有结果的更好方法。而不是:

foreach(var task in resultData)
{ 
   if (task != null)
   {
      dataToExport.AddRange(task.Result);
   }
}

在任务上调用结果通常是不好的做法。