问题描述
我正在尝试从pulsar读取数据并使用Python中的asyncio和asyncpg写入postgres表。
import asyncio
import asyncpg
import requests
import nest_asyncio
nest_asyncio.apply()
class Connection:
loop = asyncio.get_event_loop()
@classmethod
async def connect_persist_postgresql(cls,data):
connection = await asyncpg.connect(
user='postgres,password='password_postgres',database='postgres',host='10.xxx.xxx.xx',port=5432
)
query = '''INSERT INTO {}.{} (
uid,col1,col2,col3,col4
)
VALUES (
$1,$2,$3,$4,$5,)'''
if isinstance(data,list):
for event in data:
await connection.execute(
query,'schema','table_test',event['uid'],event['col1'],event['col2'],event['col3'],event['col4'],)
else:
await connection.execute(
query,data[0]['uid'],data[0]['col1'],data[0]['col2'],data[0]['col3'],data[0]['col4']
)
await connection.close()
然后我以消费者的身份连接到pulsar,获取数据并尝试用下一行写入postgresql表: def run_consumer():
client = pulsar.Client(
'pulsar://10.xxx.xx.xx:6650')
consumer = client.subscribe('test-topic-pulsar-postgresql','test-subs')
while True:
msg = consumer.receive()
unpacked_list = []
loop = asyncio.get_event_loop()
try:
my_new_string_value = msg.data()
unpacked_list.append(my_new_string_value)
loop.run_until_complete(connect_persist_postgresql(unpacked_list))
except Exception as a:
print(a)
但是我收到一个错误:This event loop is already running
欢迎任何提示,谢谢。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)