DRF认证权限频率

目录

编写登录接口

认证

编写认证类

全局使用

局部使用

内置的认证类

on_delete相关

权限

编写权限类

全局使用

局部使用

局部禁用

内置权限类

 频率

编写频率类

配置文件

局部配置

内置的频率类

认证源码分析

权限源码分析

频率源码分析


编写登录接口

models.py

from django.db import models# Create your models here.class User(models.Model):username = models.CharField(max_length=32)password = models.CharField(max_length=32)class UserToken(models.Model):token = models.CharField(max_length=32)user = models.OneToOneField(to='User', on_delete=models.CASCADE)

views.py

from .serializer import UserSerializer
from .models import User, UserToken
import uuid
from django.contrib.auth.hashers import make_passwordclass UserView(ModelViewSet):queryset = Userserializer_class = UserSerializer@action(methods=['POST'], detail=False)def login(self, request):ser = self.get_serializer(data=request.data)if ser.is_valid():username = ser.data.get('username')password = ser.data.get('password')user = User.objects.filter(username=username, password=password).first()if user:# 生成唯一随机tokentoken = uuid.uuid4()# 用户登陆过就更新没登录就新增UserToken表UserToken.objects.update_or_create(defaults={'token': token}, user=user)return Response({'code': 100, 'msg': '登陆成功'})return Response({'code': 101, 'msg': '用户名或密码错误'})return Response(ser.errors)

serializer.py

from .models import User
from rest_framework import serializersclass UserSerializer(serializers.ModelSerializer):class Meta:model = Userfields = '__all__'

urls.y

from django.contrib import admin
from django.urls import path
from rest_framework.routers import DefaultRouter
from app01 import viewsrouter = DefaultRouter()router.register('user', views.UserView, 'user')urlpatterns = [path('admin/', admin.site.urls),
]urlpatterns.extend(router.urls)

认证

访问接口时, 需要登录后才能访问

编写认证类

1. 新建一个认证py文件, 写一个认证类, 继承BaseAuthentication

2.规定通过路由后缀传递token值, 【如果token值可以查询到, 说明可以登录】

3. 重写authenticate方法

4. 验证成功返回两个值【当前登录用户和token]

5. 验证失败则抛出AuthenticationFailed异常

6. 只要验证成功,后续requestuser就是当前登陆用户

userauth.py

from rest_framework.authentication import BaseAuthentication
from .models import UserToken
from rest_framework.exceptions import AuthenticationFailedclass UserAuth(BaseAuthentication):def authenticate(self, request):token = request.GET.get('token')token_obj = UserToken.objects.filter(token=token).first()if token_obj:return token_obj.user, tokenelse:raise AuthenticationFailed('没有登陆')

全局使用

REST_FRAMEWORK = {'DEFAULT_AUTHENTICATION_CLASSES': ['app01.userauth.UserAuth']
}

局部使用

class UserView(ModelViewSet):queryset = User.objectsserializer_class = UserSerializerauthentication_classes = [UserAuth]

局部禁用将authentication_classes设置为空即可。

内置的认证类

    -BasicAuthentication
    -RemoteUserAuthentication
    -SessionAuthentication:session认证,建议自己写
        -如果前端带着cookie过来,经过session的中间件,如果登录了,在request.user中就有当前登录用户
        -drf没有限制是否登录
        -加了这个认证,如果没有登录,就不允许往后访问了
    -TokenAuthentication:jwt认证,建议自己写
    -建议自己写

on_delete相关

当一个由ForeignKey引用的对象被删除时, django将模拟on_delete参数所指定的SQL约束的行为。 例如: 有一个可为空的ForeignKey,并且你希望当被引用的对象被删除时他被设置为空

class Book(models.Model):title = models.CharField(max_length=32)price = models.CharField(max_length=32)publish = models.ForeignKey(to='Publish', on_delete=models.SET_NULL, null=True)

1. models.CASCADE

