如何测试pybreaker并查看它是否有效

问题描述

我需要在我的应用中实现断路器模式,所以我尝试使用 PyBreaker 来实现。该应用程序运行正常,但我不知道如何测试它并查看断路器是否运行良好。这是我的 CircuitBreakerListener 课:

import pybreaker
import logging

class CircuitBreakerListener(pybreaker.CircuitBreakerListener):

    def failure(self,cb,exc):
        logging.info("CRITICAL {0}: The last request caused a system error".format(cb))

    def state_change(self,old_state,new_state):
        logging.info("Circuit Breaker {0}: The {1} state changed to {2} state".format(cb,new_state))

和我的函数调用

from starlette.responses import JSONResponse

def circuitbreaker_call(circuitbreaker,data):
    if circuitbreaker.current_state == 'open':
        data = JSONResponse(status_code=404,content={"Error": "Oops,something went wrong. Try again in 10 seconds"})
    print(circuitbreaker.current_state)
    return data

据我所知,断路器应该在发生 Exception 时改变其状态。这就是我为了抛出异常所做的:

db_breaker = pybreaker.CircuitBreaker(fail_max=1,reset_timeout=5,listeners=[CircuitBreakerListener()])

@db_breaker
def get_projects(db: Session,skip: int = 0,limit: int = 100):
    return db.query(models.Project).offset(skip).limit(limit).all() == 'a'

和我的端点:

@app.get("/projects",response_model=List[schemas.Project])
def get_projects(skip: int = 0,limit: int = 100,db: Session = Depends(get_db)):
    return circuitbreaker_call(db_breaker,crud.get_projects(db,skip=skip,limit=limit))

但我没有收到带有“糟糕,出了点问题”消息的 404 代码,而是在控制台中收到了 500 内部服务器错误,而 /projects 端点上没有任何真正的变化。有没有办法触发断路器并测试它是否真的有效?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)