from django.contrib.auth.models import AnonymousUser
from django.utils import timezone
from rest_framework import authentication
from rest_framework import exceptions

from .models import User, UserToken


class CustomTokenAuthentication(authentication.BaseAuthentication):
    """Authenticate requests that carry an ``Authorization: Token <value>`` header.

    The value is looked up in ``UserToken``. A row is accepted only when its
    ``expired_at`` is later than the current time and its user is active.
    """

    # Scheme name expected as the first word of the Authorization header.
    keyword = 'Token'

    def authenticate_header(self, request):
        """Return the value for the ``WWW-Authenticate`` response header.

        Per the DRF documentation, an authenticator that does not override
        this method makes failed authentication answer 403 instead of 401.
        """
        return self.keyword

    def authenticate(self, request):
        """Resolve the request's token to a user.

        Args:
            request: The incoming request; only
                ``request.META['HTTP_AUTHORIZATION']`` is read.

        Returns:
            ``None`` when the header is absent or empty, so that other
            authenticators may run; otherwise the tuple ``(user, None)``.

        Raises:
            exceptions.AuthenticationFailed: The header is malformed, no
                unexpired token matches, or the user is inactive.
        """
        auth_header = request.META.get('HTTP_AUTHORIZATION')
        if not auth_header:
            return None

        # Exactly two whitespace-separated parts: the keyword and the token.
        parts = auth_header.split()
        if len(parts) != 2 or parts[0] != self.keyword:
            # NOTE(review): the message ends right after "Token " - a
            # placeholder may have been lost at some point; confirm the text.
            raise self._failure('无效的认证头格式,应为: Token ')

        # Only the lookup is guarded. Unexpected errors (e.g. a database
        # failure) propagate instead of being reported as a bad credential
        # with the internal error text echoed back to the client.
        try:
            token_obj = UserToken.objects.select_related('user').get(
                token=parts[1],
                expired_at__gt=timezone.now(),  # reject expired tokens
            )
        except UserToken.DoesNotExist:
            raise self._failure('无效的token') from None

        if not token_obj.user.is_active:
            raise self._failure('用户已被禁用')

        return (token_obj.user, None)

    @staticmethod
    def _failure(reason):
        """Build an ``AuthenticationFailed`` with the common message prefix."""
        return exceptions.AuthenticationFailed(f'认证失败: {reason}')