_raise_connection_failure中的AutoReconnectpymongo.pool

问题描述

我使用python mongo驱动程序连接到MongoDB。尽管它也可以正常执行其他任务,但过了一段时间,我还是遇到了以下例外:

其他信息:

  • mongodb具有三个分片群集
  • bulk_write 方法中引发的异常
AutoReconnectpymongo.pool in _raise_connection_failure error
mongo-router-3:27017: [Errno 104] Connection reset by peer

这是我的代码

import mongoengine
from celery.schedules import crontab
from celery.task import Task,PeriodicTask
from pymongo import UpdateOne


def chunker(array,n=10000):
    for i in range(0,len(array),n):
        yield array[i:i + n]


class StatCalculator(Task):
    name = "StatCalculator"
    routing_key = 'my-routing-key'
    soft_time_limit = 3600 * 2
    time_limit = 3600 * 2
    max_retries = 1

    def __init__(self):
        self.col = None

        super(StatCalculator,self).__init__()

    def run(self,ids):
        self._initialize()
        self._process(ids)
        self._persist()
        self._finalize()

    def _initialize(self):
        self.col = mongoengine.connection._get_db().my_collection
        self.calculated_times = {}

    def _process(self,ids):
        """
        This method fetch the items and append them to `calculated_times`

        @param ids:
        @return:
        """
        pass

    def _persist(self):
        bulk_ops = []

        for key,stat in self.calculated_times.items():
            bulk_ops.append(UpdateOne(
                {'some_id': key['some_id'],'created_at': self.Now},{'$set': stat},upsert=True
            ))

        if bulk_ops:
            # raise AutoReconnectpymongo.pool in _raise_connection_failure
            # mongo-router:27017: [Errno 104] Connection reset by peer
            self.col.bulk_write(bulk_ops,ordered=False)

    def _finalize(self):
        pass


class PeriodicStatCalculator(PeriodicTask):
    name = "PeriodicStatCalculator"
    run_every = crontab(hour="3",minute="0")
    routing_key = 'my-routing-key'
    soft_time_limit = 3600 * 1
    time_limit = 3600 * 1
    max_retries = 1

    def run(self,*args,**kwargs):
        ids = []  # a list of some ids

        for chunk in chunker(ids):
            StatCalculator().delay(chunk)

定期任务每天运行,并从Postgres表中获取一些ID, 然后定期任务将调用一个celery任务,以分别获取,处理和存储一些信息到传递的ID。

最后,处理后的信息将通过 bulk_write 方法

存储在Mongo集合中

环境:

  • django-rest-framework-mongoengine == 3.4.1

  • mongoengine == 0.20.0

  • pymongo == 3.7.0

  • 芹菜== 4.3.0rc1

  • django-celery == 3.3.1

问题:

  • 为什么会有这个例外?
  • 我如何克服这个例外?

解决方法

可能有多种原因:

  • 应用程序和数据库之间的网络不可靠。将应用程序移近数据库,使数据库移近应用程序,或者寻求更好的服务提供商。
  • 您正在使用具有较高TCP Keepalive设置的旧驱动程序。升级到使用较低设置的最新驱动程序。
  • 服务器资源不足,并中止操作和/或关闭连接。查看服务器日志以获取线索。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...