问题描述
示例 here 展示了如何使用远程过程调用 (RPC) 在 python 中创建客户端和服务器。
但我无法想象 FastAPI 服务如何成为使用 pika for RabbitMQ 来处理来自 RCP 客户端的请求的服务器。
通过显式调用它们将请求任何 Web 服务,但是,我无法想象如何将 RabbitMQ 使用者集成到 Web 服务中。
另一方面,对于客户端来说,这样做很容易,通过显式调用 Web 服务,您可以发布对队列的请求,see this example
请问有什么帮助吗?还是一个好的开始?
解决方法
您可以将 aio_pika
与 RPC 模式一起使用并执行以下操作:
服务 1(消耗)
循环消费:
# app/__init__.py
from fastapi import FastAPI
from app.rpc import consume
app = FastAPI()
...
@app.on_event('startup')
def startup():
loop = asyncio.get_event_loop()
# use the same loop to consume
asyncio.ensure_future(consume(loop))
...
创建连接、通道并注册要从另一个服务调用的远程方法:
# app/rpc.py
from aio_pika import connect_robust
from aio_pika.patterns import RPC
from app.config import config
__all__ = [
'consume'
]
def remote_method():
# DO SOMETHING
# Move this method along with others to another place e.g. app/rpc_methods
# I put it here for simplicity
return 'It works!'
async def consume(loop):
connection = await connect_robust(config.AMQP_URI,loop=loop)
channel = await connection.channel()
rpc = await RPC.create(channel)
# Register your remote method
await rpc.register('remote_method',remote_method,auto_delete=True)
return connection
这就是您需要使用和响应的全部内容,现在让我们看看调用此远程方法的第二个服务。
服务 2(调用远程方法)
让我们先创建 RPC 中间件,以便轻松管理和访问 RPC 对象,以便从 API 函数调用我们的远程方法:
# app/utils/rpc_middleware.py
import asyncio
from fastapi import Request,Response
from aio_pika import connect_robust
from aio_pika.patterns import RPC
from app.config import config
__all__ = [
'get_rpc','rpc_middleware'
]
async def rpc_middleware(request: Request,call_next):
response = Response("Internal server error",status_code=500)
try:
# You can also pass a loop as an argument. Keep it here now for simplicity
loop = asyncio.get_event_loop()
connection = await connect_robust(config.AMQP_URI,loop=loop)
channel = await connection.channel()
request.state.rpc = await RPC.create(channel)
response = await call_next(request)
finally:
# UPD: just thought that we probably want to keep queue and don't
# recreate it for each request so we can remove this line and move
# connection,channel and rpc initialisation out from middleware
# and do it once on app start
# Also based of this: https://github.com/encode/starlette/issues/1029
# it's better to create ASGI middleware instead of HTTP
await request.state.rpc.close()
return response
# Dependency to use rpc inside routes functions
def get_rpc(request: Request):
rpc = request.state.rpc
return rpc
应用 RPC 中间件:
# app/__init__.py
from app.utils import rpc_middleware
...
app.middleware('http')(rpc_middleware)
...
通过 API 函数中的依赖项使用 RPC 对象:
# app/api/whatever.py
from aio_pika.patterns import RPC
from app.utils import get_rpc
...
@router.get('/rpc')
async def rpc_test(rpc: RPC = Depends(get_rpc)):
response = await rpc.proxy.remote_method()
...
添加一些日志记录以跟踪两个服务中发生的情况。此外,您还可以将两个服务的 RPC 逻辑合二为一,以便能够从同一服务中使用和调用远程方法。
希望它有助于获得基本的想法。
,今天(4 月 28 日)由安德烈·巴拉诺夫斯基 (Andrej Baranovskij) 发布的 youtube 上有一个很棒的教程,讨论了它。
我将在下面提供链接。你也可以查看github源代码。