Twitter Api 无法使用 AsyncOAuth 1.0 访问广告 API

问题描述

我已经在 the Twitter 论坛上发布了这个问题,但是我没有得到成功的回复,我决定在这里发布它的原因是为了看看它是否可以获得更多的知名度。

我能够使用 python -oauth2 库向广告 API 发出 API 请求。

现在我想通过在一个小时内向 API 发出大约 400 个请求来扩展它,我发现使用 async aiohttps 库可以为我完成这项工作。

但是,我再次面临身份验证问题。当我使用通过异步支持实现 OAuth 1.0 的 python AsyncOAuth 时,它不起作用。我收到了未经授权的回复

httpx.HTTPStatusError: 401 Client Error: Unauthorized for url: https://ads-api.twitter.com/9/insights/keywords/searc

但是如果我尝试使用相同的方法访问正常的:https://api.twitter.com/1.1/account/verify_credentials.json URL,它可以正常工作。

所以基本上这里是我正在做的事情的简单代码


    from authlib.integrations.httpx_client import AsyncOAuth1Client


    from authlib.integrations.httpx_client import AsyncOAuth1Client
    def build_request_url(self,keyword):
        base_url = "https://ads-api.twitter.com"
        path = "/9/insights/keywords/search"
        params = f'granularity={self.granularity}&keywords={keyword}&start_time={self.since}&location={self.location}'
        return f'{base_url}{path}?{params}'

    async def make_async_request(self,keyword,client,**kwargs):
        """GET request wrapper to fetch page HTML.

        kwargs are passed to `session.request()`.
        """
        url = self.build_request_url(keyword)
        account_url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
        response = await client.get(url)
        response.raise_for_status()
        print("Got response [%s] for URL: %s",response.json(),url)
        json = response.json()
        return json

    async def bulk_crawl_and_write(self,file,**kwargs):
        """Crawl & write concurrently to `file` for multiple `urls`."""
        async with AsyncOAuth1Client(self.consumer_key,client_secret=self.consumer_secret,token=self.access_token,token_secret=self.access_secret) as session_client:
            tasks = []
            for keyword in self.keywords_list:
                response = await self.make_async_request(keyword,session_client,**kwargs)
                tasks.append(write_one(file=file,response=response,keyword=keyword,**kwargs))
        await asyncio.gather(*tasks)
   async def make_async_request(self,**kwargs))
        await asyncio.gather(*tasks)

对我在这里做错了什么有什么想法吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)