反应性Mongo:已超过最大操作数maxQueueWaitSize500

问题描述

我正在使用ReactiveMongoTemplate MongoDB ChangeStream来监听对MongoDB集合的更新,执行一些查询,并将文档持久保存到另一个集合。 虽然它在本地运行良好,但是在部署到容量很大的UAT之后,它开始出现以下错误:

Too many operations are already waiting for a collection. Max number of operations (maxWaitQueueSize) of 500 has been exceeded.

有什么方法可以解决这个问题?

在application.yml文件中有以下内容

spring:
   data: 
      mongodb:
          uri: mongodb://host:port/db?authMechanism=<val1>&authSource=<val2>&authechanismProperties=<val3>

这是简化的更改流媒体的外观:

@Autowired
ReactiveMongoTemplate reactiveMongoTemplate;

reactiveMongoTemplate
  .changeStream(Sample.class)
  .watchCollection("sample_collection")
  .filter(
     new Criteria.orOperator(
        where("operationType").is("update"),where("operationType").is("insert")
     )
  )
  .listen()
  .flatMap(r->processMessage(r)). // processMessage does some queries to collections including this collection being listened to and upserts to same mongodb in a different collection
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe();

我知道我可能需要添加一些连接池才能处理多个连接?但是,如何使用Reactive MongoDB进行配置? 我是反应式编程的新手。任何指针都将非常有帮助。

解决方法

您可以在这里做几件事:

  1. 检查是否进行了长时间的阻塞调用,从而导致线程被阻塞,并导致创建大量连接,因为先前的连接仍被占用以执行繁重的任务。尝试检查阻止这些调用的代码是否有优化。 在反应式编程中,您可以使用BlockHound检查阻塞代码的存在。

  2. 通过指定waitQueueMultiplemaxPoolSize来增加连接限制- https://docs.mongodb.com/manual/reference/connection-string/#connection-pool-options

在此之前,您可以使用以下方法检查mongo数据库的统计信息,以查看当前和允许的连接

db.serverStatus().connections

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...