级联操作, 当主表中被连接的一条数据删除时,从表中所有与之关联的数据同时被删除

2. models.SET_NULL

当主表中的一行数据删除时, 从表中与之关联的数据的相关字段设置为null, 此时注意定义外键时, 这个字段必须可以允许为空

3. models.PROTECT

当主表中的一行数据删除时,由于从表中相关字段是受保护的外键 , 所以都不允许删除

4. models.SET_DEFAULT

当主表中的一行数据删除时,从表中所有的关联数据的关联字段设置为默认值, 此时主要定义外键时, 这个外键字段应该有一个默认值

5. models.SET()

【当主表中的一条数据删除时, 从表中所有的关联数据字段设置为SET()中设置的值, 与models.SET_DEFAULT相似, 只不过此时从表中的相关字段不需要设置default参数】

将ForeignKey设置为传递给Set()的值, 如果传入了一个可调配对象,则设置为调用它的结果, 大多数情况下,为了避免在导入models,py时执行查询, 需要传入一个可调用对象

from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import modelsdef get_sentinel_user():return get_user_model().objects.get_or_create(username='deleted')[0]class MyModel(models.Model):user = models.ForeignKey(settings.AUTH_USER_MODEL,on_delete=models.SET(get_sentinel_user),
)

6. models.DO_NOTHING

什么都不做, 一切都看数据库级别的约束, 注意数据库级别的默认约束为RESTRICT , 这个约束与jango中的models.PROTECT相似

7.models.RESTRICT

通过引发RestrictError(django.db.IntegrityError的一个子类)来防止删除被引用的对象, 与PROTECT不同的是, 如果被引用的对象也引用了一个在统一操作中被删除的不同对象, 但通过CASCADE关系,则允许删除被引用的对象。

class Artist(models.Model):
name = models.CharField(max_length=10)class Album(models.Model):artist = models.ForeignKey(Artist, on_delete=models.CASCADE)class Song(models.Model):artist = models.ForeignKey(Artist, on_delete=models.CASCADE)album = models.ForeignKey(Album, on_delete=models.RESTRICT)

Artist可以被删除, 即使这意味着删除被Song引用的Album,因为Song也通过级联关系引用Artist本身, 例如

>>> artist_one = Artist.objects.create(name='artist one')
>>> artist_two = Artist.objects.create(name='artist two')
>>> album_one = Album.objects.create(artist=artist_one)
>>> album_two = Album.objects.create(artist=artist_two)
>>> song_one = Song.objects.create(artist=artist_one, album=album_one)
>>> song_two = Song.objects.create(artist=artist_one, album=album_two)
>>> album_one.delete()
# Raises RestrictedError.
>>> artist_two.delete()
# Raises RestrictedError.
>>> artist_one.delete()
(4, {'Song': 2, 'Album': 1, 'Artist': 1})

权限

权限控制可以限制用户对于视图的访问和对于具体数据对象的访问

        在执行视图的diapatch方法前, 会先进行视图访问权限的判断

        在通过get_object()获取具体对象时, 会进行模型对象访问权限的判断

编写权限类

限制只有超级用户才能访问

models.py

from django.db import models# Create your models here.
class User(models.Model):username = models.CharField(max_length=32)password = models.CharField(max_length=32)user_type = models.IntegerField(choices=((1, 'SVIP'), (2, 'VIP'), (3, '普通')))class UserToken(models.Model):token = models.CharField(max_length=32)user = models.OneToOneField(to=User, on_delete=models.CASCADE)class Book(models.Model):name = models.CharField(max_length=32)price = models.CharField(max_length=32)create_time = models.DateTimeField(auto_now_add=True)publish = models.ForeignKey(to='Publish', on_delete=models.SET_NULL, null=True)def detail(self):try:return {'出版社': self.publish.name, '出版社地址': self.publish.addr}except:return {}class Publish(models.Model):name = models.CharField(max_length=32)addr = models.CharField(max_length=32)def detail(self):try:return [{'书名': _.name, '书价': _.price, '出版日期': _.create_time} for _ in self.book_set.all()]except:return []

