Problem description
I am using python3.7 with asyncio and multiprocessing to write a simple web scraper. The overall architecture looks as follows:
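import time
from multiprocessing import Process

for i in range(self.number_processes - len(processes)):
    # AsyncProcessWrapper() is instantiated here, i.e. in the parent process.
    p = Process(target=AsyncProcessWrapper().run_main_loop)
    time.sleep(0.3)
    p.start()
    start_time = time.time()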
where AsyncProcessWrapper is defined as:
import asyncio

class AsyncProcessWrapper:
    def __init__(self):
        resource_database = Database()
        # This is where the async logic takes place, following a producer-consumer pattern.
        # I will hide this logic for simplicity.
        self.main = Main(database=resource_database)

    def run_main_loop(self):
        asyncio.run(self.main.run_main_loop())
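The database connection is established once and never closed (this is a scraper, and downloading the data takes a while).
The init of the Database class looks as follows: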
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

class Database:
    def __init__(self):
        db_url = os.getenv('DatabaseUrl')
        self.engine = create_engine(db_url, encoding='utf8', poolclass=NullPool)
        Session = sessionmaker()
        Session.configure(bind=self.engine)
        self.session = Session()

    def create_url_entity(self, urls):
        # URLEntity and self.engine_version are not shown in this excerpt.
        to_insert = []
        for url in urls:
            url_entity_obj = URLEntity(
                url=url, engine_version=self.engine_version
            )
            to_insert.append(url_entity_obj)
        self.session.bulk_save_objects(to_insert)
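With processes=2 (or any higher number), I get no errors when I run the above snippet on MacOS with 2 physical cores.
However, when I set processes=2 on an Ubuntu 18.04 machine with 8 physical cores, I keep getting the following error.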
david@bob:~$ screen -r myproject
    cursor, statement, parameters, context
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: SSL SYSCALL error: EOF detected

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/david/myproject/myproject/engine/core.py", line 25, in run_main_loop
    asyncio.run(self.main.run_main_loop())
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/home/david/myproject/myproject/core/main.py", line 150, in run_main_loop
    self.resource_database.create_markup_record(self.buffer_markup_records)
  File "/home/david/myproject/myproject/resources/db.py", line 281, in create_markup_record
    .join(RawMarkup, isouter=True) \
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3373, in all
    return list(self)
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
    return self._execute_and_instances(context)
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, params)
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, cursor, context
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/david/myproject/venv/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL SYSCALL error: EOF detected
[SQL: SELECT url.id AS url_id, url.url AS url_url, raw_markup.id AS raw_markup_id
FROM url LEFT OUTER JOIN raw_markup ON url.id = raw_markup.url_id
WHERE url.url IN (%(url_1)s, %(url_2)s, %(url_3)s, %(url_4)s, %(url_5)s, %(url_6)s, %(url_7)s, %(url_8)s, %(url_9)s, %(url_10)s, %(url_11)s, %(url_12)s, %(url_13)s, %(url_14)s, %(url_15)s, %(url_16)s, %(url_17)s, %(url_18)s)]
[parameters: {'url_1': 'placeholder_string_1', 'url_2': 'placeholder_string_2', 'url_3': 'placeholder_string_3', ... 'url_18': 'placeholder_string_18'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
The database on my Ubuntu machine is much larger (the Ubuntu machine has 34'744'900 rows in its largest table, whereas my local machine has about 1'000'000 rows).
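Additional information:
- I am prototyping my application on my Mac (version 10.15). I then upload my code to run on a cluster. By now, the Postgres versions on both machines are equivalent (Postgres 10).
- I upgraded the cluster's Postgres version to 12. This did not change anything.
- I tried committing more frequently, but this did not change anything.
- When I spawn more processes (i.e. processes=7), the errors keep increasing.
- I use sqlalchemy==1.3.
- Increasing the niceness (nice -n 17 python -m run) does not change anything (my impression was that it would make the async workers run faster).
- I read somewhere that this could be due to libpq behaving strangely on Ubuntu. Any ideas whether this could be the cause?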
Edit:
I just started two single (no multiprocessing) asyncio processes on MacOS using bash and the following commands:
trap 'kill %1' SIGINT
python -m myproject.core.main | tee 1.log | sed -e 's/^/[Command1] /' & python -m myproject.core.main | tee 2.log | sed -e 's/^/[Command2] /'
This now also blocks execution completely and returns a Postgres SSL error.
SSL error in data received
protocol: <asyncio.sslproto.SSLProtocol object at 0x115f0b828>
transport: <_SelectorSocketTransport fd=15 read=polling write=<idle, bufsize=0>>
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/asyncio/sslproto.py", line 526, in data_received
    ssldata, appdata = self._sslpipe.feed_ssldata(data)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/asyncio/sslproto.py", line 207, in feed_ssldata
    self._sslobj.unwrap()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/ssl.py", line 767, in unwrap
    return self._sslobj.shutdown()
ssl.SSLError: [SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC] decryption failed or bad record mac (_ssl.c:2609)
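The same problem with the bash setup also occurs on the Ubuntu system.
Edit 2:
After configuring logging, I get the following error log (last few lines only). I use 7 processes in python just to make the error show up faster: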
2021-01-05 12:22:56.597 UTC,"postgres","scraper",3626,"localhost.localdomain:56422",5ff45a1e.e2a,16,"idle in transaction",2021-01-05 12:22:54 UTC,6/17,LOG,00000,"statement: UPDATE url_queue SET crawler_processing_sentinel=true WHERE url_queue.idexec_simple_query,postgres.c:1045",""
2021-01-05 12:22:56.921 UTC,17,77447627,"statement: SELECT count(*) AS count_1
FROM (SELECT raw_markup.id AS raw_markup_id,raw_markup.url_id AS raw_markup_url_id,raw_markup.markup AS raw_markup_markup,raw_markup.spider_processing_sentinel AS raw_markup_spider_processing_sentinel,raw_markup.spider_processed_sentinel AS raw_markup_spider_processed_sentinel,raw_markup.spider_skip AS raw_markup_spider_skip,raw_markup.version_spider AS raw_markup_version_spider,raw_markup.updated_at AS raw_markup_updated_at
FROM raw_markup) AS anon_1",""
2021-01-05 12:22:56.954 UTC,18,"statement: SELECT count(*) AS count_1
FROM (SELECT url_queue.id AS url_queue_id,url_queue.url_id AS url_queue_url_id,url_queue.crawler_processing_sentinel AS url_queue_crawler_processing_sentinel,url_queue. crawler_processed_sentinel AS url_queue_crawler_processed_sentinel,url_queue.crawler_skip AS url_queue_crawler_skip,url_queue.retries AS url_queue_retries,url_queue. occurrences AS url_queue_occurrences,url_queue.version_crawl_frontier AS url_queue_version_crawl_frontier,url_queue.created_at AS url_queue_created_at
FROM url_queue) AS anon_1",""
2021-01-05 12:22:57.153 UTC,19,"statement: SELECT url.url AS url_url,url_queue.id AS url_queue_id
FROM url_queue,url LEFT OUTER JOIN raw_markup ON url.id = raw_markup.url_id
WHERE url.id = url_queue.url_id AND raw_markup.id IS NULL AND url_queue.retries < 4 AND url_queue.crawler_processing_sentinel = false AND ((url.url LIKE '%' || 'thomasnet.com' || '%') OR (url.url LIKE '%' || 'go4worldbusiness.com' || '%'))
LIMIT 512 OFFSET 31636",""
2021-01-05 12:22:58.721 UTC,20,"statement: UPDATE url_queue SET crawler_processing_sentinel=true WHERE url_queue.id
2021-01-05 12:23:01.722 UTC,3614,"localhost.localdomain:56084",5ff45a1c.e1e,21,2021-01-05 12:22:52 UTC,5/57,77447626,"statement: ROLLBACK",""
2021-01-05 12:23:05.919 UTC,22,"idle",5/58,"statement: ROLLBACK",23,"ROLLBACK",WARNING,25P01,"there is no transaction in progress","UserAbortTransactionBlock,xact.c:3946",""
2021-01-05 12:23:05.920 UTC,3606,"localhost.localdomain:55748",5ff45a1a.e16,2021-01-05 12:22:50 UTC,"disconnection: session time: 0: 00:15.745 user=postgres database=scraper host=localhost.localdomain port=55748","log_disconnections,postgres.c:4677",""
2021-01-05 12:23:05.952 UTC,"disconnection: session time: 0: 00:10.994 user=postgres database=scraper host=localhost.localdomain port=56422",24,"disconnection: session time: 0: 00:13.606 user=postgres database=scraper host=localhost.localdomain port=56084",""
The SQLAlchemy documentation states: "It's critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process. TCP connections are represented as file descriptors, which usually work across process boundaries, meaning this will cause concurrent access to the file descriptor on behalf of two or more entirely independent Python interpreter states."
Maybe the pool is being shared across processes?
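One way to guard against the sharing described in that quote is to remember which process created a resource and to rebuild it when the process ID changes. The following is only a minimal sketch of that idea using the standard library: `PerProcessResource` and `factory` are hypothetical names, and `object()` stands in for a real database session or engine.

```python
import os


class PerProcessResource:
    """Lazily builds a resource and rebuilds it when the process ID changes."""

    def __init__(self, factory):
        # Hypothetical callable, e.g. one that opens a database session.
        self._factory = factory
        self._owner_pid = None
        self._resource = None

    def get(self):
        pid = os.getpid()
        if self._owner_pid != pid:
            # First use in this process (or first use after a fork): build a
            # fresh resource. The inherited one is deliberately not closed
            # here, because the parent may still use the same file descriptor.
            self._resource = self._factory()
            self._owner_pid = pid
        return self._resource


if __name__ == "__main__":
    holder = PerProcessResource(factory=object)  # object() stands in for a connection
    first = holder.get()
    print(first is holder.get())
```

Within a single process, repeated calls to `get()` return the same object. A forked child would see a different `os.getpid()` and therefore build its own resource on first use.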
Edit 3: When I use python3.8, multiprocessing handles process spawn creation differently.
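For context, Python 3.8 changed the default multiprocessing start method on macOS from fork to spawn. With spawn the child starts a fresh interpreter instead of inheriting a copy of the parent's memory. The sketch below only inspects the start methods and does not start any process; `describe_start_methods` is a hypothetical helper name.

```python
import multiprocessing as mp


def describe_start_methods():
    """Return the start methods this platform supports and the current default."""
    return {
        "available": mp.get_all_start_methods(),
        "default": mp.get_start_method(),
    }


if __name__ == "__main__":
    info = describe_start_methods()
    print(info)
    # A context pins the start method for the processes created through it,
    # without changing the global default.
    ctx = mp.get_context("spawn")
    print(ctx.get_start_method())
```

Comparing the reported default on the Mac and on the Ubuntu machine may help explain why the same code behaves differently on the two systems.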
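Edit 4: When I change the queries to many simple insert statements with very frequent commits (instead of bulk insert statements with infrequent commits), the frequency at which this error occurs changes.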
Solution
Instead of
class AsyncProcessWrapper:
    def __init__(self):
        resource_database = Database()
        self.main = Main(database=resource_database)

    def run_main_loop(self):
        asyncio.run(self.main.run_main_loop())
you should do this:
import asyncio
import random
import string

class AsyncProcessWrapper:
    def __init__(self):
        self.name = 'PROC:' + ''.join(random.choice(string.ascii_uppercase) for _ in range(4))

    def run_main_loop(self):
        # This method is the Process target, so it runs in the child process.
        self.resource_database = Database()
        self.resource_database.engine.dispose()
        self.main = Main(name=self.name, database=self.resource_database)
        asyncio.run(self.main.run_main_loop())
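because otherwise the connection pool is created in the parent process, which, according to this, you want to avoid (__init__ is not called by the child process; it still runs in the parent's loop).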
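The point about `__init__` running in the parent can be checked without a database. The sketch below is an illustration with hypothetical names (`Wrapper`, `demo`): it records the process ID in `__init__` and again inside the method used as the Process target. It assumes a POSIX platform where the "fork" start method is available.

```python
import multiprocessing as mp
import os


class Wrapper:
    def __init__(self):
        # Evaluated when Wrapper() is called, which happens before
        # Process.start(), so this runs in the parent process.
        self.init_pid = os.getpid()

    def run_main_loop(self, queue):
        # This is the Process target, so it runs in the child process.
        # Per-process resources such as a database engine belong here.
        queue.put((self.init_pid, os.getpid()))


def demo():
    ctx = mp.get_context("fork")  # assumption: "fork" is available (POSIX)
    queue = ctx.Queue()
    p = ctx.Process(target=Wrapper().run_main_loop, args=(queue,))
    p.start()
    init_pid, run_pid = queue.get(timeout=10)
    p.join()
    return os.getpid(), init_pid, run_pid


if __name__ == "__main__":
    parent_pid, init_pid, run_pid = demo()
    print("__init__ ran in parent:", init_pid == parent_pid)
    print("target ran in child:", run_pid != parent_pid)
```

The process ID recorded in `__init__` matches the parent, while the one recorded in `run_main_loop` belongs to the child, which is why creating the Database object inside the target method keeps the connection out of the parent.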