sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL SYSCALL error: EOF detected on Ubuntu, but not on Mac OS

Problem description

I am writing a simple web scraper in Python 3.7 using asyncio and multiprocessing. The overall architecture looks like this:

for i in range(self.number_processes - len(processes)):
    p = Process(target=AsyncProcessWrapper().run_main_loop)
    time.sleep(0.3)
    p.start()
    start_time = time.time()

where AsyncProcessWrapper is defined as:

class AsyncProcessWrapper:

    def __init__(self):
        resource_database = Database()
        # This is where the async logic takes place, following a producer-consumer pattern.
        # I will hide this logic for simplicity.
        self.main = Main(database=resource_database)

    def run_main_loop(self):
        # Method name matches the Process target used in the loop above.
        asyncio.run(self.main.run_main_loop())

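The database connection is established once and never closed (this is a scraper, and downloading the data takes a while).

The Database class and its __init__ look like this: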

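import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool


class Database:

    def __init__(self):
        db_url = os.getenv('DatabaseUrl')
        self.engine = create_engine(db_url, encoding='utf8', poolclass=NullPool)

        Session = sessionmaker()
        Session.configure(bind=self.engine)
        self.session = Session()

    def create_url_entity(self, urls):
        # URLEntity (the ORM model) and self.engine_version are defined elsewhere (not shown).
        to_insert = []
        for url in urls:
            url_entity_obj = URLEntity(
                url=url, engine_version=self.engine_version
            )
            to_insert.append(url_entity_obj)

        self.session.bulk_save_objects(to_insert)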

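With processes=2 (or any higher number), I get no errors when I run the snippet above on MacOS with 2 physical cores. However, when I set processes=2 on an Ubuntu 18.05 machine with 8 physical cores, I keep getting the following error: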

david@bob:~$ screen -r myproject

    cursor,statement,parameters,context
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/default.py",line 593,in do_execute
    cursor.execute(statement,parameters)
psycopg2.OperationalError: SSL SYSCALL error: EOF detected


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/process.py",line 297,in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py",line 99,in run
    self._target(*self._args,**self._kwargs)
  File "/home/david/myproject/myproject/engine/core.py",line 25,in run_main_loop
    asyncio.run(self.main.run_main_loop())
  File "/usr/lib/python3.7/asyncio/runners.py",line 43,in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.7/asyncio/base_events.py",line 579,in run_until_complete
    return future.result()
  File "/home/david/myproject/myproject/core/main.py",line 150,in run_main_loop
    self.resource_database.create_markup_record(self.buffer_markup_records)
  File "/home/david/myproject/myproject/resources/db.py",line 281,in create_markup_record
    .join(RawMarkup,isouter=True) \
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/orm/query.py",line 3373,in all
    return list(self)
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/orm/query.py",line 3535,in __iter__
    return self._execute_and_instances(context)
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/orm/query.py",line 3560,in _execute_and_instances
    result = conn.execute(querycontext.statement,self._params)
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/base.py",line 1011,in execute
    return meth(self,multiparams,params)
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/sql/elements.py",line 298,in _execute_on_connection
    return connection._execute_clauseelement(self,params)
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/base.py",line 1130,in _execute_clauseelement
    distilled_params,
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/base.py",line 1317,in _execute_context
    e,cursor,context
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/base.py",line 1511,in _handle_dbapi_exception
    sqlalchemy_exception,with_traceback=exc_info[2],from_=e
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/util/compat.py",line 182,in raise_
    raise exception
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/base.py",line 1277,in _execute_context
    cursor,parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: SELECT url.id AS url_id, url.url AS url_url, raw_markup.id AS raw_markup_id
FROM url LEFT OUTER JOIN raw_markup ON url.id = raw_markup.url_id
WHERE url.url IN (%(url_1)s, %(url_2)s, %(url_3)s, %(url_4)s, %(url_5)s, %(url_6)s, %(url_7)s, %(url_8)s, %(url_9)s, %(url_10)s, %(url_11)s, %(url_12)s, %(url_13)s, %(url_14)s, %(url_15)s, %(url_16)s, %(url_17)s, %(url_18)s)]
[parameters: {'url_1': 'placeholder_string_1', 'url_2': 'placeholder_string_2', 'url_3': 'placeholder_string_3', ... 'url_18': 'placeholder_string_18'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)

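The database on my Ubuntu machine is much larger (its largest table has 34,744,900 rows, while my local machine has roughly 1,000,000 rows).

Additional information:

  • I prototype my application on my Mac (version 10.15) and then upload the code to run on a cluster. Both machines now run equivalent Postgres versions (Postgres 10).
  • I upgraded the cluster's Postgres version to 12. This did not change anything.
  • I tried committing more frequently, but that did not change anything either.
  • When I spawn more processes (for example processes=7), the errors become more frequent.
  • I use sqlalchemy==1.3.
  • Increasing the niceness (nice -n 17 python -m run) does not change anything (although it feels as if it makes the async workers run faster).
  • I read somewhere that this may be caused by libpq behaving strangely on Ubuntu. Any idea whether that could be the cause?

Any idea what could be causing this error, and what I could try to fix it?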


Edit:

I just started two async processes (without multiprocessing) on MacOS using bash and the following commands:

trap 'kill %1' SIGINT
python -m myproject.core.main | tee 1.log | sed -e 's/^/[Command1] /' & python -m myproject.core.main | tee 2.log | sed -e 's/^/[Command2] /'

This now also blocks execution completely and returns a Postgres SSL error:

SSL error in data received
protocol: <asyncio.sslproto.SSLProtocol object at 0x115f0b828>
transport: <_SelectorSocketTransport fd=15 read=polling write=<idle,bufsize=0>>
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/asyncio/sslproto.py",line 526,in data_received
    ssldata,appdata = self._sslpipe.feed_ssldata(data)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/asyncio/sslproto.py",line 207,in feed_ssldata
    self._sslobj.unwrap()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/ssl.py",line 767,in unwrap
    return self._sslobj.shutdown()
ssl.SSLError: [SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC] decryption failed or bad record mac (_ssl.c:2609)

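The same problem with the bash approach also occurs on the Ubuntu system.


Edit 2:

After configuring logging, I get the following error log (only the last few lines). I use 7 processes in Python just to make the error show up faster: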

 2021-01-05 12:22:56.597 UTC,"postgres","scraper",3626,"localhost.localdomain:56422",5ff45a1e.e2a,16,"idle in transaction",2021-01-05 12:22:54 UTC,6/17,LOG,00000,"statement:   UPDATE url_queue SET crawler_processing_sentinel=true WHERE url_queue.idexec_simple_query,postgres.c:1045",""
 2021-01-05 12:22:56.921 UTC,17,77447627,"statement: SELECT count(*) AS count_1
 FROM (SELECT raw_markup.id AS raw_markup_id,raw_markup.url_id AS raw_markup_url_id,raw_markup.markup AS raw_markup_markup,raw_markup.spider_processing_sentinel AS            raw_markup_spider_processing_sentinel,raw_markup.spider_processed_sentinel AS raw_markup_spider_processed_sentinel,raw_markup.spider_skip AS raw_markup_spider_skip,raw_markup.version_spider AS raw_markup_version_spider,raw_markup.updated_at AS raw_markup_updated_at
 FROM raw_markup) AS anon_1",""
 2021-01-05 12:22:56.954 UTC,18,"statement: SELECT count(*) AS count_1
 FROM (SELECT url_queue.id AS url_queue_id,url_queue.url_id AS url_queue_url_id,url_queue.crawler_processing_sentinel AS url_queue_crawler_processing_sentinel,url_queue.      crawler_processed_sentinel AS url_queue_crawler_processed_sentinel,url_queue.crawler_skip AS url_queue_crawler_skip,url_queue.retries AS url_queue_retries,url_queue.         occurrences AS url_queue_occurrences,url_queue.version_crawl_frontier AS url_queue_version_crawl_frontier,url_queue.created_at AS url_queue_created_at
 FROM url_queue) AS anon_1",""
 2021-01-05 12:22:57.153 UTC,19,"statement: SELECT url.url AS url_url,url_queue.id AS url_queue_id
 FROM url_queue,url LEFT OUTER JOIN raw_markup ON url.id = raw_markup.url_id
 WHERE url.id = url_queue.url_id AND raw_markup.id IS NULL AND url_queue.retries < 4 AND url_queue.crawler_processing_sentinel = false AND ((url.url LIKE '%' || 'thomasnet.com'  || '%') OR (url.url LIKE '%' || 'go4worldbusiness.com' || '%'))
  LIMIT 512 OFFSET 31636",""
 2021-01-05 12:22:58.721 UTC,20,"statement: UPDATE url_queue SET crawler_processing_sentinel=true WHERE url_queue.id
 2021-01-05 12:23:01.722 UTC,3614,"localhost.localdomain:56084",5ff45a1c.e1e,21,2021-01-05 12:22:52 UTC,5/57,77447626,"statement: ROLLBACK",""
 2021-01-05 12:23:05.919 UTC,22,"idle",5/58,"statement:                  ROLLBACK",23,"ROLLBACK",WARNING,25P01,"there is no         transaction in progress","UserAbortTransactionBlock,xact.c:3946",""
 2021-01-05 12:23:05.920 UTC,3606,"localhost.localdomain:55748",5ff45a1a.e16,2021-01-05 12:22:50 UTC,"disconnection: session time: 0: 00:15.745 user=postgres database=scraper host=localhost.localdomain port=55748","log_disconnections,postgres.c:4677",""
 2021-01-05 12:23:05.952 UTC,"disconnection: session time: 0: 00:10.994 user=postgres database=scraper host=localhost.localdomain port=56422",24,"disconnection: session time: 0: 00:13.606 user=postgres database=scraper host=localhost.localdomain port=56084",""

The SQLAlchemy documentation states: "It's critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process. TCP connections are represented as file descriptors, which usually work across process boundaries, meaning this will cause concurrent access to the file descriptor on behalf of two or more entirely independent Python interpreter states." Maybe the pool is being shared across processes?
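
One way to guard against the situation described in that passage is to remember which process opened a connection and to reopen it whenever the current PID differs. The sketch below illustrates only that idea using the standard library. PerProcessResource and its factory argument are hypothetical names, not part of SQLAlchemy, and factory stands in for whatever actually opens a connection.

```python
import os


class PerProcessResource:
    """Lazily builds a resource and rebuilds it when the owning PID changes."""

    def __init__(self, factory):
        self._factory = factory   # hypothetical callable that opens a connection
        self._resource = None
        self._owner_pid = None

    def get(self):
        pid = os.getpid()
        if self._resource is None or self._owner_pid != pid:
            # First use, or we are a forked child holding an inherited copy:
            # open a fresh resource instead of reusing the parent's.
            self._resource = self._factory()
            self._owner_pid = pid
        return self._resource
```

Within one process, repeated get() calls return the same object. After a fork, the child's first get() sees a different PID and calls factory again, so parent and child never talk over the same file descriptor.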


Edit 3: I get the same error when I use Python 3.8, which handles multiprocessing spawn creation differently.

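Edit 4: When I change the queries to many simple insert statements with very frequent commits (instead of bulk insert statements with infrequent commits), how often this error occurs changes.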

Solution

Instead of

class AsyncProcessWrapper:

    def __init__(self):
        # Runs in the parent process, so the engine and session exist before the child starts.
        resource_database = Database()
        self.main = Main(database=resource_database)

    def run_main_loop(self):
        asyncio.run(self.main.run_main_loop())

you should do this:

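import asyncio
import random
import string


class AsyncProcessWrapper:

    def __init__(self):
        # Runs in the parent process: keep only plain state here, no database objects.
        self.name = 'PROC:' + ''.join(random.choice(string.ascii_uppercase) for _ in range(4))

    def run_main_loop(self):
        # Runs in the child process, so the engine and session are created there.
        self.resource_database = Database()
        self.resource_database.engine.dispose()
        self.main = Main(name=self.name, database=self.resource_database)
        asyncio.run(self.main.run_main_loop())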

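because otherwise the connection pool is created in the parent process, which, according to the linked reference, you want to avoid (__init__ is not called by the child process; it still runs in the parent's loop).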
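
A minimal sketch of why the placement matters, assuming a Unix system where the fork start method is available. FakeDatabase is a hypothetical stand-in for the Database class that only records the PID that created it; Worker and run_worker are likewise illustrative names, not part of the original project.

```python
import multiprocessing as mp
import os


class FakeDatabase:
    """Stand-in for Database: remembers which process constructed it."""

    def __init__(self):
        self.created_in_pid = os.getpid()


class Worker:
    def __init__(self, results):
        # Executed in the parent: no database objects are created here.
        self.results = results
        self.database = None

    def run_main_loop(self):
        # Executed in the child: the database object is created after the fork.
        self.database = FakeDatabase()
        self.results.put((os.getpid(), self.database.created_in_pid))


def run_worker():
    ctx = mp.get_context("fork")  # assumption: Unix-only start method
    results = ctx.Queue()
    process = ctx.Process(target=Worker(results).run_main_loop)
    process.start()
    child_pid, created_pid = results.get(timeout=10)  # read before join
    process.join()
    return child_pid, created_pid
```

Calling run_worker() returns two PIDs that are equal to each other and different from the caller's PID, which shows that the database object was built inside the child. Moving the FakeDatabase() call into __init__ would instead record the parent's PID.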