Problem description
I need to build a server that accepts REST requests and returns data scraped from a specified site.
For example, with a URL like this:
http://myip/scraper?url=www.exampe.com&token=0
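The site I have to scrape has built-in JavaScript that detects whether it is being opened by a real browser or a headless one.
The only options are selenium, or pyppeteer with a virtual display.
I am currently using selenium with FastAPI, but that is not a workable solution under many requests. Chrome opens and closes for every request, which greatly delays the response and uses a lot of resources.
With pyppeteer's async API you can open several tabs at once in the same browser instance, which reduces response time. But this may cause other problems once many tabs are open.
I am thinking of creating a pool of browser instances and distributing the requests across them, the way puppeteer-cluster does.
But so far I have not figured out how.
This is the code I am currently trying for the browser: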
import json
from pyppeteer import launch
# pyppeteer timeouts raise pyppeteer.errors.TimeoutError
from pyppeteer.errors import TimeoutError as PyppeteerTimeoutError
from strings import keepa_storage

class browser:
    async def __aenter__(self):
        # Launch one visible (non-headless) Chrome instance for this context
        self._session = await launch(
            headless=False,
            args=['--no-sandbox', '--disable-gpu', '--lang=it',
                  '--disable-blink-features=AutomationControlled'],
            autoClose=False)
        return self

    async def __aexit__(self, *err):
        # With autoClose=False the browser has to be closed explicitly
        await self._session.close()
        self._session = None

    async def fetch(self, url):
        page = await self._session.newPage()
        page_source = None
        try:
            # Open a page on the target origin first so localStorage can be written
            await page.goto("https://example.com/404")
            for key in keepa_storage:
                await page.evaluate(
                    "window.localStorage.setItem('{}',{})".format(key, json.dumps(keepa_storage.get(key))))
            await page.goto(url)
            await page.waitForSelector('#tableElement')
            page_source = await page.content()
        except PyppeteerTimeoutError:
            print(f'Timeout for: {url}')
        finally:
            await page.close()
        return page_source
And this code to make a request:
import asyncio

async def main():
    async with browser() as http:
        source = await asyncio.gather(
            http.fetch('https://example.com')
        )

asyncio.run(main())
But I do not know how to reuse the same browser session across multiple server requests.
Solution
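When the server initialises, create a Manager object. Depending on the implementation, the manager automatically creates all the Worker objects it needs. In the API handler, call manager.assign(item). This should take an idle worker and hand the item to it. If no worker is idle at that moment, the call should wait until one becomes available, because manager._AVAILABLE_WORKERS is a Queue. On a separate thread, run an infinite loop that calls manager.heartbeat() to make sure no worker is slacking off.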
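To illustrate why assign() waits when no worker is free, here is a minimal sketch, assuming the pool of available workers is a standard-library queue.Queue. Plain strings stand in for real worker objects, and handle_request, in_use and peak_in_use are hypothetical names used only for this demonstration.

```python
import queue
import threading
import time

# Pool of idle workers; plain strings stand in for real worker objects.
available = queue.Queue(maxsize=2)
for name in ("worker-1", "worker-2"):
    available.put(name)

lock = threading.Lock()
in_use = 0        # workers currently borrowed
peak_in_use = 0   # highest number of workers borrowed at the same time
results = []


def handle_request(item):
    """Simulate one API call: borrow a worker, use it, give it back."""
    global in_use, peak_in_use
    worker = available.get()  # blocks while every worker is busy
    try:
        with lock:
            in_use += 1
            peak_in_use = max(peak_in_use, in_use)
        time.sleep(0.05)  # placeholder for the actual scraping
        with lock:
            in_use -= 1
            results.append((item, worker))
    finally:
        available.put(worker)  # the worker becomes available again


# Six concurrent "requests" compete for two workers.
threads = [threading.Thread(target=handle_request, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"handled {len(results)} items, at most {peak_in_use} at once")
```

Because Queue.get() blocks by default, the extra requests simply wait their turn, so no more than two items are ever processed at the same time.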
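The comments in the code below describe what each method is for and what it should do. That should be enough to get you started. Let me know if you need further clarification.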
from queue import Queue  # Python 3 module name; provides a thread-safe blocking queue
class Worker:
    ###
    # class to define behavior and parameters of workers
    ###
    def __init__(self, base_url):
        ###
        # Initialises a worker
        # STEP 1. Create one worker with given inputs
        # STEP 2. Mark the worker busy
        # STEP 3. Get ready for item consumption with initialisation/login process done
        # STEP 4. Mark the worker available and active
        ###
        raise NotImplementedError()

    def process_item(self, **item):
        ###
        # Worker processes the given item and returns data to manager
        # Step 1. worker marks himself busy
        # Step 2. worker processes the item. Handle Errors here
        # Step 3. worker marks himself available
        # Step 4. Return the data scraped
        ###
        raise NotImplementedError()
class Manager:
    ###
    # class for the manager, who supervises all the workers and assigns work to them
    ###
    def __init__(self):
        self._WORKERS = set()  # set container to hold the details of all workers
        self._AVAILABLE_WORKERS = Queue(maxsize=10)  # queue container to hold available workers
        # create all the workers we want and add them to self._WORKERS and self._AVAILABLE_WORKERS

    def assign(self, item):
        ###
        # Assigns an item to a worker to be processed and, once processed, returns the data to the server
        # STEP 1. remove a worker from the available pool
        # STEP 2. assign the item to the worker
        # STEP 3A. if the item is processed successfully, put the worker back into the available pool
        # STEP 3B. if an error occurred during processing, try to reset the worker and put it back into
        #          the available pool
        ###
        raise NotImplementedError()

    def heartbeat(self):
        ###
        # process to check, at a regular interval, that all the workers are active and accounted for.
        # if a worker is available but not in the pool, add it to the pool after checking that it is not busy
        # if a worker is not active, reset it and add it to the pool
        ###
        raise NotImplementedError()
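Since the question uses pyppeteer and FastAPI, which are both asyncio-based, the assign() steps above could also be sketched with asyncio.Queue instead of the thread-oriented Queue, so that waiting for a free worker does not block the event loop. This is only a sketch under assumptions: StubWorker, AsyncManager, reset() and demo() are hypothetical names, and the stub merely sleeps where a real worker would drive its own browser instance. The heartbeat is left out to keep the example short.

```python
import asyncio


class StubWorker:
    """Hypothetical stand-in for a worker that would own one browser instance."""

    def __init__(self, name):
        self.name = name
        self.busy = False
        self.resets = 0

    async def process_item(self, url):
        self.busy = True  # Step 1: mark busy
        try:
            await asyncio.sleep(0.01)  # placeholder for loading and reading the page
            if "fail" in url:
                raise RuntimeError(f"could not scrape {url}")
            return f"{self.name} scraped {url}"  # Step 4: return the data
        finally:
            self.busy = False  # Step 3: mark available

    async def reset(self):
        # A real worker would close and relaunch its browser here.
        self.resets += 1


class AsyncManager:
    def __init__(self, size):
        self._workers = [StubWorker(f"worker-{i}") for i in range(size)]
        self._available = asyncio.Queue()
        for worker in self._workers:
            self._available.put_nowait(worker)

    async def assign(self, url):
        # STEP 1: take a worker from the pool, waiting if none is idle.
        worker = await self._available.get()
        try:
            # STEP 2: hand the item to the worker.
            return await worker.process_item(url)
        except Exception:
            # STEP 3B: on error, reset the worker before reusing it.
            await worker.reset()
            return None
        finally:
            # STEP 3A/3B: the worker always goes back into the pool.
            self._available.put_nowait(worker)


async def demo():
    manager = AsyncManager(size=2)
    urls = [f"https://example.com/{i}" for i in range(5)]
    urls.append("https://example.com/fail")
    # Six requests share two workers; extra requests wait for a free one.
    return await asyncio.gather(*(manager.assign(u) for u in urls))


results = asyncio.run(demo())
```

In a server, a single AsyncManager would be created once at startup and each request handler would await manager.assign(url), which is how the same browser sessions end up being reused across requests.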