Python asyncio with a NATS.io PUB-SUB architecture

Problem description

Python: 3.6.9

I am implementing machine learning (ML) models in the cloud on top of a PUB-SUB architecture. The Python program listens on a number of topics.

The two main topics are: train and predict

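The train topic trains an ML model, which takes some time. While that is happening, I want to be able to send predict requests to another model that was trained earlier and is already in the application's memory. I am using asyncio to create a task for each topic I listen on, and a callback function is executed as soon as a message arrives on that topic.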

The problem I am facing is that predict cannot run right away while train is in progress. The two always run sequentially.

I think this is a multithreading issue, possibly even related to Python's GIL.

I have simulated the situation using NATS.io (a Pub-Sub architecture). The code below is a stripped-down version of my actual code.

import asyncio
from nats.aio.client import Client as Nats
import time
import json

nc = Nats()
is_connected = False


async def connect():
    """
    Initial connection to the NATS server will make a ping every 5min(3 times) and will mark the connection stale if
    it does not receive any reply(PONG) at all
    """
    await nc.connect(servers=["http://0.0.0.0:4222"],ping_interval=300,max_outstanding_pings=3,token='development')


async def listen():
    if is_connected is False:
        await connect()
    return


async def train(msg):
    """
    The bot train requests come here
    :return:
    """
    subject = msg.subject
    reply = msg.reply
    data_train = json.loads(msg.data.decode())
    print("Received a message on '{subject} {reply}': {data}".format(
        subject=subject,reply=reply,data=data_train))

    print('Performing train . . ')
    time.sleep(20)
    print('Train completed. . ')


async def predict(msg):
    """
    The bot predict requests come here
    :return:
    """
    subject = msg.subject
    reply = msg.reply
    data_train = json.loads(msg.data.decode())
    print("Received a message on '{subject} {reply}': {data}".format(
        subject=subject,reply=reply,data=data_train))

    time.sleep(1)
    print('I am predict. I need to respond immediately')


async def main():
    await listen()
    loop.create_task(nc.subscribe('train',cb=train))
    loop.create_task(nc.subscribe('predict',cb=predict))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.run_forever()

Meanwhile, in a terminal:

  • Send a request on the train topic
  • Wait 3-5 seconds
  • Send a request on the predict topic

Actual output

Received a message on 'train ': {'data': 123}
Performing train . . 
Train completed. . 
Received a message on 'predict ': {'data': 'text'}
I am predict. I need to respond immediately

Expected output

Received a message on 'train ': {'data': 123}
Performing train . . 
Received a message on 'predict ': {'data': 'text'}
I am predict. I need to respond immediately
Train completed. . 
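
The two orderings above can be reproduced without a NATS server, which may help narrow down the cause. The following is a minimal sketch, not the original program: the names train_blocking, train_offloaded, scenario and run are hypothetical, and the delays are shortened. It assumes the stall comes from calling time.sleep() inside a coroutine, which keeps the single event-loop thread busy, and it contrasts that with handing the same blocking call to the loop's default thread pool through loop.run_in_executor().

```python
import asyncio
import time


async def train_blocking(log):
    log.append("train started")
    # Blocks the whole event loop, like time.sleep(20) in train()
    time.sleep(0.3)
    log.append("train completed")


async def train_offloaded(log):
    log.append("train started")
    loop = asyncio.get_event_loop()
    # Run the blocking call in the default thread pool so the loop stays free
    await loop.run_in_executor(None, time.sleep, 0.3)
    log.append("train completed")


async def predict(log):
    log.append("predict handled")


async def scenario(train_cb):
    """Start training, then let a predict request arrive shortly after."""
    log = []
    train_task = asyncio.ensure_future(train_cb(log))
    await asyncio.sleep(0.05)  # the predict request arrives a little later
    await predict(log)
    await train_task
    return log


def run(train_cb):
    # A fresh loop per run; run_until_complete also works on Python 3.6
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(scenario(train_cb))
    finally:
        loop.close()
```

Under these assumptions, run(train_blocking) logs predict only after training has completed, matching the actual output, while run(train_offloaded) logs predict in between, matching the expected output.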

I even tried multiprocessing: after importing the multiprocessing module, I changed the main() function as follows. But no luck!

async def main():
    await listen()
    p1 = multiprocessing.Process(target=loop.create_task,args=[await nc.subscribe('train',cb=train)])
    p2 = multiprocessing.Process(target=loop.create_task,args=[await nc.subscribe('predict',cb=predict)])

    p1.start()
    p2.start()

    print('Started processes')
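
One possible direction, offered as a sketch rather than a confirmed fix: in the attempt above, nc.subscribe() has already been awaited in the parent process, so the callbacks appear to stay registered on the parent's event loop and the child processes never take over the work. An alternative is to keep the subscriptions as in the first listing and hand only the blocking training call to an executor from inside the callback. The names FakeMsg, train_model, make_train_callback and demo are hypothetical; FakeMsg only stands in for the NATS message so the example runs without a server.

```python
import asyncio
import json
import time
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the NATS message (only the fields used here)
FakeMsg = namedtuple("FakeMsg", ["subject", "reply", "data"])


def train_model(data):
    """Hypothetical blocking training routine.

    Kept at module level so that a process pool could pickle it.
    """
    time.sleep(0.2)
    return {"trained_on": data}


def make_train_callback(executor):
    """Build a callback that hands the blocking work to `executor`."""
    async def train(msg):
        data = json.loads(msg.data.decode())
        loop = asyncio.get_event_loop()
        # The event loop stays free for other callbacks while this runs
        return await loop.run_in_executor(executor, train_model, data)
    return train


def demo():
    loop = asyncio.new_event_loop()
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        train = make_train_callback(executor)
        msg = FakeMsg("train", "", json.dumps({"data": 123}).encode())
        return loop.run_until_complete(train(msg))
    finally:
        executor.shutdown()
        loop.close()
```

A thread pool is usually enough when the training code spends its time in calls that release the GIL (sleeping, I/O, many native numerical routines). If training is pure-Python and CPU-bound, passing a concurrent.futures.ProcessPoolExecutor to make_train_callback instead would sidestep the GIL, at the cost of pickling the arguments and the result. The callback would then be registered the same way as before, for example with nc.subscribe('train', cb=make_train_callback(executor)).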
