celery 任务无法使用 python 从 postgresql 数据库中迭代多行

问题描述

我正在使用 databases python 包 (https://pypi.org/project/databases/) 来管理与我的 postgresql 数据库的连接

来自文档 (https://www.encode.io/databases/database_queries/#queries) 它说我可以使用

# Fetch multiple rows without loading them all into memory at once
query = notes.select()
async for row in database.iterate(query=query):
    ...

# Fetch multiple rows
query = notes.select()
rows = await database.fetch_all(query=query)

这是我尝试过的:

def check_all_orders():
    query = "SELECT * FROM orders WHERE shipped=True"
    return database.fetch_all(query)

...
...
...

@app.task
async def check_orders():

    query = await check_all_orders()
    
    today = datetime.utcNow()

    for q in query:
        if q.last_notification is not None:
            if (today - q.last_notification).total_seconds() < q.cooldown:
                continue

@app.task
async def check_orders():

    query = "SELECT * FROM orders WHERE shipped=True"

    today = datetime.utcNow()

    async for q in database.iterate(query=query):
        if q.last_notification is not None:
            if (today - q.last_notification).total_seconds() < q.cooldown:
                continue

我两个都用过,但出现以下错误

raise TypeError(f'Object of type {o.class.name} ' kombu.exceptions.EncodeError:协程类型的对象不是 JSON 可序列化的

下面的完整错误

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py",line 472,in trace_task
    mark_as_done(
  File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py",line 154,in mark_as_done
    self.store_result(task_id,result,state,request=request)
  File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py",line 434,in store_result
    self._store_result(task_id,traceback,File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py",line 856,in _store_result
    self._set_with_state(self.get_key_for_task(task_id),self.encode(Meta),state)
  File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py",line 324,in encode
    _,_,payload = self._encode(data)
  File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py",line 328,in _encode
    return dumps(data,serializer=self.serializer)
  File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py",line 220,in dumps
    payload = encoder(data)
  File "/usr/local/lib/python3.9/contextlib.py",line 135,in __exit__
    self.gen.throw(type,value,traceback)
  File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py",line 53,in _reraise_errors
    reraise(wrapper,wrapper(exc),sys.exc_info()[2])
  File "/usr/local/lib/python3.9/site-packages/kombu/exceptions.py",line 21,in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py",line 49,in _reraise_errors
    yield
  File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py",in dumps
    payload = encoder(data)
  File "/usr/local/lib/python3.9/site-packages/kombu/utils/json.py",line 65,in dumps
    return _dumps(s,cls=cls or _default_encoder,File "/usr/local/lib/python3.9/json/__init__.py",line 234,in dumps
    return cls(
  File "/usr/local/lib/python3.9/json/encoder.py",line 199,in encode
    chunks = self.iterencode(o,_one_shot=True)
  File "/usr/local/lib/python3.9/json/encoder.py",line 257,in iterencode
    return _iterencode(o,0)
  File "/usr/local/lib/python3.9/site-packages/kombu/utils/json.py",line 55,in default
    return super().default(o)
  File "/usr/local/lib/python3.9/json/encoder.py",line 179,in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
kombu.exceptions.EncodeError: Object of type coroutine is not JSON serializable

解决方法

Schema.jsonify 方法——就像来自 Flask 的 jsonify——返回一个 Response 对象,kombu 将无法序列化(IIRC Kombu 默认序列化为 JSON)。您可能应该使用 dump 而不是 jsonify 来返回字典。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...