Problem description
I am making requests with the aiohttp client using the code below. The server I am sending requests to allows 30k requests per hour per IP, so I am getting 429 Too Many Requests errors. I want the job to go to sleep whenever it hits the limit.
I can extract x_rateLimit_reset from the response headers, so I thought I could use it to put the job to sleep, but I have observed very strange behavior: sometimes the computed sleep time is negative, and sometimes the job gets stuck in sleep mode.
For example, the last time I ran the job, it first slept for 2000 seconds, then after a while tried to sleep for another 2500 seconds and got stuck in sleep mode. I suspect other parallel processes might be causing this, so I would like to know how to handle Too Many Requests errors properly when using asyncio.
@backoff.on_exception(backoff.expo,
                      (asyncio.TimeoutError, aiohttp.client_exceptions.ServerDisconnectedError, TooManyRequests),
                      max_time=300)
async def fetch(self, url, session, params):
    try:
        async with session.get(url, params=params) as response:
            now = int(time.time())
            print(response)
            output = await response.read()
            output = json.loads(output)
            if 'X-RateLimit-Remaining' in response.headers:
                rate = response.headers['X-RateLimit-Remaining']
            if 'status' in output and output['status'] == 429:
                x_rate_limit_reset = int(response.headers['X-RateLimit-Reset'])
                print("sleep mode")
                seconds = x_rate_limit_reset - now
                LOGGER.info("The job will sleep for {} seconds".format(seconds))
                time.sleep(max(seconds, 0))
                raise TooManyRequests()
            return output
    except (asyncio.TimeoutError, TypeError, json.decoder.JSONDecodeError,
            aiohttp.client_exceptions.ServerDisconnectedError) as e:
        print(str(e))
async def bound_fetch(self, sem, url, session, params):
    # Getter function with semaphore.
    async with sem:
        output = await self.fetch(url, session, params)
        return {"url": url, "output": output}
Edit: This is how I launch bound_fetch and define the URLs:
def get_responses(self, urls, office_token, params=None):
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(self.run(urls, office_token, params))
    responses = loop.run_until_complete(future)
    return responses

async def run(self, urls, office_token, params):
    tasks = []
    # create instance of Semaphore
    sem = asyncio.BoundedSemaphore(200)
    timeout = ClientTimeout(total=1000)
    async with ClientSession(auth=BasicAuth(office_token, password=' '),
                             timeout=timeout,
                             connector=TCPConnector(ssl=False)) as session:
        for url in urls:
            # pass Semaphore and session to every GET request
            task = asyncio.ensure_future(self.bound_fetch(sem, url, session, params))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
    return responses

urls = [
    "{}/{}".format(self.base_url,
                   "{}?page={}&api_key={}".format(object_name, page_number, self.api_keys))
    for page_number in range(batch * chunk_size + 1, chunk_size * (1 + batch) + 1)]
Solution
The main reason your job gets stuck in sleep mode is that you use time.sleep() instead of await asyncio.sleep(). time.sleep() blocks the entire event loop, so while one coroutine is sleeping, none of the other pending requests can make any progress.
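A small, self-contained comparison (the worker count and the two-second delay are arbitrary choices) shows the difference:

import asyncio
import time

async def worker(name):
    # With time.sleep(2) here instead, the whole event loop would freeze
    # and the three workers would run one after another (about 6 seconds total).
    await asyncio.sleep(2)  # suspends only this coroutine
    print(name, "finished")

async def main():
    start = time.monotonic()
    await asyncio.gather(*(worker("worker-{}".format(i)) for i in range(3)))
    print("total: {:.1f} seconds".format(time.monotonic() - start))

asyncio.run(main())

With the awaited sleep, all three workers finish after roughly 2 seconds, because the event loop keeps switching between them while each one waits.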
Update
Here is a minimal working solution with some comments on how it works.
Please use it to adapt your own solution.
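A rough sketch along these lines, assuming a hypothetical endpoint (the URL, retry limit, and fallback reset value are illustrative assumptions, not part of the original answer): on a 429 response the coroutine computes the remaining wait from X-RateLimit-Reset and suspends only itself with await asyncio.sleep(), so the other requests keep running.

import asyncio
import time

import aiohttp

async def fetch(session, url, params=None, max_retries=5):
    # Retry loop: on 429, wait until the rate-limit window resets and try again.
    for _ in range(max_retries):
        async with session.get(url, params=params) as response:
            if response.status == 429:
                reset = int(response.headers.get("X-RateLimit-Reset", "0"))
                delay = max(reset - int(time.time()), 1)
                # Non-blocking sleep: other coroutines keep making requests meanwhile.
                await asyncio.sleep(delay)
                continue
            return await response.json()
    raise RuntimeError("gave up on {} after {} retries".format(url, max_retries))

async def main():
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(fetch(session, "https://api.example.com/items", {"page": str(page)})
              for page in range(1, 6)))
        print(results)

asyncio.run(main())

Combined with a semaphore like the one in the question, this keeps concurrency bounded while letting rate-limited workers back off without freezing everything else.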