从Pulsar读取并使用asyncio加载到Postgres

问题描述

我正在尝试从pulsar读取数据并使用Python中的asyncio和asyncpg写入postgres表。

import asyncio
import asyncpg
import requests
import nest_asyncio
nest_asyncio.apply()
        
class Connection:

    loop = asyncio.get_event_loop()

    @classmethod
    async def connect_persist_postgresql(cls,data):
        connection = await asyncpg.connect(
            user='postgres,password='password_postgres',database='postgres',host='10.xxx.xxx.xx',port=5432
        )

        query = '''INSERT INTO {}.{} (
                                        uid,col1,col2,col3,col4
                                        ) 
                   VALUES (
                           $1,$2,$3,$4,$5,)'''
        if isinstance(data,list):
            for event in data:
                await connection.execute(
                    query,'schema','table_test',event['uid'],event['col1'],event['col2'],event['col3'],event['col4'],)
        else:
            await connection.execute(
                query,data[0]['uid'],data[0]['col1'],data[0]['col2'],data[0]['col3'],data[0]['col4']
            )

        await connection.close()

然后我以消费者的身份连接到pulsar,获取数据并尝试用下一行写入postgresql表: def run_consumer():

client = pulsar.Client(
    'pulsar://10.xxx.xx.xx:6650')
consumer = client.subscribe('test-topic-pulsar-postgresql','test-subs')
while True:
    msg = consumer.receive()
    unpacked_list = []
    loop = asyncio.get_event_loop()
    try:
        my_new_string_value = msg.data()
        unpacked_list.append(my_new_string_value)
        loop.run_until_complete(connect_persist_postgresql(unpacked_list))
    except Exception as a:
        print(a)

但是我收到一个错误:This event loop is already running

欢迎任何提示,谢谢。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...