没有使用异步阻止块“同时为真”

问题描述

使用下面的代码,我尝试使用asyncio启动2个无限循环:

async def do_job_1():
    while True :
        print('do_job_1')
        await asyncio.sleep(5)

async def do_job_2():
    while True :
        print('do_job_2')
        await asyncio.sleep(5)

if __name__ == '__main__':
    asyncio.run(do_job_1())
    asyncio.run(do_job_2())

do_job_1阻止do_job_2,因为do_job_2从不打印do_job_1。我犯了什么错误

最终我正在尝试转换kafka消费者代码

from confluent_kafka import Consumer,KafkaError

settings = {
    'bootstrap.servers': 'localhost:9092','group.id': 'mygroup','client.id': 'client-1','enable.auto.commit': True,'session.timeout.ms': 6000,'default.topic.config': {'auto.offset.reset': 'smallest'}
}

c = Consumer(settings)

c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(0.1)
        if msg is None:
            continue
        elif not msg.error():
            print('Received message: {0}'.format(msg.value()))
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {0}/{1}'
                  .format(msg.topic(),msg.partition()))
        else:
            print('Error occured: {0}'.format(msg.error().str()))

except KeyboardInterrupt:
    pass

finally:
    c.close()
https://www.confluent.io/blog/introduction-to-apache-kafka-for-python-programmers提取

是并发的,因此我可以并行处理Kafka消息。

解决方法

来自help(asyncio.run)

应将其用作asyncio程序的主要入口,理想情况下应仅调用一次。

但是您可以使用asyncio.gather来加入任务:

import asyncio

async def do_job_1():
    while True :
        print('do_job_1')
        await asyncio.sleep(5)

async def do_job_2():
    while True :
        print('do_job_2')
        await asyncio.sleep(5)

async def main():
    await asyncio.gather(do_job_1(),do_job_2())

if __name__ == '__main__':
    asyncio.run(main())