How are connections recycled in a multiprocessing pool serving requests from a single requests.Session object?

Problem description

Below is the complete code, simplified for the question.

ids_to_check returns a list of ids. In my tests I used a list of 13 random strings.

#!/usr/bin/env python3
import time
from multiprocessing.dummy import Pool as ThreadPool, current_process as threadpool_process
import requests

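def ids_to_check():
    # Placeholder from the question: some_calls() and id_list stand in for the real lookup.
    some_calls()
    return id_list

def execute_task(item_id):
    # Named item_id (not id) so the builtin id() can still be called on the session below.
    url = f"https://myserver.com/todos/{item_id}"
    json_op = s.get(url, verify=False).json()
    value = json_op['id']
    print(str(value) + '-' + str(threadpool_process()) + str(id(s)))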

def main():
    pool = ThreadPool(processes=20)
    while True:
        pool.map(execute_task,ids_to_check())
        print("Let's wait for 10 seconds")
        time.sleep(10)

if __name__ == "__main__":
    s = requests.Session()
    # Call update(); assigning a dict to s.headers.update would replace the method and set no header.
    s.headers.update({
      'Accept': 'application/json'
    })

    main()

Output

4-<DummyProcess(Thread-2,started daemon 140209222559488)>140209446508360
5-<DummyProcess(Thread-5,started daemon 140209123481344)>140209446508360
7-<DummyProcess(Thread-6,started daemon 140209115088640)>140209446508360
2-<DummyProcess(Thread-11,started daemon 140208527894272)>140209446508360
None-<DummyProcess(Thread-1,started daemon 140209230952192)>140209446508360
10-<DummyProcess(Thread-4,started daemon 140209131874048)>140209446508360
12-<DummyProcess(Thread-7,started daemon 140209106695936)>140209446508360
8-<DummyProcess(Thread-3,started daemon 140209140266752)>140209446508360
6-<DummyProcess(Thread-12,started daemon 140208519501568)>140209446508360
3-<DummyProcess(Thread-13,started daemon 140208511108864)>140209446508360
11-<DummyProcess(Thread-10,started daemon 140208536286976)>140209446508360
9-<DummyProcess(Thread-9,started daemon 140209089910528)>140209446508360
1-<DummyProcess(Thread-8,started daemon 140209098303232)>140209446508360
Let's wait for 10 seconds
None-<DummyProcess(Thread-14,started daemon 140208502716160)>140209446508360
3-<DummyProcess(Thread-20,started daemon 140208108455680)>140209446508360
1-<DummyProcess(Thread-19,started daemon 140208116848384)>140209446508360
7-<DummyProcess(Thread-17,started daemon 140208133633792)>140209446508360
6-<DummyProcess(Thread-6,started daemon 140209115088640)>140209446508360
4-<DummyProcess(Thread-4,started daemon 140209131874048)>140209446508360
9-<DummyProcess(Thread-16,started daemon 140208485930752)>140209446508360
5-<DummyProcess(Thread-15,started daemon 140208494323456)>140209446508360
2-<DummyProcess(Thread-2,started daemon 140209222559488)>140209446508360
8-<DummyProcess(Thread-18,started daemon 140208125241088)>140209446508360
11-<DummyProcess(Thread-1,started daemon 140209230952192)>140209446508360
10-<DummyProcess(Thread-11,started daemon 140208527894272)>140209446508360
12-<DummyProcess(Thread-5,started daemon 140209123481344)>140209446508360
Let's wait for 10 seconds
None-<DummyProcess(Thread-3,started daemon 140209140266752)>140209446508360
2-<DummyProcess(Thread-10,started daemon 140208536286976)>140209446508360
1-<DummyProcess(Thread-12,started daemon 140208519501568)>140209446508360
4-<DummyProcess(Thread-9,started daemon 140209089910528)>140209446508360
5-<DummyProcess(Thread-14,started daemon 140208502716160)>140209446508360
9-<DummyProcess(Thread-6,started daemon 140209115088640)>140209446508360
8-<DummyProcess(Thread-16,started daemon 140208485930752)>140209446508360
7-<DummyProcess(Thread-4,started daemon 140209131874048)>140209446508360
3-<DummyProcess(Thread-20,started daemon 140208108455680)>140209446508360
6-<DummyProcess(Thread-8,started daemon 140209098303232)>140209446508360
12-<DummyProcess(Thread-13,started daemon 140208511108864)>140209446508360
10-<DummyProcess(Thread-7,started daemon 140209106695936)>140209446508360
11-<DummyProcess(Thread-19,started daemon 140208116848384)>140209446508360
Let's wait for 10 seconds
.
.

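My observations:

  • Multiple connections are created (one per process), but the session object stays the same for the whole run (the session object id never changes).
  • The ss output shows that connections keep getting recycled. I could not identify any specific pattern or timeout behind the recycling.
  • If I reduce the number of processes to a small value (for example, 5), the connections are not recycled.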

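I don't understand how or why the connections are being recycled, or why they are not recycled when I reduce the number of processes. I have tried disabling the garbage collector with import gc; gc.disable(), but the connections are still recycled.

I want the created connections to stay alive until they reach the maximum number of requests. I think it would work without a session, using the keep-alive connection header.

But I am curious to know what causes these session connections to keep being recycled when the process pool is large.

I can reproduce this issue against any server, so it probably does not depend on the server.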
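
For anyone reproducing this, the recycling can also be watched from inside Python rather than through ss. The following is a minimal sketch, assuming a requests release that depends on the standalone urllib3 package, whose connection-pool logger is named urllib3.connectionpool; the helper name enable_pool_logging is made up for illustration. At DEBUG level urllib3 logs each time it starts a new connection, and it logs a warning when a returned connection is discarded because the pool is already full.

```python
import logging


def enable_pool_logging(level=logging.DEBUG):
    """Attach a stream handler to urllib3's connection-pool logger.

    Hypothetical helper: only the logger name comes from urllib3 itself.
    """
    logger = logging.getLogger("urllib3.connectionpool")
    logger.setLevel(level)
    if not logger.handlers:
        # Include the thread name so each message can be tied to a pool worker.
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(threadName)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)
    return logger
```

Calling enable_pool_logging() once before main() should be enough to see which worker threads open fresh connections on each round.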

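Solution

I solved the same problem for myself by creating a session for every process and parallelizing the request execution. At first I also used multiprocessing.dummy, but I ran into the same problem as you, so I switched to concurrent.futures.thread.ThreadPoolExecutor.

Here is my solution.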

from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial

from requests import Session, Response
from requests.adapters import HTTPAdapter

def thread_pool_execute(iterables, method, pool_size=30) -> list:
    """Run method(session, item) for each item on a thread pool; return the list of results."""
    session = Session()
    # that's it: let the connection pool hold as many connections as there are worker threads
    session.mount('https://', HTTPAdapter(pool_maxsize=pool_size))
    session.mount('http://', HTTPAdapter(pool_maxsize=pool_size))
    worker = partial(method, session)
    # Leaving the with block waits for every submitted request to finish.
    with ThreadPoolExecutor(pool_size) as pool:
        results = pool.map(worker, iterables)
    session.close()
    return list(results)

def simple_request(session, url) -> Response:
    return session.get(url)

# list_of_urls is a placeholder for your own list of URL strings.
response_list = thread_pool_execute(list_of_urls, simple_request)
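
A likely reason the pool_maxsize setting matters: requests' HTTPAdapter defaults to pool_maxsize=10 with a non-blocking pool, so when 20 threads share one session, urllib3 opens extra connections on demand and then discards those that do not fit back into the pool. That would match the question's observation that 5 workers keep their connections while 20 do not. The sketch below applies the same adapter configuration to the question's multiprocessing.dummy code. The names build_session, fetch_id and check_all are made up for illustration, and myserver.com is the placeholder host from the question.

```python
from multiprocessing.dummy import Pool as ThreadPool

import requests
from requests.adapters import HTTPAdapter

POOL_SIZE = 20  # same value as ThreadPool(processes=20) in the question


def build_session(pool_size=POOL_SIZE):
    """Return a Session whose per-host pool can keep pool_size connections."""
    session = requests.Session()
    session.headers.update({"Accept": "application/json"})
    session.mount("https://", HTTPAdapter(pool_maxsize=pool_size))
    session.mount("http://", HTTPAdapter(pool_maxsize=pool_size))
    return session


def fetch_id(session, item_id):
    # URL pattern copied from the question; the host is a placeholder.
    url = f"https://myserver.com/todos/{item_id}"
    return session.get(url).json()["id"]


def check_all(session, ids, workers=POOL_SIZE):
    """Fetch every id on a thread pool that shares the one session."""
    with ThreadPool(processes=workers) as pool:
        return pool.map(lambda item_id: fetch_id(session, item_id), ids)
```

With the pool sized to the worker count, each thread's connection should be returned to the pool and reused on the next round instead of being closed, provided the server keeps the connection alive.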

I tested this with pool_size=150 on sitemaps containing 200,000 URLs without any problems. It is limited only by the target host's configuration.
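
The answer also mentions creating a session for every worker, which the code above does not show. A minimal sketch of that idea, assuming threading.local is an acceptable place to keep one Session per worker thread; get_session, fetch_status and fetch_all are hypothetical names, and the sessions are left open for the life of their threads.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

import requests

_local = threading.local()  # holds one Session per thread


def get_session():
    """Return the calling thread's Session, creating it on first use."""
    session = getattr(_local, "session", None)
    if session is None:
        session = requests.Session()
        session.headers.update({"Accept": "application/json"})
        _local.session = session
    return session


def fetch_status(url):
    # Each worker thread reuses its own session, and so its own connection pool.
    return get_session().get(url).status_code


def fetch_all(urls, workers=20):
    """Request every URL on a thread pool; return the HTTP status codes in order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fetch_status, urls))
```

Because each thread only ever uses its own session, the default pool size is never exceeded, at the cost of one pool per thread instead of one shared pool.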