如何使用Asyncio避免错误429请求过多python

问题描述

我正在使用以下代码向aiohttp客户端发出请求。我尝试发送请求的服务器每个IP每小时有30k请求限制。因此,我收到429个过多的请求错误。每当工作达到极限时,我都想把它睡觉。

我可以从标题提取x_rateLimit_reset,所以我以为可以用它来使工作进入睡眠状态,但是我观察到了非常奇怪的行为。有时,工作的睡眠时间变为负数,有时会陷入睡眠模式。

例如,上一次我执行工作时,它先睡了2000秒,然后经过一段时间后,它又尝试再睡2500秒并陷入睡眠模式。我认为可能是其他并行进程引起了这个问题,所以想知道在使用Asyncio时如何处理过多的请求错误消息。

@backoff.on_exception(backoff.expo,(asyncio.TimeoutError,aiohttp.client_exceptions.ServerdisconnectedError,TooManyRequests),max_time=300)
    async def fetch(self,url,session,params):
        try:
            async with session.get(url,params=params) as response:
                Now = int(time.time())
                print(response)
                output = await response.read()
                output = json.loads(output)

                if 'X-RateLimit-Remaining' in response.headers:
                    rate = response.headers['X-RateLimit-Remaining']

                if 'status' in output and output['status'] == 429:
                    x_rateLimit_reset = int(response.headers['X-RateLimit-Reset'])
                    print("sleep mode")
                    seconds = x_rateLimit_reset - Now
                    LOGGER.info("The job will sleep for {} seconds".format(seconds))
                    time.sleep(max(seconds,0))
                    raise TooManyRequests()



            return output

        except (asyncio.TimeoutError,TypeError,json.decoder.JSONDecodeError,aiohttp.client_exceptions.ServerdisconnectedError) as e:
            print(str(e))

    async def bound_fetch(self,sem,params):
        # Getter function with semaphore.
        async with sem:
            output = await self.fetch(url,params)
        return {"url": url,"output": output}

编辑: 这是我启动bound_fetch并定义URL的方式:

def get_responses(self,urls,office_token,params=None):   
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(self.run(office_token,params))
    responses = loop.run_until_complete(future)
    return responses

async def run(self,params):
        tasks = []
        # create instance of Semaphore
        sem = asyncio.BoundedSemaphore(200)
        timeout = ClientTimeout(total=1000)

        async with ClientSession(auth=BasicAuth(office_token,password=' '),timeout=timeout,connector=TCPConnector(ssl=False)) as session:
            for url in urls:
                # pass Semaphore and session to every GET request
                task = asyncio.ensure_future(self.bound_fetch(sem,params))
                tasks.append(task)

            responses = await asyncio.gather(*tasks)
            return responses

urls = [
                        "{}/{}".format(self.base_url,"{}?page={}&api_key={}".format(object_name,page_number,self.api_keys))
                        for page_number in range(batch * chunk_size + 1,chunk_size * (1 + batch) + 1)]

解决方法

您使用public class Person : IPropertyChangeLog { private PropertyChangeLog<Person> _log = new PropertyChangeLog<Person>(); private string _name; private int? _age; private int? _height; [JsonProperty("name")] public string Name { get => _name; set => _log.UpdateProperty(value,ref _name); } [JsonProperty("age")] public int? Age { get => _age; set => _log.UpdateProperty(value,ref _age); } [JsonProperty("height")] public int? Height { get => _height; set => _log.UpdateProperty(value,ref _height); } IEnumerable<string> IPropertyChangeLog.PropertiesChanged => _log.PropertiesChanged; void IPropertyChangeLog.Reset() => _log.Reset(); } 而不是var person = JsonConvert.DeserializeObject<Person>("{ \"name\": \"test\" }"); var log = (IPropertyChangeLog)person; // log.PropertiesChanged should now contain 'Name' foreach (var property in log.PropertiesChanged) { // we know that the property named in 'property' changed } log.Reset(); JsonConvert.PopulateObject("{ \"age\": null }",person); // now log.PropertiesChanged should only contain 'Age' foreach (var property in log.PropertiesChanged) { // we know that the property named in 'property' changed } 的主要原因。

更新

这是最小的工作解决方案,并有人评论了它的工作原理。

请使用它来采用您的解决方案。

看看asyncio-throttle

time.sleep()