serializer.py

from rest_framework import serializers
from .models import Book, Publishclass BookSerializer(serializers.ModelSerializer):class Meta:model = Bookfields = ['id', 'name', 'price', 'create_time', 'publish', 'detail']extra_kwargs = {'create_time': {'read_only': True},'publish': {'write_only': True},'detail': {'read_only': True}}class PublishSerializer(serializers.ModelSerializer):class Meta:model = Publishfields = ['id', 'name', 'addr', 'detail']extra_kwargs = {'detail': {'read_only': True}}

 views.py

from django.shortcuts import render# Create your views here.
from rest_framework.viewsets import ModelViewSet, GenericViewSet
from .models import Book, Publish, User, UserToken
from .serializer import BookSerializer, PublishSerializer
import uuid
from rest_framework.response import Response
from rest_framework.decorators import actionclass BookView(ModelViewSet):queryset = Book.objectsserializer_class = BookSerializerpermission_classes = []class PublishView(ModelViewSet):queryset = Publish.objectsserializer_class = PublishSerializerclass UserView(GenericViewSet):queryset = User.objectsserializer_class = Noneauthentication_classes = []permission_classes = []@action(methods=['POST'], detail=False, url_path='login')def login(self, request):username = request.data.get('username')password = request.data.get('password')obj = User.objects.filter(username=username, password=password).first()if obj:token = uuid.uuid4()UserToken.objects.update_or_create(defaults={'token': token}, user=obj)return Response({'code': 100, 'msg': '登陆成功', 'token': token})return Response({'code': 101, 'msg': '用户名或密码错误!'})

步骤

1, 写一个类继承BasePermission

2,重写has_permission方法

3,认证和才走权限,所以request.user就是当前登录用户

 4,如果有权限就返回True,没有就返回False

5,self.message就是返回给前端的提示信息

 permissions.py

from rest_framework.authentication import BaseAuthentication
from .models import UserToken
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.permissions import BasePermission
from rest_framework.throttling import SimpleRateThrottle, BaseThrottleclass Auth(BaseAuthentication):def authenticate(self, request):# token在请求体中token = request.META.get('HTTP_TOKEN')obj = UserToken.objects.filter(token=token).first()if obj:return obj.user, tokenelse:raise AuthenticationFailed('没有登陆,不能访问!')class Per(BasePermission):def has_permission(self, request, view):user_type = request.user.user_typeif user_type == 1:return Trueelse:self.message = f'{request.user.get_user_type_display()}用户没有权限'return False

全局使用

同认证类一致

REST_FRAMEWORK = {'DEFAULT_AUTHENTICATION_CLASSES': ['app01.authentication_throttling_permissions.Auth'],
}

局部使用

from .authentication_throttling_permissions import Perclass UserView(GenericViewSet):...permission_classes = [per]

局部禁用

class UserView(GenericViewSet):permission_classes = []

说明

若要自定义权限,需要继承rest_framework.permissions.BasePermission父类, 并实现以下两个方法中的任意一个或者全部

1, .has_permission(self,request,view)  是否可以访问视图, view表示当前视图对象

2, .has_object_permission(self,request,view,obj) 是否可以访问数据对象,view表示当前视图, obj为数据对象

内置权限类

-BasePermission:所有的基类
    -AllowAny:允许所有
    -IsAuthenticated:是否登录,登录了才有权限
        -认证类,如果登录了,返回(当前登录用户,None), 如果没有登录返回 None
    -IsAdminUser:auth的user表中is_staff字段是否为True,对后台管理有没有权限

 频率

可以对接口访问的频次进行限制,以减轻服务器压力, 一般用于付费购买次数,投票等场景使用

编写频率类

根据用户IP限制

