从austpg代理向worker租用asyncpg连接池

问题描述

我正在使用kafka + faust

我有示例任务应用程序

from faust import Record
import faust
import asyncio
import asyncpg
from os import environ

async def init_app(): #async method to create a pool
    pool = await asyncpg.create_pool(f"postgresql://{environ.get('POSTGRES_USER')}:{environ.get('POSTGRES_PASSWORD')}@postgres:5432/{environ.get('POSTGRES_DB')}")
    return pool

class Task(Record,serializer='json'):
    task: str
    p1: str
class kTask(Record,serializer='json'):
    task_id: str

app = faust.App('task',broker='kafka://broker:29092',stream_wait_empty=False)
topic = app.topic('task',value_type=Task,key_type=kTask)

loop = asyncio.get_event_loop() # here is where I try to get on the same event loop as the agent
pool = loop.run_until_complete(init_app())

@app.agent(topic)
async def hello(tasks):
    async with pool.acquire() as connection: #for every worker that has to do a task I want them to use the connection pool from the agent
        async for key,task in tasks.items():
            print(f'request: {key.task_id*6} task:{task.task} with {task.p1}')
            async with connection.transaction():

                result = await connection.fetchval('select 2 ^ 4') #testing to make sure this is being run on the DB
                print(result)

if __name__ == '__main__':
    app.main()

启动我的浮士德代理后,我可以看到postgres创建了一个包含10个连接的池,并且代码按预期执行。 5分钟后,所有10个连接均断开。我在想的是,我可能正在为一个工作人员创建一个池-当该工作人员被自动杀死后,该池就会被丢弃。

enter image description here

是否有任何简便的方法可以从代理创建连接池并将其传递给工作人员?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...