问题描述
我需要在我的应用中实现断路器模式,所以我尝试使用 PyBreaker 来实现。该应用程序运行正常,但我不知道如何测试它并查看断路器是否运行良好。这是我的 CircuitBreakerListener
课:
import pybreaker
import logging
class CircuitBreakerListener(pybreaker.CircuitBreakerListener):
def failure(self,cb,exc):
logging.info("CRITICAL {0}: The last request caused a system error".format(cb))
def state_change(self,old_state,new_state):
logging.info("Circuit Breaker {0}: The {1} state changed to {2} state".format(cb,new_state))
from starlette.responses import JSONResponse
def circuitbreaker_call(circuitbreaker,data):
if circuitbreaker.current_state == 'open':
data = JSONResponse(status_code=404,content={"Error": "Oops,something went wrong. Try again in 10 seconds"})
print(circuitbreaker.current_state)
return data
据我所知,断路器应该在发生 Exception
时改变其状态。这就是我为了抛出异常所做的:
db_breaker = pybreaker.CircuitBreaker(fail_max=1,reset_timeout=5,listeners=[CircuitBreakerListener()])
@db_breaker
def get_projects(db: Session,skip: int = 0,limit: int = 100):
return db.query(models.Project).offset(skip).limit(limit).all() == 'a'
和我的端点:
@app.get("/projects",response_model=List[schemas.Project])
def get_projects(skip: int = 0,limit: int = 100,db: Session = Depends(get_db)):
return circuitbreaker_call(db_breaker,crud.get_projects(db,skip=skip,limit=limit))
但我没有收到带有“糟糕,出了点问题”消息的 404 代码,而是在控制台中收到了 500 内部服务器错误,而 /projects
端点上没有任何真正的变化。有没有办法触发断路器并测试它是否真的有效?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)