1,写一个类,继承SimpleRateThrottle

2,定义一个类属性scope='xx'

3,重新get_cache_key方法, get_cache_key方法返回什么就以说明作为频率限制的依据(返回唯一的依据如ip 或者id)

4,settings中配置作用域限制频次

 throttling.py

from rest_framework.throttling import BaseThrottle,SimpleRateThrottleclass VisitThrottling(SimpleRateThrottle):# 类属性,这个类属性可以随意命名,但是要跟配置文件对应scope = 'apple'def get_cache_key(self, request, view):# 返回什么, 频率就会以什么作为限制return request.META.get('REMOTE_ADDR')

配置文件

settings.py


REST_FRAMEWORK = {'DEFAULT_THROTTLE_CLASSES':['app01.throttling.VisitThrottling'],'DEFAULT_THROTTLE_RATES': {'apple': '3/m'
}}

局部配置

  throttle_classes = [VisitThorottling,]

内置的频率类


    -BaseThrottle:所有的基类
    -SimpleRateThrottle:咱们写频率类都继承它,少写代码
    -AnonRateThrottle:如果你想按ip地址限制,只需要在配置文件中配置
      'DEFAULT_THROTTLE_RATES': {
        'anon': '3/m',
        }
    -UserRateThrottle:如果你想按用户id限制,只需要在配置文件中配置
     'DEFAULT_THROTTLE_RATES': {
        'user': '3/m',
        }

认证源码分析

APIView类中的dispatch方法包装类新的request方法, 执行力认证权限和频率, 执行视图类中的方法,捕获全局异常。 

入口: APIView类中的dispatch中的self.initial(request,*args,**kwargs)方法, self是视图类的对象, 执行的是APIView类中的initial方法

def initial(self, request, *args, **kwargs):...# Ensure that the incoming request is permittedself.perform_authentication(request)  # 认证相关self.check_permissions(request)self.check_throttles(request)

接下来去找self.perform_authentication(request),self是视图类对象, 执行的是APIView类中的perform_authentication方法

def perform_authentication(self, request):"""Perform authentication on the incoming request.Note that if you override this and simply 'pass', then authenticationwill instead be performed lazily, the first time either`request.user` or `request.auth` is accessed."""request.user

里面只执行了一行代码, 执行力request,user, request是Request类的对象, 所以却Request类中找user方法或属性

@property
def user(self):"""Returns the user associated with the current request, as authenticatedby the authentication classes provided to the request."""# request是Reques的对象,调用user方法,传入的self是requestif not hasattr(self, '_user'):with wrap_attributeerrors():self._authenticate()return self._user

如果request对象没有_user属性,就执行瑟利夫。——authenticate(), request确实没_user属性, 所以就执行Request类中的——authenticate方法

def _authenticate(self):"""Attempt to authenticate the request using each authentication instancein turn."""for authenticator in self.authenticators:try:user_auth_tuple = authenticator.authenticate(self)except exceptions.APIException:self._not_authenticated()raiseif user_auth_tuple is not None:self._authenticator = authenticatorself.user, self.auth = user_auth_tuplereturnself._not_authenticated()

for循环self.authenticators, self是Request类实例化时传入的, 而我们在dispatch中包装了新的request,所以去找Request实例化对象, 是在dispatch中的request=self.initializ_request(request,*args,**kwargs)这行代码的initialize_request方法实例化了Request。 所以去看initialize_request

def initialize_request(self, request, *args, **kwargs):"""Returns the initial request object."""parser_context = self.get_parser_context(request)return Request(request,parsers=self.get_parsers(),authenticators=self.get_authenticators(),negotiator=self.get_content_negotiator(),parser_context=parser_context)

Request实例化传入的参数authenticators是方法self.get_authenticators()的结果,所以去看self.get_authenticators()

def get_authenticators(self):"""Instantiates and returns the list of authenticators that this view can use."""return [auth() for auth in self.authentication_classes]

