在 Django 中创建中间件以验证来自不同微服务的用户时出错

问题描述

使用 Django 和 pyJWT 构建微服务

让我在总结庄园解释一下,也许有人可以帮我解决这个问题

我有三个微服务,即

  1. 身份验证:创建用户并使用 RS256 算法为用户生成令牌
  2. Student :基本上是需要使用 JWT 验证的用户类型
  3. Investor:也是需要使用JWT验证的用户类型

[我尝试在身份验证服务中创建一个通用的创建令牌函数,例如]

data = {"id": user.uuid.hex,"type": user.user_type}
encoded = jwt.encode(data,private_key,algorithm="RS256")
return encoded

它正在生成一个我已经使用 JWT.io 验证过的正确令牌

在我的学生服务中,我创建了一个类似这样的中间件

class JWTAuthenticationMiddleware(object):
    
    #Simple  JWT token based authentication.

    #Clients should authenticate by passing the token key in the "Authorization"
    #HTTP header,prepended with the string "Bearer ".  For example:

    #    Authorization: Bearer <UNIQUE_TOKEN>
   

    keyword = "Bearer"

    def __init__(self,get_response):
        self.get_response = get_response

    def __call__(self,request):

        auth = request.Meta.get("AUTHORIZATION",None)

        splitted = auth.split() if auth else []

        if len(splitted) == 2 and splitted[0] == self.keyword:
            if not splitted or splitted[0].lower() != self.keyword.lower().encode():
                return None
            if len(splitted) == 0:
                msg = _("Invalid token header. No credentials provided.")
                raise exceptions.AuthenticationFailed(msg)
            elif len(splitted) > 2:
                msg = _("Invalid token header. Token string should not contain spaces.")
                raise exceptions.AuthenticationFailed(msg)

            user = get_user(request)

            decoded = verify_token(auth)
            try:
                student = Student.objects.get(user_id=decoded.get("id"))
            except Student.DoesNotExist:
                student = Student.objects.update(user_id=decoded.get("id"))

        response = self.get_response(request)
        return response

我的 verify_token 函数看起来像

def verify_token(token):
decoded = jwt.decode(token,JWT_AUTH["PUBLIC_KEY"],algorithms=["RS256"])
return decoded

我还在身份验证中间件下方的设置中添加了我的中间件

MIDDLEWARE = [
    "django.middleware.security.SecurityMiddleware","corsheaders.middleware.CorsMiddleware","django.contrib.sessions.middleware.SessionMiddleware","django.middleware.locale.LocaleMiddleware","django.middleware.common.CommonMiddleware","django.middleware.csrf.CsrfViewMiddleware","django.contrib.auth.middleware.AuthenticationMiddleware","config.middleware.authentication.JWTAuthenticationMiddleware","django.contrib.messages.middleware.MessageMiddleware","django.middleware.common.brokenLinkEmailsMiddleware","django.middleware.clickjacking.XFrameOptionsMiddleware",]

我只是无法解码我的令牌并将有效负载分配给相应的用户类型,有人能帮我解决哪里出错了吗?我也尝试过使用类似这样的身份验证后端,它不起作用,已经实现了 simple_jwt_djangorestframework 包,但它没有在学生服务上解码我的有效负载并说无效令牌所以我不想添加它以增加不必要的代码

我的视图集看起来像这样

class StudentViewset(viewsets.ViewSet):
    queryset = Student.objects.filter(is_active=True)
    serializer_class = StudentSerializer
    lookup_field = "uuid"
    permission_classes = [IsAuthenticated]

当我使用 isAuthenticated 作为权限类时,我的错误总是说

"message": "Authentication credentials were not provided.",

解决方法

也许验证原始令牌应该有效:

decoded = verify_token(splitted[1])

实际上您不必实施它。它有 simple jwt package。按照文档安装包,如果它不起作用,请提供错误来帮助您。

,

自己回答这个问题以备将来参考

class JWTAuthenticationMiddleware(object):
    
    #Simple  JWT token based authentication.

    #Clients should authenticate by passing the token key in the "Authorization"
    #HTTP header,prepended with the string "Bearer ".  For example:

    #    Authorization: Bearer <UNIQUE_TOKEN>
   

    keyword = "Bearer"

    def __init__(self,get_response):
        self.get_response = get_response

    def __call__(self,request):

        auth = request.META.get("AUTHORIZATION",None)

        splitted = auth.split() if auth else []

        if len(splitted) == 2 and splitted[0] == self.keyword:
            if not splitted or splitted[0].lower() != self.keyword.lower().encode():
                return None
            if len(splitted) == 0:
                msg = _("Invalid token header. No credentials provided.")
                return JsonResponse(data=msg,status=403,safe=False)
            elif len(splitted) > 2:
                msg = _("Invalid token header. Token string should not contain spaces.")
                return JsonResponse(data=msg,safe=False)

            user = get_user(request)

            decoded = verify_token(auth)
            try:
                student = Student.objects.get(user_id=decoded.get("id"))
                return self.get_response(request)
            except Student.DoesNotExist:
                msg = _("Invalid User Not found")
                return JsonResponse(data=msg,safe=False)

我做错的几件事是当我找到一个学生时没有传递响应,这就是为什么当时没有绕过中间件,而且中间件级别的异常不能使用中间件只返回 HTTPResponses,因此我想到了使用 JsonResponse .