operations_project/apps/accounts/views.py
2025-05-13 11:58:17 +08:00

588 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# apps/accounts/views.py
from django.contrib.auth import authenticate, login, logout
from django.core.exceptions import ValidationError
from django.contrib.auth.hashers import check_password
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from rest_framework.views import APIView
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import AllowAny, IsAuthenticated, IsAdminUser
from rest_framework.authtoken.models import Token
from django.db.models import Q
from django.shortcuts import get_object_or_404
import uuid
import logging
import traceback
from apps.accounts.models import User
from apps.accounts.services.auth_service import (
authenticate_user, create_user, generate_token, delete_token
)
from apps.accounts.services.utils import (
convert_to_uuid, format_user_response, validate_uuid_param
)
# Module-level logger, named after this module via ``__name__``; every view
# below reports its failures through it.
logger = logging.getLogger(__name__)
@method_decorator(csrf_exempt, name='dispatch')
class LoginView(APIView):
    """Username/password login endpoint that returns an auth token."""

    # No authenticators are configured for this endpoint and anyone may call it.
    authentication_classes = []
    permission_classes = [AllowAny]

    def post(self, request):
        """Check the posted credentials and start a session.

        Responses:
            400 -- ``username`` or ``password`` is missing/empty.
            401 -- ``authenticate_user`` returned ``None``.
            200 -- success; ``data`` carries the user's fields plus ``token``.
            500 -- any exception; the client only gets a generic message,
                   the details go to the log.
        """
        try:
            credentials = request.data
            username = credentials.get('username')
            password = credentials.get('password')

            # Both credentials are mandatory.
            if not (username and password):
                return Response(
                    {"code": 400, "message": "请提供用户名和密码", "data": None},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            account = authenticate_user(request, username, password)
            if account is None:
                return Response(
                    {"code": 401, "message": "用户名或密码错误", "data": None},
                    status=status.HTTP_401_UNAUTHORIZED,
                )

            # Token first, then the Django session login.
            auth_token = generate_token(account)
            login(request, account)

            payload = {
                "id": str(account.id),
                "username": account.username,
                "email": account.email,
                "name": account.name,
                "role": account.role,
                "department": account.department,
                "group": account.group,
                "token": auth_token,
            }
            return Response({"code": 200, "message": "登录成功", "data": payload})
        except Exception as exc:
            logger.error(f"登录失败: {str(exc)}")
            logger.error(f"错误类型: {type(exc)}")
            logger.error(f"错误堆栈: {traceback.format_exc()}")
            return Response(
                {"code": 500, "message": "登录失败,请稍后重试", "data": None},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
@method_decorator(csrf_exempt, name='dispatch')
class RegisterView(APIView):
    """User registration endpoint, open to unauthenticated callers."""

    permission_classes = [AllowAny]

    def post(self, request):
        """Create a user from the posted fields and return it with a token.

        Responses:
            400 -- the body is not a key/value object, a required field is
                   missing/empty, or ``role`` is not one of the accepted
                   values.
            (status taken from the dict) -- ``create_user`` returned a dict,
                   which is passed through as the response body; its
                   ``status`` entry, if present, is used as the HTTP status
                   (400 otherwise).
            201 -- success; ``data`` carries the new user's fields and token.
            500 -- any exception; details are logged, the client receives a
                   generic message only.
        """
        try:
            data = request.data

            # A JSON array/scalar body has no ``.get``; reject it up front
            # instead of letting the AttributeError surface as a 500.
            if not isinstance(data, dict):
                return Response({
                    "code": 400,
                    "message": "请求数据格式错误",
                    "data": None
                }, status=status.HTTP_400_BAD_REQUEST)

            # Required fields must be present and non-empty.
            required_fields = ['username', 'password', 'email', 'role', 'name']
            for field in required_fields:
                if not data.get(field):
                    return Response({
                        "code": 400,
                        "message": f"缺少必填字段: {field}",
                        "data": None
                    }, status=status.HTTP_400_BAD_REQUEST)

            # NOTE(review): this endpoint is AllowAny and accepts
            # role='admin', so an anonymous caller can self-register with
            # the admin role. Left unchanged here -- confirm whether that
            # is intended.
            valid_roles = ['admin', 'leader', 'member']
            roles_str = ', '.join(valid_roles)
            if data['role'] not in valid_roles:
                return Response({
                    "code": 400,
                    "message": f"无效的角色,必须是: {roles_str}",
                    "data": None
                }, status=status.HTTP_400_BAD_REQUEST)

            user = create_user(data)
            if isinstance(user, dict):
                # A dict return is treated as an error payload.
                return Response(user, status=user.get('status', status.HTTP_400_BAD_REQUEST))

            token = generate_token(user)
            return Response({
                "code": 200,
                "message": "注册成功",
                "data": {
                    "id": str(user.id),
                    "username": user.username,
                    "email": user.email,
                    "role": user.role,
                    "department": user.department,
                    "name": user.name,
                    "group": user.group,
                    "token": token,
                    "created_at": user.date_joined.strftime('%Y-%m-%d %H:%M:%S')
                }
            }, status=status.HTTP_201_CREATED)
        except Exception as e:
            logger.error(f"注册失败: {str(e)}")
            logger.error(f"错误类型: {type(e)}")
            logger.error(f"错误堆栈: {traceback.format_exc()}")
            # Do not echo exception text to unauthenticated callers; the
            # full details are in the log lines above.
            return Response({
                "code": 500,
                "message": "注册失败,请稍后重试",
                "data": None
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@method_decorator(csrf_exempt, name='dispatch')
class LogoutView(APIView):
    """Logout endpoint for authenticated callers."""

    permission_classes = [IsAuthenticated]

    def post(self, request):
        """Delete the caller's token, then end the Django session.

        Returns 200 on success, or 500 (with the exception text in the
        message) if either step raises.
        """
        try:
            # Token removal happens before the session logout.
            delete_token(request.user)
            logout(request)
        except Exception as exc:
            logger.error(f"登出失败: {str(exc)}")
            logger.error(traceback.format_exc())
            failure = {
                "code": 500,
                "message": f"登出失败: {str(exc)}",
                "data": None,
            }
            return Response(failure, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        else:
            return Response({"code": 200, "message": "登出成功", "data": None})
@api_view(['GET', 'PUT'])
@permission_classes([IsAuthenticated])
def user_profile(request):
    """Read (GET) or edit (PUT) the profile of the requesting user.

    GET returns the caller's id, username, email, name, role, department,
    group and join date.

    PUT accepts any of ``email``, ``name``, ``department`` and ``group``.
    A new ``name`` already used by a different user is rejected with 400.
    An empty body, or a body containing none of those fields, is also a 400.
    Any unexpected exception is logged and reported as a 500.
    """
    try:
        method = request.method

        if method == 'GET':
            me = request.user
            if not me.is_authenticated:
                return Response(
                    {'code': 401, 'message': '用户未认证', 'data': None},
                    status=status.HTTP_401_UNAUTHORIZED,
                )
            profile = {
                'id': str(me.id),
                'username': me.username,
                'email': me.email,
                'name': me.name,
                'role': me.role,
                'department': me.department,
                'group': me.group,
                'date_joined': me.date_joined.strftime('%Y-%m-%d %H:%M:%S'),
            }
            return Response({'code': 200, 'message': '获取用户信息成功', 'data': profile})

        if method != 'PUT':
            return Response(
                {'code': 405, 'message': f'不支持的请求方法: {request.method}', 'data': None},
                status=status.HTTP_405_METHOD_NOT_ALLOWED,
            )

        # Reading request.data may itself raise; that is reported as a 400
        # rather than falling through to the generic 500 handler.
        try:
            body_is_empty = not request.data
        except Exception as data_error:
            return Response(
                {
                    'code': 400,
                    'message': f'请求数据格式错误: {str(data_error)}。请确保提交的是有效的JSON格式数据属性名必须使用双引号。',
                    'data': None,
                },
                status=status.HTTP_400_BAD_REQUEST,
            )
        if body_is_empty:
            return Response(
                {'code': 400, 'message': '请求数据为空或格式错误', 'data': None},
                status=status.HTTP_400_BAD_REQUEST,
            )

        me = request.user
        changed = []
        for attr in ('email', 'name', 'department', 'group'):
            if attr not in request.data:
                continue
            # A changed display name must not collide with another user's.
            if attr == 'name' and request.data['name'] != me.name:
                name_taken = (
                    User.objects.filter(name=request.data['name'])
                    .exclude(id=me.id)
                    .exists()
                )
                if name_taken:
                    return Response(
                        {'code': 400, 'message': '用户名称已存在', 'data': None},
                        status=status.HTTP_400_BAD_REQUEST,
                    )
            setattr(me, attr, request.data[attr])
            changed.append(attr)

        if not changed:
            return Response(
                {'code': 400, 'message': '没有提供任何可更新的字段', 'data': None},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            me.save()
            refreshed = {
                'id': str(me.id),
                'username': me.username,
                'email': me.email,
                'name': me.name,
                'role': me.role,
                'department': me.department,
                'group': me.group,
            }
            return Response({
                'code': 200,
                'message': f'用户信息更新成功,已更新字段: {", ".join(changed)}',
                'data': refreshed,
            })
        except Exception as save_error:
            logger.error(f"保存用户数据失败: {str(save_error)}")
            return Response(
                {'code': 500, 'message': f'更新用户信息失败: {str(save_error)}', 'data': None},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
    except Exception as exc:
        logger.error(f"处理用户信息请求失败: {str(exc)}")
        logger.error(traceback.format_exc())
        return Response(
            {'code': 500, 'message': f'处理请求失败: {str(exc)}', 'data': None},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
@csrf_exempt
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def change_password(request):
    """Change the caller's password and issue a replacement token.

    Expects ``old_password`` and ``new_password`` in the body. Responds 400
    when either is missing, when the old password does not match, or when
    the new one is shorter than 8 characters. On success the existing token
    is deleted and a newly generated one is returned. Any exception is
    logged and reported as a 500.
    """

    def reject(message):
        # Every validation failure here shares the same 400 envelope.
        return Response(
            {"code": 400, "message": message, "data": None},
            status=status.HTTP_400_BAD_REQUEST,
        )

    try:
        current_pw = request.data.get('old_password')
        desired_pw = request.data.get('new_password')

        if not (current_pw and desired_pw):
            return reject("请提供旧密码和新密码")

        account = request.user
        if not account.check_password(current_pw):
            return reject("旧密码错误")

        if len(desired_pw) < 8:
            return reject("新密码长度必须至少为8位")

        account.set_password(desired_pw)
        account.save()

        # Replace the old token with a new one after the password change.
        delete_token(account)
        fresh_token = generate_token(account)

        return Response({
            "code": 200,
            "message": "密码修改成功",
            "data": {"token": fresh_token},
        })
    except Exception as exc:
        logger.error(f"密码修改失败: {str(exc)}")
        logger.error(traceback.format_exc())
        return Response(
            {"code": 500, "message": f"密码修改失败: {str(exc)}", "data": None},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def user_detail(request, pk):
    """Return the details of the user identified by ``pk``.

    Responses:
        (from ``validate_uuid_param``) -- returned as-is when that helper
            yields an error response for ``pk``.
        404 -- no user has that primary key.
        200 -- ``data`` is ``format_user_response(user)``.
        500 -- any unexpected exception (logged with traceback).
    """
    try:
        uuid_obj, error_response = validate_uuid_param(pk)
        if error_response:
            return error_response

        # Look the user up explicitly instead of via get_object_or_404:
        # the Http404 it raises is an Exception subclass, so the handler
        # below would catch it and report a missing user as a 500.
        try:
            user = User.objects.get(pk=uuid_obj)
        except User.DoesNotExist:
            return Response({
                "code": 404,
                "message": "用户不存在",
                "data": None
            }, status=status.HTTP_404_NOT_FOUND)

        return Response({
            "code": 200,
            "message": "获取用户信息成功",
            "data": format_user_response(user)
        })
    except Exception as e:
        logger.error(f"获取用户信息失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"获取用户信息失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['PUT'])
@permission_classes([IsAuthenticated])
def user_update(request, pk):
    """Update the user identified by ``pk``.

    Permissions:
        - A caller with ``is_staff`` or role ``admin`` may update any user
          and may change ``email``, ``role``, ``department``, ``group``,
          ``is_active`` and ``name``.
        - Any other caller may update only their own record, and only
          ``email`` and ``name``.

    Responses:
        400 -- empty/unreadable body, invalid ``role``, a ``name`` already
               used by another user, or no updatable field supplied.
        (from ``validate_uuid_param``) -- returned as-is when that helper
               yields an error response for ``pk``.
        403 -- a non-admin caller targeting another user.
        404 -- no user has that primary key.
        200 -- ``data`` is the formatted user (``is_active`` included for
               admin callers).
        500 -- any unexpected exception (logged with traceback).
    """
    # Same role set the registration endpoint accepts.
    valid_roles = ('admin', 'leader', 'member')

    def bad_request(message):
        # Shared 400 envelope for the validation failures below.
        return Response({
            'code': 400,
            'message': message,
            'data': None
        }, status=status.HTTP_400_BAD_REQUEST)

    try:
        # Reading request.data may itself raise; report that as a 400.
        try:
            if not request.data:
                return bad_request('请求数据为空或格式错误')
        except Exception as data_error:
            return bad_request(
                f'请求数据格式错误: {str(data_error)}。请确保提交的是有效的JSON格式数据属性名必须使用双引号。'
            )

        uuid_obj, error_response = validate_uuid_param(pk)
        if error_response:
            return error_response

        try:
            user = User.objects.get(pk=uuid_obj)
        except User.DoesNotExist:
            return Response({
                'code': 404,
                'message': '用户不存在',
                'data': None
            }, status=status.HTTP_404_NOT_FOUND)

        # Callers may edit themselves; admins may edit anyone.
        is_admin = request.user.is_staff or request.user.role == 'admin'
        is_self_update = str(request.user.id) == str(user.id)
        if not (is_admin or is_self_update):
            return Response({
                'code': 403,
                'message': '权限不足,您只能修改自己的信息',
                'data': None
            }, status=status.HTTP_403_FORBIDDEN)

        if is_admin:
            allowed_fields = ['email', 'role', 'department', 'group', 'is_active', 'name']
        else:
            allowed_fields = ['email', 'name']

        # Validate everything before mutating the instance, so a rejected
        # request leaves ``user`` untouched.
        if 'role' in allowed_fields and 'role' in request.data:
            if request.data['role'] not in valid_roles:
                return bad_request(f"无效的角色,必须是: {', '.join(valid_roles)}")

        if 'name' in request.data and request.data['name'] != user.name:
            # Mirror the duplicate-name check done by the profile endpoint.
            if User.objects.filter(name=request.data['name']).exclude(id=user.id).exists():
                return bad_request('用户名称已存在')

        updated_fields = [field for field in allowed_fields if field in request.data]
        if not updated_fields:
            return bad_request('没有提供任何可更新的字段')

        for field in updated_fields:
            setattr(user, field, request.data[field])
        user.save()

        return Response({
            'code': 200,
            'message': '用户信息更新成功',
            'data': format_user_response(user, include_is_active=is_admin)
        })
    except Exception as e:
        logger.error(f"更新用户信息失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'更新用户信息失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['DELETE'])
@permission_classes([IsAdminUser])
def user_delete(request, pk):
    """Delete the user identified by ``pk``.

    Superusers and users whose role is ``admin`` cannot be deleted (403).
    An error response from ``validate_uuid_param`` is returned as-is, an
    unknown ``pk`` yields 404, and any unexpected exception is logged and
    reported as a 500.
    """
    try:
        target_id, invalid_pk_response = validate_uuid_param(pk)
        if invalid_pk_response:
            return invalid_pk_response

        try:
            target = User.objects.get(pk=target_id)
        except User.DoesNotExist:
            return Response(
                {'code': 404, 'message': '用户不存在', 'data': None},
                status=status.HTTP_404_NOT_FOUND,
            )

        # Administrator accounts are protected from deletion.
        is_protected = target.is_superuser or target.role == 'admin'
        if is_protected:
            return Response(
                {'code': 403, 'message': '不允许删除管理员账户', 'data': None},
                status=status.HTTP_403_FORBIDDEN,
            )

        # Capture the name before the row disappears; it is used in the reply.
        deleted_name = target.username
        target.delete()

        return Response({
            'code': 200,
            'message': f'用户 {deleted_name} 删除成功',
            'data': None,
        })
    except Exception as exc:
        logger.error(f"删除用户失败: {str(exc)}")
        logger.error(traceback.format_exc())
        return Response(
            {'code': 500, 'message': f'删除用户失败: {str(exc)}', 'data': None},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
@csrf_exempt
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def verify_token(request):
    """Confirm that the caller's credentials are accepted.

    The ``IsAuthenticated`` permission does the actual checking; when the
    request reaches this body the view simply echoes the formatted user with
    ``is_valid: True``. An exception while building the reply is logged and
    reported as a 500.
    """
    try:
        verdict = {
            "is_valid": True,
            "user": format_user_response(request.user),
        }
        return Response({"code": 200, "message": "令牌有效", "data": verdict})
    except Exception as exc:
        logger.error(f"验证令牌失败: {str(exc)}")
        logger.error(traceback.format_exc())
        return Response(
            {"code": 500, "message": f"验证失败: {str(exc)}", "data": None},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def user_list(request):
    """Return one page of users visible to the caller.

    Query parameters:
        page      -- 1-based page number (default 1).
        page_size -- rows per page (default 20); 0 yields an empty page
                     that still reports ``total``.
        keyword   -- optional case-insensitive substring matched against
                     username, email, name and department.

    Visibility depends on the caller's role: ``admin`` sees every user,
    ``leader`` sees users in the same department, anyone else sees only
    themselves.

    Responses:
        400 -- ``page``/``page_size`` is not an integer, ``page`` is below
               1, or ``page_size`` is negative.
        200 -- ``data`` holds ``total``, ``page``, ``page_size``, ``users``.
        500 -- any unexpected exception (logged with traceback).
    """
    try:
        page = int(request.query_params.get('page', 1))
        page_size = int(request.query_params.get('page_size', 20))
        keyword = request.query_params.get('keyword', '')

        # Out-of-range values would produce negative slice bounds below,
        # which the ORM rejects; answer with an explicit 400 instead.
        # NOTE(review): page_size has no upper bound -- consider a cap.
        if page < 1 or page_size < 0:
            return Response({
                'code': 400,
                'message': '参数错误: page 必须大于等于 1,page_size 不能为负数',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        user = request.user
        base_query = User.objects.all()
        if user.role == 'admin':
            users_query = base_query
        elif user.role == 'leader':
            users_query = base_query.filter(department=user.department)
        else:
            users_query = base_query.filter(id=user.id)

        if keyword:
            users_query = users_query.filter(
                Q(username__icontains=keyword) |
                Q(email__icontains=keyword) |
                Q(name__icontains=keyword) |
                Q(department__icontains=keyword)
            )

        # Slicing an unordered queryset can return rows in a different
        # order on each request, so pages may overlap or skip users. Only
        # add an ordering when none is already in effect.
        if not users_query.ordered:
            users_query = users_query.order_by('pk')

        total = users_query.count()
        start = (page - 1) * page_size
        end = start + page_size

        user_data = []
        for u in users_query[start:end]:
            user_info = format_user_response(u, include_is_active=True)
            user_info['date_joined'] = u.date_joined.strftime('%Y-%m-%d %H:%M:%S')
            user_data.append(user_info)

        return Response({
            'code': 200,
            'message': '获取用户列表成功',
            'data': {
                'total': total,
                'page': page,
                'page_size': page_size,
                'users': user_data
            }
        })
    except ValueError as e:
        # Raised by int() when page/page_size is not an integer.
        return Response({
            'code': 400,
            'message': f'参数错误: {str(e)}',
            'data': None
        }, status=status.HTTP_400_BAD_REQUEST)
    except Exception as e:
        logger.error(f"获取用户列表失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'获取用户列表失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)