返回了一个列表生成式, 循环了self,authentication_classes, self是视图类的对象

authentication_classes就是我们在视图类中配置的认证类, 所以auth就是自己写的认证类,返回的就是认证类实例化的对象列表,回到Request类中的_authenticate方法

def _authenticate(self):"""Attempt to authenticate the request using each authentication instancein turn."""for authenticator in self.authenticators:try:user_auth_tuple = authenticator.authenticate(self)except exceptions.APIException:self._not_authenticated()raiseif user_auth_tuple is not None:self._authenticator = authenticatorself.user, self.auth = user_auth_tuplereturnself._not_authenticated()==============================================================
class Auth(BaseAuthentication):'''authenticate(self)传了一个self,为什么接受了两个值?因为authenticator对象调用authenticate方法,会自动将authenticator对象当作参数传入,而_authenticate里的self是Request类的对象,所以它就是request。'''def authenticate(self, request):

self.authenyicators就是自定义的认证类实例化的对象列表, authenticatot就是自定义的认证类实例化的对象。 接下来执行力aythentocator.authenticate(self),自定义的认证类实例化的对象的authenticate(self)方法,自定义的认证类继承了BaseAuthentication类, 类中有authenticate方法,但是什么页没有做并抛出了异常raise NotImplementedError(“.authenticate() must be overridden.”),所以我们要重写authenticate方法, 并返回一个元组, 接下来判断元组不为空就执行了self.user, self.auth=user_auth_tuple,所以authenticate方法认证通过需要返回两个值, 第一个是当前登录用户, 所以认证通过之后可以在request中访问到当前登录用户user和auth

如果有多个认证类, 前面的认证类需要返回None,因为不为空for循环就结束了,后边的认证类就不认证了、、

BaseAuthentication  # 认证类基类
    BasicAuthentication  # 基于浏览器进行认证
    RemoteUserAuthentication  # 基于Django admin中的用户进行认证,这也是官网的示例
    SessionAuthentication:sessin认证  # 基于django的session进行认证
        如果前端呆着cookie过来,经过session的中间件,如果登陆了,那么在request.user中就有当前登录用户
        drf没有限制是否登录
        加了这个认证如果没有登录,就不允许往后访问了
    TokenAuthentication,jwt认证  # 基于drf内部的token认证
 
源码验证流程
1.在django(CBV)中,客户端的发来的请求会执行视图类的as_view方法,而as_view方法中会执行dispacth方法,然后在根据请求的类型(反射)执行相应的方法(get、post等)。
 
2.使用django rest framework中的视图类需要继承APIView,请求到达视图类会执行视图类的as_view方法,而OrderView中没有as_view()方法,所以执行APIView的as_view()方法,
 
3.从APIView源码中可以看到APIView中as_view又执行了父类的as_view方法,在看看APIView的父类是View类,这恰好是django中的view视图类,
 
4.从View源码可以看出View类的as_view()方法执行流程:验证请求方法--->返回view函数名称(view函数会执行dispatch方法),一旦有请求进来执行view函数-->执行dispatch方法
 
5.当APIView的as_view方法执行了父类的as_view方法以后,请求进来会执行view方法,view方法中会执行dispatch方法,而Oderview没有dispatch方法,所以执行父类(APIView)的dispatch方法,
 
6.从APIView源码分析,执行APIView的dispatch方法时候会执行self.initialize_request方法,会对django原始的request进行封装。
 
7.self.initialize_request()源码分析,实例化Request()类,封装原始的request,authenticators(认证),执行self.get_authenticators(),到了这里就开始django rest framework的认证流程
 
