Error when connecting Celery to SQS using STS

Problem description

I am trying to use Celery with SQS as the broker. To use SQS from my container I need to assume a role, and I am using STS for that. My code is as follows:

import os

import boto3
from celery import Celery

# Role to assume through STS before talking to SQS.
role_info = {
    'RoleArn': 'arn:aws:iam::xxxxxxx:role/my-role-execution',
    'RoleSessionName': 'roleExecution',
}
sts_client = boto3.client('sts', region_name='eu-central-1')
credentials = sts_client.assume_role(**role_info)

aws_access_key_id = credentials["Credentials"]['AccessKeyId']
aws_secret_access_key = credentials["Credentials"]['SecretAccessKey']
aws_session_token = credentials["Credentials"]["SessionToken"]

os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key_id
os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key
os.environ["AWS_DEFAULT_REGION"] = 'eu-central-1'
os.environ["AWS_SESSION_TOKEN"] = aws_session_token

broker = "sqs://"
backend = 'redis://redis-service:6379/0'

celery = Celery('tasks', broker=broker, backend=backend)

celery.conf["task_default_queue"] = 'my-queue'
celery.conf["broker_transport_options"] = {
    'region': 'eu-central-1',
    'predefined_queues': {
        'my-queue': {
            'url': 'https://sqs.eu-central-1.amazonaws.com/xxxxxxx/my-queue'
        }
    }
}
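
The credential hand-off above could be factored into a small helper. The following is only a sketch using the standard library: `credentials_to_env`, `seconds_until_expiry`, and `example_response` are hypothetical names, and the response values are made up to mimic the shape of the dict returned by `assume_role`. It also shows how to check the remaining lifetime, since credentials obtained through STS are temporary and a long-running worker may need to refresh them.

```python
from datetime import datetime, timedelta, timezone


def credentials_to_env(response):
    """Map an assume_role-style response to AWS_* environment variable names."""
    creds = response["Credentials"]
    return {
        "AWS_ACCESS_KEY_ID": creds["AccessKeyId"],
        "AWS_SECRET_ACCESS_KEY": creds["SecretAccessKey"],
        "AWS_SESSION_TOKEN": creds["SessionToken"],
    }


def seconds_until_expiry(response, now=None):
    """Return the seconds left before the temporary credentials expire."""
    if now is None:
        now = datetime.now(timezone.utc)
    return (response["Credentials"]["Expiration"] - now).total_seconds()


# Hypothetical response with made-up values (assumption, not real output).
issued_at = datetime(2020, 9, 24, 10, 0, tzinfo=timezone.utc)
example_response = {
    "Credentials": {
        "AccessKeyId": "EXAMPLE-KEY-ID",
        "SecretAccessKey": "example-secret",
        "SessionToken": "example-token",
        "Expiration": issued_at + timedelta(hours=1),
    }
}

env = credentials_to_env(example_response)
remaining = seconds_until_expiry(example_response, now=issued_at)
print(sorted(env))
print(remaining)
```

With a real client, the result could be applied with `os.environ.update(credentials_to_env(sts_client.assume_role(**role_info)))`.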

In the same file I have the following task:

@celery.task(name='my-queue.my_task')
def my_task(content) -> int:
    print("hello")
    return 0

When I run this code, I get the following error:

[2020-09-24 10:38:03,602: CRITICAL/MainProcess] Unrecoverable error: ClientError('An error occurred (AccessDenied) when calling the ListQueues operation: Access to the resource https://eu-central-1.queue.amazonaws.com/ is denied.',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py",line 921,in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception,another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/worker/worker.py",line 208,in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py",line 119,in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py",line 369,in start
    return self.obj.start()
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py",line 318,in start
    blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py",in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/connection.py",line 23,in start
    c.connection = c.connect()
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py",line 405,in connect
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py",line 412,in connection_for_read
    self.app.connection_for_read(heartbeat=heartbeat))
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py",line 439,in ensure_connected
    callback=maybe_shutdown,
  File "/usr/local/lib/python3.6/site-packages/kombu/connection.py",line 422,in ensure_connection
    callback,timeout=timeout)
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/functional.py",line 341,in retry_over_time
    return fun(*args,**kwargs)
  File "/usr/local/lib/python3.6/site-packages/kombu/connection.py",line 275,in connect
    return self.connection
  File "/usr/local/lib/python3.6/site-packages/kombu/connection.py",line 823,in connection
    self._connection = self._establish_connection()
  File "/usr/local/lib/python3.6/site-packages/kombu/connection.py",line 778,in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py",line 941,in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py",line 923,in create_channel
    channel = self.Channel(connection)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/SQS.py",line 100,in __init__
    self._update_queue_cache(self.queue_name_prefix)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/SQS.py",line 105,in _update_queue_cache
    resp = self.sqs.list_queues(QueueNamePrefix=queue_name_prefix)
  File "/usr/local/lib/python3.6/site-packages/botocore/client.py",line 337,in _api_call
    return self._make_api_call(operation_name,kwargs)
  File "/usr/local/lib/python3.6/site-packages/botocore/client.py",line 656,in _make_api_call
    raise error_class(parsed_response,operation_name)
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the ListQueues operation: Access to the resource https://eu-central-1.queue.amazonaws.com/ is denied.

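If I use boto3 directly, without Celery, I can connect to the queue and retrieve data without this error. I don't understand why Celery/Kombu tries to list the queues when I specify the predefined_queues configuration, which is supposed to prevent exactly that behavior (from the documentation):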

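If you want Celery to use a set of predefined queues in AWS, and to never attempt to list SQS queues, nor attempt to create or delete them, pass a map of queue names to URLs using the predefined_queue_urls setting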

Source: the Celery documentation on using Amazon SQS.
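
To make the mapping from queue names to URLs concrete, here is a minimal sketch that builds the same `broker_transport_options` structure used in the question from a plain name-to-URL dict. `build_transport_options` is a hypothetical helper, not part of Celery or Kombu, and the URL is the placeholder from the question.

```python
def build_transport_options(region, queue_urls):
    """Build a broker_transport_options dict from a {queue name: URL} mapping."""
    return {
        "region": region,
        "predefined_queues": {
            name: {"url": url} for name, url in queue_urls.items()
        },
    }


# Placeholder account id and queue name, copied from the question.
options = build_transport_options(
    "eu-central-1",
    {"my-queue": "https://sqs.eu-central-1.amazonaws.com/xxxxxxx/my-queue"},
)
print(options)
```

The result could then be assigned as in the question: `celery.conf["broker_transport_options"] = options`.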

Does anyone know what is happening? How should I change my code to make it work? It looks as if Celery is not using the credentials at all.

The versions I am using:

celery==4.4.7
boto3==1.14.54
kombu==4.5.0

Thanks!

PS: I created an issue on GitHub to track whether this might be a bug in the library...

Solution

I solved the problem by updating the dependencies to the latest versions:

celery==5.0.0
boto3==1.14.54
kombu==5.0.2
pycurl==7.43.0.6
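
Before restarting the worker, it can help to confirm which versions are actually installed in the container. The sketch below compares the installed distributions against the pins listed above. `installed_version`, `report`, and `PINNED` are hypothetical names, and it assumes Python 3.8 or newer, because `importlib.metadata` is not available on the Python 3.6 shown in the traceback.

```python
from importlib import metadata

# Versions copied from the answer above.
PINNED = {
    "celery": "5.0.0",
    "boto3": "1.14.54",
    "kombu": "5.0.2",
    "pycurl": "7.43.0.6",
}


def installed_version(package):
    """Return the installed version string, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def report(pinned):
    """Describe, per package, whether the installed version matches the pin."""
    lines = []
    for name, wanted in pinned.items():
        found = installed_version(name)
        if found is None:
            status = "not installed"
        elif found == wanted:
            status = "matches"
        else:
            status = f"differs (found {found})"
        lines.append(f"{name}=={wanted}: {status}")
    return lines


for line in report(PINNED):
    print(line)
```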

Alternatively, I was able to get celery==4.4.7 and kombu==4.6.11 working by setting the following configuration option:

celery.conf["task_create_missing_queues"] = False
