# apps/accounts/views.py from django.contrib.auth import authenticate, login, logout from django.core.exceptions import ValidationError from django.contrib.auth.hashers import check_password from django.utils.decorators import method_decorator from django.views.decorators.csrf import csrf_exempt from rest_framework.views import APIView from rest_framework.decorators import api_view, permission_classes from rest_framework.response import Response from rest_framework import status from rest_framework.permissions import AllowAny, IsAuthenticated, IsAdminUser from rest_framework.authtoken.models import Token from django.db.models import Q from django.shortcuts import get_object_or_404 import uuid import logging import traceback from apps.accounts.models import User from apps.accounts.services.auth_service import ( authenticate_user, create_user, generate_token, delete_token ) from apps.accounts.services.utils import ( format_user_response, validate_uuid_param ) logger = logging.getLogger(__name__) @method_decorator(csrf_exempt, name='dispatch') class LoginView(APIView): """用户登录视图""" authentication_classes = [] # 清空认证类 permission_classes = [AllowAny] def post(self, request): try: username = request.data.get('username') password = request.data.get('password') # 参数验证 if not username or not password: return Response({ "code": 400, "message": "请提供用户名和密码", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 验证用户 user = authenticate_user(request, username, password) if user is not None: # 获取或创建token token = generate_token(user) # 登录用户(可选) login(request, user) return Response({ "code": 200, "message": "登录成功", "data": { "id": str(user.id), "username": user.username, "email": user.email, "name": user.name, "role": user.role, "department": user.department, "group": user.group, "token": token } }) else: return Response({ "code": 401, "message": "用户名或密码错误", "data": None }, status=status.HTTP_401_UNAUTHORIZED) except Exception as e: logger.error(f"登录失败: {str(e)}") logger.error(f"错误类型: {type(e)}") logger.error(f"错误堆栈: {traceback.format_exc()}") return Response({ "code": 500, "message": "登录失败,请稍后重试", "data": None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @method_decorator(csrf_exempt, name='dispatch') class RegisterView(APIView): """用户注册视图""" permission_classes = [AllowAny] def post(self, request): try: data = request.data # 检查必填字段 required_fields = ['username', 'password', 'email', 'role', 'name'] for field in required_fields: if not data.get(field): return Response({ "code": 400, "message": f"缺少必填字段: {field}", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 验证角色 valid_roles = ['admin', 'leader', 'member'] roles_str = ', '.join(valid_roles) # 先构造角色字符串 if data['role'] not in valid_roles: return Response({ "code": 400, "message": f"无效的角色,必须是: {roles_str}", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 创建用户 user = create_user(data) if isinstance(user, dict): # 错误响应 return Response(user, status=user.get('status', status.HTTP_400_BAD_REQUEST)) # 生成认证令牌 token = generate_token(user) return Response({ "code": 200, "message": "注册成功", "data": { "id": str(user.id), "username": user.username, "email": user.email, "role": user.role, "department": user.department, "name": user.name, "group": user.group, "token": token, "created_at": user.date_joined.strftime('%Y-%m-%d %H:%M:%S') } }, status=status.HTTP_201_CREATED) except Exception as e: logger.error(f"注册失败: {str(e)}") logger.error(f"错误类型: {type(e)}") logger.error(f"错误堆栈: {traceback.format_exc()}") return Response({ "code": 500, "message": f"注册失败: {str(e)}", "data": None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @method_decorator(csrf_exempt, name='dispatch') class LogoutView(APIView): """用户登出视图""" permission_classes = [IsAuthenticated] def post(self, request): try: # 删除用户的token delete_token(request.user) # 执行django的登出 logout(request) return Response({ "code": 200, "message": "登出成功", "data": None }) except Exception as e: logger.error(f"登出失败: {str(e)}") logger.error(traceback.format_exc()) return Response({ "code": 500, "message": f"登出失败: {str(e)}", "data": None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['GET', 'PUT']) @permission_classes([IsAuthenticated]) def user_profile(request): """ 获取或更新当前登录用户信息 """ try: if request.method == 'GET': user = request.user if not user.is_authenticated: return Response({ 'code': 401, 'message': '用户未认证', 'data': None }, status=status.HTTP_401_UNAUTHORIZED) data = { 'id': str(user.id), 'username': user.username, 'email': user.email, 'name': user.name, 'role': user.role, 'department': user.department, 'group': user.group, 'date_joined': user.date_joined.strftime('%Y-%m-%d %H:%M:%S') } return Response({ 'code': 200, 'message': '获取用户信息成功', 'data': data }) elif request.method == 'PUT': try: if not request.data: return Response({ 'code': 400, 'message': '请求数据为空或格式错误', 'data': None }, status=status.HTTP_400_BAD_REQUEST) except Exception as data_error: return Response({ 'code': 400, 'message': f'请求数据格式错误: {str(data_error)}。请确保提交的是有效的JSON格式数据,属性名必须使用双引号。', 'data': None }, status=status.HTTP_400_BAD_REQUEST) user = request.user allowed_fields = ['email', 'name', 'department', 'group'] updated_fields = [] for field in allowed_fields: if field in request.data: # 检查name字段是否重名 if field == 'name' and request.data['name'] != user.name: # 检查是否有其他用户使用相同name if User.objects.filter(name=request.data['name']).exclude(id=user.id).exists(): return Response({ 'code': 400, 'message': '用户名称已存在', 'data': None }, status=status.HTTP_400_BAD_REQUEST) setattr(user, field, request.data[field]) updated_fields.append(field) if updated_fields: try: user.save() return Response({ 'code': 200, 'message': f'用户信息更新成功,已更新字段: {", ".join(updated_fields)}', 'data': { 'id': str(user.id), 'username': user.username, 'email': user.email, 'name': user.name, 'role': user.role, 'department': user.department, 'group': user.group, } }) except Exception as save_error: logger.error(f"保存用户数据失败: {str(save_error)}") return Response({ 'code': 500, 'message': f'更新用户信息失败: {str(save_error)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) else: return Response({ 'code': 400, 'message': '没有提供任何可更新的字段', 'data': None }, status=status.HTTP_400_BAD_REQUEST) else: return Response({ 'code': 405, 'message': f'不支持的请求方法: {request.method}', 'data': None }, status=status.HTTP_405_METHOD_NOT_ALLOWED) except Exception as e: logger.error(f"处理用户信息请求失败: {str(e)}") logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'处理请求失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @csrf_exempt @api_view(['POST']) @permission_classes([IsAuthenticated]) def change_password(request): """修改密码""" try: old_password = request.data.get('old_password') new_password = request.data.get('new_password') # 验证参数 if not old_password or not new_password: return Response({ "code": 400, "message": "请提供旧密码和新密码", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 验证旧密码 user = request.user if not user.check_password(old_password): return Response({ "code": 400, "message": "旧密码错误", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 验证新密码长度 if len(new_password) < 8: return Response({ "code": 400, "message": "新密码长度必须至少为8位", "data": None }, status=status.HTTP_400_BAD_REQUEST) # 修改密码 user.set_password(new_password) user.save() # 更新token delete_token(user) token = generate_token(user) return Response({ "code": 200, "message": "密码修改成功", "data": { "token": token } }) except Exception as e: logger.error(f"密码修改失败: {str(e)}") logger.error(traceback.format_exc()) return Response({ "code": 500, "message": f"密码修改失败: {str(e)}", "data": None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['GET']) @permission_classes([IsAuthenticated]) def user_detail(request, pk): """获取用户详情""" try: # 使用通用UUID验证工具函数 uuid_obj, error_response = validate_uuid_param(pk) if error_response: return error_response user = get_object_or_404(User, pk=uuid_obj) return Response({ "code": 200, "message": "获取用户信息成功", "data": format_user_response(user) }) except Exception as e: logger.error(f"获取用户信息失败: {str(e)}") logger.error(traceback.format_exc()) return Response({ "code": 500, "message": f"获取用户信息失败: {str(e)}", "data": None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['PUT']) @permission_classes([IsAuthenticated]) def user_update(request, pk): """ 更新用户信息 - 管理员可以更新任何用户 - 普通用户只能更新自己的信息 """ try: try: if not request.data: return Response({ 'code': 400, 'message': '请求数据为空或格式错误', 'data': None }, status=status.HTTP_400_BAD_REQUEST) except Exception as data_error: return Response({ 'code': 400, 'message': f'请求数据格式错误: {str(data_error)}。请确保提交的是有效的JSON格式数据,属性名必须使用双引号。', 'data': None }, status=status.HTTP_400_BAD_REQUEST) # 使用通用UUID验证工具函数 uuid_obj, error_response = validate_uuid_param(pk) if error_response: return error_response try: user = User.objects.get(pk=uuid_obj) except User.DoesNotExist: return Response({ 'code': 404, 'message': '用户不存在', 'data': None }, status=status.HTTP_404_NOT_FOUND) # 验证权限:用户只能修改自己的信息,管理员可以修改任何人 is_admin = request.user.is_staff or request.user.role == 'admin' is_self_update = str(request.user.id) == str(user.id) if not (is_admin or is_self_update): return Response({ 'code': 403, 'message': '权限不足,您只能修改自己的信息', 'data': None }, status=status.HTTP_403_FORBIDDEN) # 根据用户权限决定可以修改的字段 if is_admin: allowed_fields = ['email', 'role', 'department', 'group', 'is_active', 'name'] else: # 普通用户只能修改部分字段 allowed_fields = ['email', 'name'] updated_fields = [] for field in allowed_fields: if field in request.data: setattr(user, field, request.data[field]) updated_fields.append(field) if not updated_fields: return Response({ 'code': 400, 'message': '没有提供任何可更新的字段', 'data': None }, status=status.HTTP_400_BAD_REQUEST) user.save() return Response({ 'code': 200, 'message': '用户信息更新成功', 'data': format_user_response(user, include_is_active=is_admin) }) except Exception as e: logger.error(f"更新用户信息失败: {str(e)}") logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'更新用户信息失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['DELETE']) @permission_classes([IsAdminUser]) def user_delete(request, pk): """删除用户""" try: # 使用通用UUID验证工具函数 uuid_obj, error_response = validate_uuid_param(pk) if error_response: return error_response try: user = User.objects.get(pk=uuid_obj) except User.DoesNotExist: return Response({ 'code': 404, 'message': '用户不存在', 'data': None }, status=status.HTTP_404_NOT_FOUND) if user.is_superuser or user.role == 'admin': return Response({ 'code': 403, 'message': '不允许删除管理员账户', 'data': None }, status=status.HTTP_403_FORBIDDEN) username = user.username user.delete() return Response({ 'code': 200, 'message': f'用户 {username} 删除成功', 'data': None }) except Exception as e: logger.error(f"删除用户失败: {str(e)}") logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'删除用户失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @csrf_exempt @api_view(['POST']) @permission_classes([IsAuthenticated]) def verify_token(request): """验证令牌有效性""" try: return Response({ "code": 200, "message": "令牌有效", "data": { "is_valid": True, "user": format_user_response(request.user) } }) except Exception as e: logger.error(f"验证令牌失败: {str(e)}") logger.error(traceback.format_exc()) return Response({ "code": 500, "message": f"验证失败: {str(e)}", "data": None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['GET']) @permission_classes([IsAuthenticated]) def user_list(request): """获取用户列表""" try: page = int(request.query_params.get('page', 1)) page_size = int(request.query_params.get('page_size', 20)) keyword = request.query_params.get('keyword', '') user = request.user base_query = User.objects.all() if user.role == 'admin': users_query = base_query elif user.role == 'leader': users_query = base_query.filter(department=user.department) else: users_query = base_query.filter(id=user.id) if keyword: users_query = users_query.filter( Q(username__icontains=keyword) | Q(email__icontains=keyword) | Q(name__icontains=keyword) | Q(department__icontains=keyword) ) total = users_query.count() start = (page - 1) * page_size end = start + page_size users = users_query[start:end] user_data = [] for u in users: user_info = format_user_response(u, include_is_active=True) user_info['date_joined'] = u.date_joined.strftime('%Y-%m-%d %H:%M:%S') user_data.append(user_info) return Response({ 'code': 200, 'message': '获取用户列表成功', 'data': { 'total': total, 'page': page, 'page_size': page_size, 'users': user_data } }) except ValueError as e: return Response({ 'code': 400, 'message': f'参数错误: {str(e)}', 'data': None }, status=status.HTTP_400_BAD_REQUEST) except Exception as e: logger.error(f"获取用户列表失败: {str(e)}") logger.error(traceback.format_exc()) return Response({ 'code': 500, 'message': f'获取用户列表失败: {str(e)}', 'data': None }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)