"""Custom token authentication backend for Django REST framework."""
import logging

from django.contrib.auth.models import AnonymousUser
from django.utils import timezone
from rest_framework import authentication
from rest_framework import exceptions

from .models import User, UserToken


class CustomTokenAuthentication(authentication.BaseAuthentication):
    """Authenticate requests carrying an ``Authorization: Token <token>`` header.

    The token is looked up in ``UserToken``; it must exist, have an
    ``expired_at`` later than the current time, and belong to a user whose
    ``is_active`` flag is truthy.

    NOTE(review): ``authenticate_header()`` is not overridden here. According
    to the Django REST framework documentation this makes denied
    unauthenticated requests answer 403 instead of 401 -- confirm that this
    is the intended behaviour.
    """

    keyword = 'Token'  # Scheme word expected as the first part of the header.

    def authenticate(self, request):
        """Resolve the request's Authorization header to a user.

        Args:
            request: Request object exposing a ``META`` mapping that may
                contain ``HTTP_AUTHORIZATION``.

        Returns:
            ``None`` when the header is missing or empty, otherwise the
            tuple ``(user, None)`` for the token's owner.

        Raises:
            exceptions.AuthenticationFailed: If the header is not exactly
                ``<keyword> <token>``, the token is unknown or expired, the
                user is inactive, or an unexpected error occurs during the
                lookup.
        """
        auth_header = request.META.get('HTTP_AUTHORIZATION')
        if not auth_header:
            return None

        try:
            parts = auth_header.split()
            if len(parts) != 2 or parts[0] != self.keyword:
                raise exceptions.AuthenticationFailed('无效的认证头格式,应为: Token <token>')

            token = parts[1]

            # Filtering on expired_at in the query means an expired token is
            # reported exactly like a token that does not exist.
            try:
                token_obj = UserToken.objects.select_related('user').get(
                    token=token,
                    expired_at__gt=timezone.now(),
                )
            except UserToken.DoesNotExist:
                raise exceptions.AuthenticationFailed('无效的token') from None

            if not token_obj.user.is_active:
                raise exceptions.AuthenticationFailed('用户已被禁用')

            return (token_obj.user, None)

        except exceptions.AuthenticationFailed:
            # Deliberate failures raised above already carry their final
            # message; let them through instead of wrapping them again.
            raise
        except Exception as exc:
            # Keep the details of unexpected errors (e.g. database failures)
            # in the server log rather than echoing them to the client.
            logging.getLogger(__name__).exception(
                'Unexpected error during token authentication'
            )
            raise exceptions.AuthenticationFailed('认证失败') from exc