问题描述
当我们在总线上放置许多消息,并且它们在业务逻辑中调用流程时,就会发生此错误:
在从服务器获取连接之前经过的超时时间 池。这可能是因为所有池化连接都在 使用和达到最大池大小。
当有15条消息调用我们的流程时,不会发生这种情况。但是,当调用80或130个进程时,确实会发生这种情况。
我们正在使用工作单元模式,并且连接在使用后被关闭。因此,我很难理解为什么下一个过程无法在池中使用它。
这是在我们的应用中使用工作单元的方式:
using (var uow = _uowFactory.Create(true))
{
await uow.AccrualRepo.AddAccrualHistoriesAsync(histories);
await uow.CommitAsync();
}
这是工厂返回欠款的方式:
public class UnitOfWorkFactory : IUnitOfWorkFactory
{
private readonly IConfiguration _configuration;
private readonly IMediator _mediator;
private readonly IStateAccessor _stateAccessor;
private readonly ITimeProvider _timeProvider;
private readonly IDbConnection _connection;
private readonly IAccrualMapper _accrualMapper;
private readonly ILogger<RepoBase> _logger;
public UnitOfWorkFactory(IConfiguration configuration,IDbConnection sqlConnection,IMediator mediator,IStateAccessor stateAccessor,ITimeProvider timeProvider,IAccrualMapper accrualMapper,ILogger<RepoBase> logger)
{
_configuration = configuration;
_mediator = mediator;
_stateAccessor = stateAccessor;
_timeProvider = timeProvider;
_connection = sqlConnection;
_accrualMapper = accrualMapper;
_logger = logger;
}
public IUnitOfWork Create(bool useTransaction)
{
return new UnitOfWork(_configuration,_connection,_mediator,_stateAccessor,_timeProvider,_accrualMapper,_logger,useTransaction);
}
我们的Startup.cs
文件通过以下方式设置了依赖项注入:
services.AddTransient<IUnitOfWorkFactory,UnitOfWorkFactory>();
services.AddTransient<IDbConnection,sqlConnection>();
我现在有很多代码,但是我们的样子如下。请注意,在调用CommitAsync()
之后和进行处置时,连接已关闭。
public class UnitOfWork : IUnitOfWork,Idisposable
{
private readonly IConfiguration _configuration;
private readonly IMediator _mediator;
private readonly IStateAccessor _stateAccessor;
private readonly ITimeProvider _timeProvider;
private readonly IAccrualMapper _accrualMapper;
private readonly ILogger<RepoBase> _logger;
private IDbConnection _connection;
private IDbTransaction _transaction;
private IAccrualRepo _accrualRepo;
private bool _disposed;
private bool _commitOccurred;
private bool _useTransaction;
public UnitOfWork(IConfiguration configuration,ILogger<RepoBase> logger,bool useTransaction = true)
{
_configuration = configuration;
_mediator = mediator;
_stateAccessor = stateAccessor;
_timeProvider = timeProvider;
_useTransaction = useTransaction;
_accrualMapper = accrualMapper;
_logger = logger;
_connection = sqlConnection;
_connection.ConnectionString = _configuration["ConnectionString"];
_connection.open();
if (useTransaction)
{
_transaction = _connection.BeginTransaction();
}
}
public IAccrualRepo AccrualRepo
{
get => _accrualRepo ?? (_accrualRepo = new AccrualRepo(_configuration,_transaction,_logger));
set => _accrualRepo = value;
}
public async Task CommitAsync()
{
if (!_useTransaction)
{
throw new InvalidOperationException("Attempting to call commit on a unit of work that isn't using a transaction");
}
try
{
_transaction.Commit();
_commitOccurred = true;
await InvokePostCommitOnReposAsync();
}
catch
{
_transaction.Rollback();
throw;
}
finally
{
_connection.Close();
_transaction.dispose();
ResetRepositories();
}
}
private async Task InvokePostCommitOnReposAsync()
{
var repos = new List<RepoBase>();
if (_accrualRepo != null) { repos.Add((RepoBase)_accrualRepo); }
try
{
foreach (var repo in repos)
{
await repo.PostCommitAsync();
}
}
catch (Exception ex)
{
_logger.LogError(ex,"Exception occurred while invoking post commit on a repo.");
}
}
private void ResetRepositories()
{
_accrualRepo = null; // Note: there are more repos here,but removed for clarity.
}
public void dispose()
{
dispose(true);
GC.SuppressFinalize(this); // Already disposed; no need for the GC to finalize.
}
protected virtual void dispose(bool calledFromdisposeAndNotFromFinalizer)
{
if (_disposed) { return; }
if (calledFromdisposeAndNotFromFinalizer)
{
// If the user never called commit,but we are using a transaction,then roll back.
if (!_commitOccurred && _useTransaction && _transaction != null) { _transaction.Rollback(); }
if (_transaction != null) { _transaction.dispose(); _transaction = null; }
if (_connection != null) { _connection.dispose(); _connection = null; }
}
_disposed = true;
}
}
那么为什么会有连接池问题呢?这里做错了什么吗?也许我们需要增加连接池的大小?
解决方法
连接池大小是允许的同时连接数。例如,对于SQL Server,默认值为100。如果一次连接的数量超过该数量,则必须等待先前的连接关闭。
如果您有很多消息可以一次发送,建议您增加连接池的大小。
https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql-server-connection-pooling
但是,如果您发现一定时间的运行时间后收到此消息。可能是因为代码有问题,并且某些连接没有关闭。