8.self.get_authenticators()源码分析,采用列表生成式,循环self.authentication_classes,实例化其中的每一个类,返回列表,不难发现authentication_classes属性正式我们在认证的时候用到认证类列表,这里会自动寻找该属性进行认证。倘若我们的视图类没有定义认证方法呢?,当然django rest framework 已经给我们加了默认配置,如果我们没有定义会自动使用settings中的DEFAULT_AUTHENTICATION_CLASSES作为默认(全局),
 
 9.继续分析APIView的dispatch方法,此时执行self.inital方法,并将封装过后的request对象(Reuqest)作为参数进行传递,
 
10.在self.inital方法中会执行self.perform_authentication方法,而self.perform_authentication方法用会执行request.user,此时的request是Request对象,所以需分析Request类中的user属性,
 
11.从源码分析,在Request对象中,user属性是一个属性方法,并会执行self._authentication方法,
 
12.从源码分析,Request对象的self._authentication中循环self.authenticators(该列表是由认证对象构成的[对象1,对象2]),并执行每一个对象中的authenticate方法返回tuple,同时对该过程其进行了异常捕捉,有异常将返回给用户,下面是异常验证逻辑:
 
如果有异常则执行self._not_authenticated()方法,继续向上抛异常。
如果有返回值必须是一个元组,分别赋值给self.user, self.auth(request.user和request.auth),并跳出循环。
如果返回None,则由下一个循环处理,如果都为None,则执行self._not_authenticated(),返回 (AnonymousUser,None)
 
13.当都没有返回值,就执行self._not_authenticated(),相当于匿名用户,没有通过认证,并且此时django会返回默认的匿名用户设置AnonymousUser,如需要单独设置匿名用户返回值,则编写需要写UNAUTHENTICATED_USER的返回值:
 
14.所以经过以上分析,我们需要进行认证时候,需要在每一个认证类中定义authenticate进行验证,并且需要返回元祖。
 
原文链接:https://juejin.cn/post/7027424194186985508

权限源码分析

入口 ;APIView类中的dispatch中的self.initial(request,*Args,**kwargs)方法, self是视图类的对象,执行的是APIVIew类中的initial方法

def initial(self, request, *args, **kwargs):...# Ensure that the incoming request is permittedself.perform_authentication(request)self.check_permissions(request)  # 权限相关self.check_throttles(request)

认证完就走self.check_permission(request),self是视图类对象, 找check_permission方法,是APIView中的check_permissions

def check_permissions(self, request):for permission in self.get_permissions():if not permission.has_permission(request, self):self.permission_denied(request,message=getattr(permission, 'message', None),code=getattr(permission, 'code', None))

for循环self.get_permissions(),接着去找get_permissions, 找到的是APIView中的get_permissions

def get_permissions(self):"""Instantiates and returns the list of permissions that this view requires."""return [permission() for permission in self.permission_classes]

也是一个列表生成式, 循环的是视图类中配重的权限类, 返回的是自定义权限类的对象列表, 回到check_permissions

def check_permissions(self, request):for permission in self.get_permissions():# 调用has_permission方法会自动将permission对象传入# request是新包装的request# self是视图类的对象if not permission.has_permission(request, self):self.permission_denied(request,message=getattr(permission, 'message', None),code=getattr(permission, 'code', None))

permission是自定义权限类的对象, 接下来执行自定义权限类的对象has_permission方法, 自定义权限类继承了BasePermission, 里边有has_permission, 它什么都没干,直接返回True, 就是没有权限校验,所以我们需要重写has_permission方法, 因为是在if判断里所有has+permission要返回布尔值,有权限就返回True,没有权限就返回false。 

如果返回false,没有权限,就执行self.permission_denied方法,有一个参数message, 是在权限类对象中去取messahe属性, 如果没有就是空, 如果在权限类中制定了message,就将它方会给前端展示

BasePermission  # 权限类基类
    AllowAny  # 允许所有
    IsAuthenticated  # 是否登录,登陆了才有权限,基于django的认证权限,官方示例
        # 认证类,如果登陆了,返回(当前用户,None),如果没有等领域则返回None
    IsAdminUser  # auth的user表中的is_staff字段是否为True,对后台有没有权限,基于django admin权限控制
    IsAuthenticatedOrReadOnly  # 基于django admin
    
