operations_project/apps/accounts/views.py

590 lines
20 KiB
Python
Raw Permalink Normal View History

2025-05-07 18:01:48 +08:00
# apps/accounts/views.py
from django.contrib.auth import authenticate, login, logout
from django.core.exceptions import ValidationError
from django.contrib.auth.hashers import check_password
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from rest_framework.views import APIView
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import AllowAny, IsAuthenticated, IsAdminUser
from rest_framework.authtoken.models import Token
from django.db.models import Q
from django.shortcuts import get_object_or_404
import uuid
import logging
import traceback
2025-05-20 15:57:10 +08:00
from apps.accounts.models import User
2025-05-07 18:01:48 +08:00
from apps.accounts.services.auth_service import (
authenticate_user, create_user, generate_token, delete_token
)
from apps.accounts.services.utils import (
2025-05-20 15:57:10 +08:00
format_user_response, validate_uuid_param
2025-05-07 18:01:48 +08:00
)
2025-05-20 15:57:10 +08:00
2025-05-07 18:01:48 +08:00
logger = logging.getLogger(__name__)
@method_decorator(csrf_exempt, name='dispatch')
class LoginView(APIView):
    """Login endpoint: checks username/password and returns a token.

    The view is CSRF-exempt, runs no DRF authenticators and is open to
    anonymous callers.
    """

    authentication_classes = []  # deliberately empty: no authenticators run here
    permission_classes = [AllowAny]

    def post(self, request):
        """Handle a login attempt.

        Reads ``username`` and ``password`` from ``request.data``.

        Returns:
            * 400 when either credential is missing or empty,
            * 401 when ``authenticate_user`` returns ``None``,
            * 200 with the user's basic fields and the token otherwise,
            * 500 (generic message) when anything raises.
        """
        try:
            username = request.data.get('username')
            password = request.data.get('password')

            # Both credentials are mandatory.
            if not (username and password):
                return Response(
                    {
                        "code": 400,
                        "message": "请提供用户名和密码",
                        "data": None,
                    },
                    status=status.HTTP_400_BAD_REQUEST,
                )

            account = authenticate_user(request, username, password)
            if account is None:
                return Response(
                    {
                        "code": 401,
                        "message": "用户名或密码错误",
                        "data": None,
                    },
                    status=status.HTTP_401_UNAUTHORIZED,
                )

            # Issue the API token first, then also open a Django session.
            token = generate_token(account)
            login(request, account)

            payload = {
                "id": str(account.id),
                "username": account.username,
                "email": account.email,
                "name": account.name,
                "role": account.role,
                "department": account.department,
                "group": account.group,
                "token": token,
            }
            return Response({
                "code": 200,
                "message": "登录成功",
                "data": payload,
            })
        except Exception as e:
            # Full details go to the log only; the client gets a generic message.
            logger.error(f"登录失败: {str(e)}")
            logger.error(f"错误类型: {type(e)}")
            logger.error(f"错误堆栈: {traceback.format_exc()}")
            return Response(
                {
                    "code": 500,
                    "message": "登录失败,请稍后重试",
                    "data": None,
                },
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
@method_decorator(csrf_exempt, name='dispatch')
class RegisterView(APIView):
    """Registration endpoint: validates the payload and creates a user."""

    # NOTE(review): this endpoint is open to anonymous callers and 'admin' is
    # an accepted role below. Whether ``create_user`` restricts that is not
    # visible from this file -- confirm self-registration as admin is intended.
    permission_classes = [AllowAny]

    def post(self, request):
        """Create a user from ``request.data`` and return it with a token.

        Returns:
            * 400 when a required field is missing/empty or the role is not
              one of admin / leader / member,
            * the dict returned by ``create_user`` (with its ``status`` entry
              as HTTP status, default 400) when that helper reports an error,
            * 201 with the new user's fields and token on success,
            * 500 when anything raises.
        """
        try:
            data = request.data

            # Report the first required field that is absent or falsy.
            required_fields = ['username', 'password', 'email', 'role', 'name']
            missing = next(
                (name for name in required_fields if not data.get(name)),
                None,
            )
            if missing is not None:
                return Response(
                    {
                        "code": 400,
                        "message": f"缺少必填字段: {missing}",
                        "data": None,
                    },
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Role must be one of the known values.
            valid_roles = ['admin', 'leader', 'member']
            if data['role'] not in valid_roles:
                roles_str = ', '.join(valid_roles)
                return Response(
                    {
                        "code": 400,
                        "message": f"无效的角色,必须是: {roles_str}",
                        "data": None,
                    },
                    status=status.HTTP_400_BAD_REQUEST,
                )

            created = create_user(data)
            # A dict result is an error payload produced by the service layer.
            if isinstance(created, dict):
                return Response(
                    created,
                    status=created.get('status', status.HTTP_400_BAD_REQUEST),
                )

            token = generate_token(created)
            body = {
                "id": str(created.id),
                "username": created.username,
                "email": created.email,
                "role": created.role,
                "department": created.department,
                "name": created.name,
                "group": created.group,
                "token": token,
                "created_at": created.date_joined.strftime('%Y-%m-%d %H:%M:%S'),
            }
            return Response(
                {
                    "code": 200,
                    "message": "注册成功",
                    "data": body,
                },
                status=status.HTTP_201_CREATED,
            )
        except Exception as e:
            logger.error(f"注册失败: {str(e)}")
            logger.error(f"错误类型: {type(e)}")
            logger.error(f"错误堆栈: {traceback.format_exc()}")
            return Response(
                {
                    "code": 500,
                    "message": f"注册失败: {str(e)}",
                    "data": None,
                },
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
@method_decorator(csrf_exempt, name='dispatch')
class LogoutView(APIView):
    """Logout endpoint for authenticated users."""

    permission_classes = [IsAuthenticated]

    def post(self, request):
        """Delete the caller's token, end the Django session and return 200.

        Any exception is logged and reported as a 500 response.
        """
        try:
            # Remove the token before logging out, while request.user is
            # still the authenticated account.
            delete_token(request.user)
            logout(request)
        except Exception as e:
            logger.error(f"登出失败: {str(e)}")
            logger.error(traceback.format_exc())
            return Response(
                {
                    "code": 500,
                    "message": f"登出失败: {str(e)}",
                    "data": None,
                },
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
        return Response({
            "code": 200,
            "message": "登出成功",
            "data": None,
        })
@api_view(['GET', 'PUT'])
@permission_classes([IsAuthenticated])
def user_profile(request):
    """Read (GET) or update (PUT) the profile of the logged-in user.

    GET returns the user's basic fields plus ``date_joined``.

    PUT accepts any of ``email``, ``name``, ``department`` and ``group``.
    A new ``name`` is rejected with 400 when another user already has it.
    An empty body, or a body with none of those fields, also yields 400.

    Any unexpected exception is logged and reported as a 500 response.
    """
    try:
        method = request.method

        if method == 'GET':
            current = request.user
            if not current.is_authenticated:
                return Response(
                    {'code': 401, 'message': '用户未认证', 'data': None},
                    status=status.HTTP_401_UNAUTHORIZED,
                )
            profile = {
                'id': str(current.id),
                'username': current.username,
                'email': current.email,
                'name': current.name,
                'role': current.role,
                'department': current.department,
                'group': current.group,
                'date_joined': current.date_joined.strftime('%Y-%m-%d %H:%M:%S'),
            }
            return Response({
                'code': 200,
                'message': '获取用户信息成功',
                'data': profile,
            })

        if method != 'PUT':
            return Response(
                {
                    'code': 405,
                    'message': f'不支持的请求方法: {request.method}',
                    'data': None,
                },
                status=status.HTTP_405_METHOD_NOT_ALLOWED,
            )

        # --- PUT ---------------------------------------------------------
        # Accessing request.data triggers body parsing, which may raise on a
        # malformed payload; that is reported as a 400 here.
        try:
            body_is_empty = not request.data
        except Exception as data_error:
            return Response(
                {
                    'code': 400,
                    'message': f'请求数据格式错误: {str(data_error)}。请确保提交的是有效的JSON格式数据属性名必须使用双引号。',
                    'data': None,
                },
                status=status.HTTP_400_BAD_REQUEST,
            )
        if body_is_empty:
            return Response(
                {'code': 400, 'message': '请求数据为空或格式错误', 'data': None},
                status=status.HTTP_400_BAD_REQUEST,
            )

        current = request.user
        changed = []
        for field in ('email', 'name', 'department', 'group'):
            if field not in request.data:
                continue

            # A changed display name must not collide with another user's.
            if field == 'name' and request.data['name'] != current.name:
                name_taken = (
                    User.objects
                    .filter(name=request.data['name'])
                    .exclude(id=current.id)
                    .exists()
                )
                if name_taken:
                    return Response(
                        {'code': 400, 'message': '用户名称已存在', 'data': None},
                        status=status.HTTP_400_BAD_REQUEST,
                    )

            setattr(current, field, request.data[field])
            changed.append(field)

        if not changed:
            return Response(
                {'code': 400, 'message': '没有提供任何可更新的字段', 'data': None},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            current.save()
            changed_names = ", ".join(changed)
            return Response({
                'code': 200,
                'message': f'用户信息更新成功,已更新字段: {changed_names}',
                'data': {
                    'id': str(current.id),
                    'username': current.username,
                    'email': current.email,
                    'name': current.name,
                    'role': current.role,
                    'department': current.department,
                    'group': current.group,
                },
            })
        except Exception as save_error:
            logger.error(f"保存用户数据失败: {str(save_error)}")
            return Response(
                {
                    'code': 500,
                    'message': f'更新用户信息失败: {str(save_error)}',
                    'data': None,
                },
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
    except Exception as e:
        logger.error(f"处理用户信息请求失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response(
            {
                'code': 500,
                'message': f'处理请求失败: {str(e)}',
                'data': None,
            },
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
@csrf_exempt
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def change_password(request):
    """Change the logged-in user's password and rotate their token.

    Expects ``old_password`` and ``new_password`` in ``request.data``.
    Checks run in this order, each failing with 400: both values present,
    old password correct, new password at least 8 characters long.
    On success the old token is deleted and a fresh one is returned.
    Any exception is logged and reported as a 500 response.
    """

    def _bad_request(message):
        # Shared 400 envelope for the validation failures below.
        return Response(
            {"code": 400, "message": message, "data": None},
            status=status.HTTP_400_BAD_REQUEST,
        )

    try:
        current_pw = request.data.get('old_password')
        desired_pw = request.data.get('new_password')

        if not (current_pw and desired_pw):
            return _bad_request("请提供旧密码和新密码")

        account = request.user
        if not account.check_password(current_pw):
            return _bad_request("旧密码错误")

        if len(desired_pw) < 8:
            return _bad_request("新密码长度必须至少为8位")

        account.set_password(desired_pw)
        account.save()

        # Replace the token: delete the existing one, then issue a new one.
        delete_token(account)
        fresh_token = generate_token(account)

        return Response({
            "code": 200,
            "message": "密码修改成功",
            "data": {"token": fresh_token},
        })
    except Exception as e:
        logger.error(f"密码修改失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response(
            {
                "code": 500,
                "message": f"密码修改失败: {str(e)}",
                "data": None,
            },
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def user_detail(request, pk):
    """Return the details of the user identified by ``pk``.

    Args:
        request: the DRF request (caller must be authenticated).
        pk: user id from the URL; validated by ``validate_uuid_param``.

    Returns:
        * the error response produced by ``validate_uuid_param`` when ``pk``
          is rejected,
        * 404 when no user has that id,
        * 200 with ``format_user_response(user)`` otherwise,
        * 500 when anything else raises.
    """
    try:
        uuid_obj, error_response = validate_uuid_param(pk)
        if error_response:
            return error_response

        # Look the user up explicitly instead of using get_object_or_404:
        # Http404 is an Exception subclass, so raising it inside this try
        # block was caught by the generic handler below and turned a missing
        # user into a 500 response.
        try:
            user = User.objects.get(pk=uuid_obj)
        except User.DoesNotExist:
            return Response({
                "code": 404,
                "message": "用户不存在",
                "data": None
            }, status=status.HTTP_404_NOT_FOUND)

        # NOTE(review): any authenticated user can read any other user's
        # details here, while user_list scopes results by role -- confirm
        # this is intended.
        return Response({
            "code": 200,
            "message": "获取用户信息成功",
            "data": format_user_response(user)
        })
    except Exception as e:
        logger.error(f"获取用户信息失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"获取用户信息失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['PUT'])
@permission_classes([IsAuthenticated])
def user_update(request, pk):
    """Update the user identified by ``pk``.

    Permissions:
        * admins (``is_staff`` or role ``admin``) may update any user and may
          change email, role, department, group, is_active and name;
        * other users may only update themselves, and only email and name.

    Validation (performed before anything is modified):
        * the request body must be non-empty and parseable,
        * ``pk`` must pass ``validate_uuid_param`` and match an existing user,
        * a new ``role`` must be one of admin / leader / member, the same set
          the registration endpoint accepts,
        * a new ``name`` must not already belong to a different user, the
          same rule the profile endpoint enforces.

    Returns 200 with the formatted user on success, 400/403/404 on the
    failures above, and 500 when anything else raises.
    """
    try:
        # Accessing request.data triggers body parsing, which may raise on a
        # malformed payload; report that as a 400 rather than a 500.
        try:
            if not request.data:
                return Response({
                    'code': 400,
                    'message': '请求数据为空或格式错误',
                    'data': None
                }, status=status.HTTP_400_BAD_REQUEST)
        except Exception as data_error:
            return Response({
                'code': 400,
                'message': f'请求数据格式错误: {str(data_error)}。请确保提交的是有效的JSON格式数据属性名必须使用双引号。',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        uuid_obj, error_response = validate_uuid_param(pk)
        if error_response:
            return error_response

        try:
            user = User.objects.get(pk=uuid_obj)
        except User.DoesNotExist:
            return Response({
                'code': 404,
                'message': '用户不存在',
                'data': None
            }, status=status.HTTP_404_NOT_FOUND)

        # Users may only edit themselves; admins may edit anyone.
        is_admin = request.user.is_staff or request.user.role == 'admin'
        is_self_update = str(request.user.id) == str(user.id)
        if not (is_admin or is_self_update):
            return Response({
                'code': 403,
                'message': '权限不足,您只能修改自己的信息',
                'data': None
            }, status=status.HTTP_403_FORBIDDEN)

        # The editable field set depends on the caller's privileges.
        if is_admin:
            allowed_fields = ['email', 'role', 'department', 'group', 'is_active', 'name']
        else:
            allowed_fields = ['email', 'name']

        # Reject unknown roles so an admin cannot persist an arbitrary string.
        if 'role' in allowed_fields and 'role' in request.data:
            valid_roles = ['admin', 'leader', 'member']
            if request.data['role'] not in valid_roles:
                roles_str = ', '.join(valid_roles)
                return Response({
                    'code': 400,
                    'message': f"无效的角色,必须是: {roles_str}",
                    'data': None
                }, status=status.HTTP_400_BAD_REQUEST)

        # Reject a name that already belongs to a different user.
        if 'name' in request.data and request.data['name'] != user.name:
            if User.objects.filter(name=request.data['name']).exclude(id=user.id).exists():
                return Response({
                    'code': 400,
                    'message': '用户名称已存在',
                    'data': None
                }, status=status.HTTP_400_BAD_REQUEST)

        updated_fields = []
        for field in allowed_fields:
            if field in request.data:
                setattr(user, field, request.data[field])
                updated_fields.append(field)

        if not updated_fields:
            return Response({
                'code': 400,
                'message': '没有提供任何可更新的字段',
                'data': None
            }, status=status.HTTP_400_BAD_REQUEST)

        user.save()
        return Response({
            'code': 200,
            'message': '用户信息更新成功',
            'data': format_user_response(user, include_is_active=is_admin)
        })
    except Exception as e:
        logger.error(f"更新用户信息失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'更新用户信息失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['DELETE'])
@permission_classes([IsAdminUser])
def user_delete(request, pk):
    """Delete the user identified by ``pk`` (admin-only endpoint).

    Returns the error response from ``validate_uuid_param`` for a rejected
    ``pk``, 404 when the user does not exist, 403 when the target is a
    superuser or has role ``admin``, 200 after a successful delete, and 500
    when anything raises.
    """
    try:
        uuid_obj, error_response = validate_uuid_param(pk)
        if error_response:
            return error_response

        try:
            target = User.objects.get(pk=uuid_obj)
        except User.DoesNotExist:
            return Response(
                {'code': 404, 'message': '用户不存在', 'data': None},
                status=status.HTTP_404_NOT_FOUND,
            )

        # Administrator accounts can never be removed through this endpoint.
        if target.is_superuser or target.role == 'admin':
            return Response(
                {'code': 403, 'message': '不允许删除管理员账户', 'data': None},
                status=status.HTTP_403_FORBIDDEN,
            )

        # Capture the name first so it is available for the success message.
        username = target.username
        target.delete()

        return Response({
            'code': 200,
            'message': f'用户 {username} 删除成功',
            'data': None,
        })
    except Exception as e:
        logger.error(f"删除用户失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response(
            {
                'code': 500,
                'message': f'删除用户失败: {str(e)}',
                'data': None,
            },
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
@csrf_exempt
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def verify_token(request):
    """Confirm that the caller's credentials are valid.

    The ``IsAuthenticated`` permission rejects unauthenticated requests
    before this body runs, so reaching it means the caller is authenticated.
    The response echoes the formatted user. Any exception is logged and
    reported as a 500 response.
    """
    try:
        result = {
            "is_valid": True,
            "user": format_user_response(request.user),
        }
        return Response({
            "code": 200,
            "message": "令牌有效",
            "data": result,
        })
    except Exception as e:
        logger.error(f"验证令牌失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response(
            {
                "code": 500,
                "message": f"验证失败: {str(e)}",
                "data": None,
            },
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def user_list(request):
    """Return a paginated list of users visible to the caller.

    Query parameters:
        page: 1-based page number (default 1).
        page_size: number of users per page (default 20).
        keyword: optional case-insensitive substring matched against
            username, email, name and department.

    Visibility by the caller's role:
        * ``admin``  -- all users,
        * ``leader`` -- users in the caller's department,
        * otherwise  -- only the caller.

    Returns 200 with ``total``, ``page``, ``page_size`` and ``users``; 400
    when ``page`` / ``page_size`` are not integers or are smaller than 1;
    500 when anything else raises.
    """
    # Parse pagination parameters on their own so that only genuine parameter
    # errors are reported as 400.
    try:
        page = int(request.query_params.get('page', 1))
        page_size = int(request.query_params.get('page_size', 20))
    except ValueError as e:
        return Response({
            'code': 400,
            'message': f'参数错误: {str(e)}',
            'data': None
        }, status=status.HTTP_400_BAD_REQUEST)

    # Values below 1 would produce a negative or empty slice further down
    # (Django querysets do not support negative indexing).
    if page < 1 or page_size < 1:
        return Response({
            'code': 400,
            'message': '参数错误: page 和 page_size 必须为大于等于 1 的整数',
            'data': None
        }, status=status.HTTP_400_BAD_REQUEST)

    try:
        keyword = request.query_params.get('keyword', '')
        user = request.user

        # Scope the queryset by the caller's role.
        base_query = User.objects.all()
        if user.role == 'admin':
            users_query = base_query
        elif user.role == 'leader':
            users_query = base_query.filter(department=user.department)
        else:
            users_query = base_query.filter(id=user.id)

        if keyword:
            users_query = users_query.filter(
                Q(username__icontains=keyword) |
                Q(email__icontains=keyword) |
                Q(name__icontains=keyword) |
                Q(department__icontains=keyword)
            )

        # Slicing an unordered queryset gives no guarantee that consecutive
        # pages are disjoint; fall back to primary-key order when the model
        # defines no default ordering.
        if not users_query.ordered:
            users_query = users_query.order_by('pk')

        total = users_query.count()
        start = (page - 1) * page_size
        end = start + page_size

        user_data = []
        for u in users_query[start:end]:
            user_info = format_user_response(u, include_is_active=True)
            user_info['date_joined'] = u.date_joined.strftime('%Y-%m-%d %H:%M:%S')
            user_data.append(user_info)

        return Response({
            'code': 200,
            'message': '获取用户列表成功',
            'data': {
                'total': total,
                'page': page,
                'page_size': page_size,
                'users': user_data
            }
        })
    except Exception as e:
        logger.error(f"获取用户列表失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'获取用户列表失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
2025-05-13 18:36:06 +08:00