问题描述
我正在尝试将Celery与SQS一起用作经纪人。为了从我的容器中使用SQS,我需要承担一个角色,为此,我正在使用STS。我的代码如下:
role_info = {
'RoleArn': 'arn:aws:iam::xxxxxxx:role/my-role-execution','RoleSessionName': 'roleExecution'
}
sts_client = boto3.client('sts',region_name='eu-central-1')
credentials = sts_client.assume_role(**role_info)
aws_access_key_id = credentials["Credentials"]['AccessKeyId']
aws_secret_access_key = credentials["Credentials"]['SecretAccessKey']
aws_session_token = credentials["Credentials"]["SessionToken"]
os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key_id
os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key
os.environ["AWS_DEFAULT_REGION"] = 'eu-central-1'
os.environ["AWS_SESSION_TOKEN"] = aws_session_token
broker = "sqs://"
backend = 'redis://redis-service:6379/0'
celery = Celery('tasks',broker=broker,backend=backend)
celery.conf["task_default_queue"] = 'my-queue'
celery.conf["broker_transport_options"] = {
'region': 'eu-central-1','predefined_queues': {
'my-queue': {
'url': 'https://sqs.eu-central-1.amazonaws.com/xxxxxxx/my-queue'
}
}
}
@celery.task(name='my-queue.my_task')
def my_task(content) -> int:
print("hello")
return 0
[2020-09-24 10:38:03,602: CRITICAL/MainProcess] Unrecoverable error: ClientError('An error occurred (AccessDenied) when calling the ListQueues operation: Access to the resource https://eu-central-1.queue.amazonaws.com/ is denied.',)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py",line 921,in create_channel
return self._avail_channels.pop()
IndexError: pop from empty list
During handling of the above exception,another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/worker/worker.py",line 208,in start
self.blueprint.start(self)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py",line 119,in start
step.start(parent)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py",line 369,in start
return self.obj.start()
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py",line 318,in start
blueprint.start(self)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py",in start
step.start(parent)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/connection.py",line 23,in start
c.connection = c.connect()
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py",line 405,in connect
conn = self.connection_for_read(heartbeat=self.amqheartbeat)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py",line 412,in connection_for_read
self.app.connection_for_read(heartbeat=heartbeat))
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py",line 439,in ensure_connected
callback=maybe_shutdown,File "/usr/local/lib/python3.6/site-packages/kombu/connection.py",line 422,in ensure_connection
callback,timeout=timeout)
File "/usr/local/lib/python3.6/site-packages/kombu/utils/functional.py",line 341,in retry_over_time
return fun(*args,**kwargs)
File "/usr/local/lib/python3.6/site-packages/kombu/connection.py",line 275,in connect
return self.connection
File "/usr/local/lib/python3.6/site-packages/kombu/connection.py",line 823,in connection
self._connection = self._establish_connection()
File "/usr/local/lib/python3.6/site-packages/kombu/connection.py",line 778,in _establish_connection
conn = self.transport.establish_connection()
File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py",line 941,in establish_connection
self._avail_channels.append(self.create_channel(self))
File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py",line 923,in create_channel
channel = self.Channel(connection)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/SQS.py",line 100,in __init__
self._update_queue_cache(self.queue_name_prefix)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/SQS.py",line 105,in _update_queue_cache
resp = self.sqs.list_queues(QueueNamePrefix=queue_name_prefix)
File "/usr/local/lib/python3.6/site-packages/botocore/client.py",line 337,in _api_call
return self._make_api_call(operation_name,kwargs)
File "/usr/local/lib/python3.6/site-packages/botocore/client.py",line 656,in _make_api_call
raise error_class(parsed_response,operation_name)
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the ListQueues operation: Access to the resource https://eu-central-1.queue.amazonaws.com/ is denied.
如果我在没有Celery的情况下直接直接使用boto3
,则可以连接到队列并检索数据,而不会出现此错误。我不知道为什么Celery / Kombu在我指定predefined_queues
配置时尝试列出队列,tha用来避免这些行为(来自文档):
如果您希望Celery在AWS中使用一组预定义的队列,并且从不尝试列出SQS队列,也不尝试创建或删除它们,请使用预定义的queue_urls设置将队列名称映射传递给URL
有人知道会发生什么吗?我应该如何修改我的代码才能使其正常工作?似乎Celery根本没有使用凭据。
我正在使用的版本:
celery==4.4.7
boto3==1.14.54
kombu==4.5.0
谢谢!
PS:我在Github中创建了issue来跟踪这是否可能是库错误...
解决方法
我解决了将依赖项更新到最新版本的问题:
celery==5.0.0
boto3==1.14.54
kombu==5.0.2
pycurl==7.43.0.6
,
通过设置以下配置选项,我能够使 celery==4.4.7
和 kombu==4.6.11
工作:
celery.conf["task_create_missing_queues"] = False