源码验证流程
1.同样请求到达视图时候,先执行APIView的dispatch方法,以下源码是我们在认证篇已经解读过了
 
2.执行inital方法,initial方法中执行perform_authentication则开始进行认证
 
3.当执行完perform_authentication方法认证通过时候,这时候就进入了本篇文章主题--权限(check_permissions方法),下面是check_permissions方法源码
 
4.从上源码中我们可以看出,perform_authentication方法中循环get_permissions结果,并逐一判断权限,所以需要分析get_permissions方法返回结果,以下是get_permissions方法源码
 
5.get_permissions方法中寻找权限类是通过self.permission_class字段寻找,和认证类一样默认该字段在全局也有配置,如果我们视图类中已经定义,则使用我们自己定义的类。
 
6.承接check_permissions方法,当认证类中的has_permission()方法返回false时(也就是认证不通过),则执行self.permission_denied(),以下是self.permission_denied()源码
 
7.认证不通过,则至此django rest framework的权限源码到此结束,相对于认证源码简单一些。
 
原文链接:https://juejin.cn/post/7027424194186985508

频率源码分析

入口:APIView类中的dispatch中的self.initial(request,*args,**kwargs)方法, self是视图类的对象, 执行的是APIView类中的initial方法

def initial(self, request, *args, **kwargs):...# Ensure that the incoming request is permittedself.perform_authentication(request)self.check_permissions(request)self.check_throttles(request)  # 频率相关

认证和权限都走完就会执行self.check_throttles(request),接着去找check_throttles方法,找到的是APIView的check_throttles

def check_throttles(self, request):throttle_durations = []for throttle in self.get_throttles():if not throttle.allow_request(request, self):throttle_durations.append(throttle.wait())if throttle_durations:# Filter out `None` values which may happen in case of config / rate# changes, see #1438durations = [duration for duration in throttle_durationsif duration is not None]duration = max(durations, default=None)self.throttled(request, duration)

循环了self.get_throttles(),接着去找get_throttles(),还是APIView的get_throttles。 self是试图类对象, throttlr_classes是我们配置的频率类列表, throttle就是一个频率类,throttle()就是频率类实例化产生对象

def get_throttles(self):"""Instantiates and returns the list of throttles that this view uses."""return [throttle() for throttle in self.throttle_classes]

我们定制的频率类继承了SimpleRateThrottle,里边有__init__方法,实例化时执行__init__方法, 其中的self时频率类的对象, self没有一个rate属性, 所以就执行了self.rate = self.get_rate(),接着去找get_rate()方法,找到的就是SimpleRateThrottle里的get_rate

def __init__(self):if not getattr(self, 'rate', None):self.rate = self.get_rate()self.num_requests, self.duration = self.parse_rate(self.rate)

get_rate首先判断频率类对象有没有一个scope属性, 如果没有就会抛出ImproperlyConfigured异常,所以我们必须在频率类中定义一个scope属性, 接着执行self.THROTTLE_RATES[self.scope],由于在配置文件中配置了THROTTLE_RATES,并没有对应的scope和scope的值, 所以该方法返回的就是scope所对应的值

def get_rate(self):"""Determine the string representation of the allowed request rate."""if not getattr(self, 'scope', None):msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %self.__class__.__name__)raise ImproperlyConfigured(msg)try:return self.THROTTLE_RATES[self.scope]except KeyError:msg = "No default throttle rate set for '%s' scope" % self.scoperaise ImproperlyConfigured(msg)

接着回到__init__(self)方法, self.rate就是配置文件中scope所对应的值, 接着执行了self.parse_rate(self.rate),继续找parse_rate, 找到的是SimpleRateThrottle里的parse_rate。 首先rate不是空的, 接着按照'/'进行分割并解压赋值,第一个值是一个数字, 第二个值是时间, 将第一个值换成整形,用第二个值的第一个字符在{‘s’: 1, ‘m’: 60, ‘h’: 3600, ‘d’: 86400}这个字典中取出对应的值, 返回两个值, 所以我们在配置文件中写的THROTTLE_RATES的格式应该是(数字/s, 数字/m..)等

def parse_rate(self, rate):"""Given the request rate string, return a two tuple of:, """if rate is None:return (None, None)num, period = rate.split('/')num_requests = int(num)duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]return (num_requests, duration)

类的实例化就结束了, check_throttles中循环self.get_throttles()就是循环频率类对象列表, hrottle就是频率类对象, 接着执行了throttle.allow_request(request,self),去找频率类的allow_request方法, 找到的是SimpleRateThrottle中的allow_request

    def check_throttles(self, request):"""Check if request should be throttled.Raises an appropriate exception if the request is throttled."""throttle_durations = []for throttle in self.get_throttles():if not throttle.allow_request(request, self):throttle_durations.append(throttle.wait())----------------------------------------------------------
'''
首先调用了get_cache_key,SimpleRateThrottle类中有get_cache_key方法,但是
什么都没干,直接抛了NotImplementedError异常,所以我们必须在自定义频率类中
重写get_cache_key方法,这样执行时会执行重写后的。这个方法返回什么就会以什么作为频率限制的依据。接着根据get_cache_key返回的值去缓存中取值,取不到就为空列表,并获取了当前
时间,判断取到值了并且列表的最后一个值小于等于当前事件减去配置的时间,就将
该时间移除,这样列表中就只剩下配置时间之内的。接着判断时间列表长度如果大于等于配置的次数,就返回False,否则就将当前时间插入到列表的第一个位置,并返回True。当频率校验不通过,就会执行throttle_durations.append(throttle.wait())抛出异常给前端显示。
'''
def allow_request(self, request, view):"""Implement the check to see if the request should be throttled.On success calls `throttle_success`.On failure calls `throttle_failure`."""if self.rate is None:return Trueself.key = self.get_cache_key(request, view)if self.key is None:return Trueself.history = self.cache.get(self.key, [])self.now = self.timer()# Drop any requests from the history which have now passed the# throttle durationwhile self.history and self.history[-1] <= self.now - self.duration:self.history.pop()if len(self.history) >= self.num_requests:return self.throttle_failure()return self.throttle_success()def get_cache_key(self, request, view):"""Should return a unique cache-key which can be used for throttling.Must be overridden.May return `None` if the request should not be throttled."""raise NotImplementedError('.get_cache_key() must be overridden')def throttle_failure(self):"""Called when a request to the API has failed due to throttling."""return Falsedef throttle_success(self):"""Inserts the current request's timestamp along with the keyinto the cache."""self.history.insert(0, self.now)self.cache.set(self.key, self.history, self.duration)return Truedef wait(self):"""Returns the recommended next request time in seconds."""if self.history:remaining_duration = self.duration - (self.now - self.history[-1])else:remaining_duration = self.durationavailable_requests = self.num_requests - len(self.history) + 1if available_requests <= 0:return Nonereturn remaining_duration / float(available_requests)

    
源码解析
1.dispatch()
 
2.执行inital方法,initial方法中执行check_throttles则开始频率控制
 
3.下面是check_throttles源码,与认证、权限一样采用列表对象方式,通过判断allow_request方法返回值判断频率是否通过
 
4.get_throttles方法,采用列表生成式生成频率控制对象,与认证、权限一直
 
5.self.throttle_classes属性获取
 
6.通过以上分析,知道了频率控制是通过判断每个类中的allow_request放法的返回值来判断频率是否通过,下面我们来看看我们所使用的SimpleRateThrottle怎么实现的,分析部分请看注解
 
原文链接:https://juejin.cn/post/7027424194186985508


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部