# apps/permissions/views.py
import logging

from django.db.models import Q
from django.http import Http404
from django.utils import timezone
from rest_framework import viewsets, status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.decorators import action

from apps.accounts.models import User
from apps.knowledge_base.models import KnowledgeBase
from apps.permissions.models import Permission, KnowledgeBasePermission as KBPermissionModel
from apps.permissions.serializers import PermissionSerializer
from apps.common.services.permission_service import PermissionService
from apps.common.services.notification_service import NotificationService

logger = logging.getLogger(__name__)

# Timestamp layout used by every hand-built payload in this module.
_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _format_datetime(value):
    """Return *value* formatted with ``_DATETIME_FORMAT``, or None if it is falsy."""
    return value.strftime(_DATETIME_FORMAT) if value else None


def _parse_pagination(query_params):
    """Read ``page`` / ``page_size`` from the query string.

    Defaults are page 1 and 10 items per page.

    Raises:
        ValueError: if either value is not an integer or is smaller than 1.
            Rejecting these early avoids negative queryset slices, which
            Django refuses to evaluate.
    """
    page = int(query_params.get('page', 1))
    page_size = int(query_params.get('page_size', 10))
    if page < 1 or page_size < 1:
        raise ValueError('page and page_size must be positive integers')
    return page, page_size


def _error_response(code, message, http_status):
    """Build the ``{'code', 'message', 'data': None}`` error envelope."""
    return Response(
        {'code': code, 'message': message, 'data': None},
        status=http_status,
    )


def _bad_pagination_response():
    """HTTP 400 envelope returned when pagination parameters are invalid."""
    return _error_response(
        400, '分页参数必须为正整数', status.HTTP_400_BAD_REQUEST
    )


def _not_found_response():
    """HTTP 404 envelope returned when the permission request does not exist."""
    return _error_response(404, '权限申请不存在', status.HTTP_404_NOT_FOUND)


def _server_error(prefix, exc):
    """Log *exc* with its traceback and return the HTTP 500 envelope.

    Must be called from inside an ``except`` block so that
    ``logger.exception`` can attach the active traceback.
    """
    # NOTE(review): the exception text is echoed to the client, as before;
    # consider a generic message if it may expose internal details.
    logger.exception("%s: %s", prefix, exc)
    return _error_response(
        500, f'{prefix}: {exc}', status.HTTP_500_INTERNAL_SERVER_ERROR
    )


class PermissionViewSet(viewsets.ModelViewSet):
    """Endpoints for permission requests and knowledge-base permissions.

    The default queryset is limited to ``Permission`` rows where the current
    user is the applicant or the approver. State changes (create, approve,
    reject, extend, update) are delegated to ``PermissionService``.
    """

    serializer_class = PermissionSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return requests where the current user is applicant or approver."""
        user_id = str(self.request.user.id)
        involved = Q(applicant_id=user_id) | Q(approver_id=user_id)
        return Permission.objects.filter(involved).select_related(
            'knowledge_base', 'applicant', 'approver'
        )

    def list(self, request, *args, **kwargs):
        """Return one page of the user's permission requests with details.

        Query parameters: ``page`` (default 1) and ``page_size`` (default 10).
        ``total`` in the response is the number of matching requests, not the
        number of rows on the current page.
        """
        try:
            try:
                page, page_size = _parse_pagination(request.query_params)
            except (TypeError, ValueError):
                return _bad_pagination_response()

            queryset = self.get_queryset()
            user_id = str(request.user.id)
            total = queryset.count()
            start = (page - 1) * page_size

            # get_queryset() already restricts rows to those involving the
            # current user, so no per-row membership check is needed here.
            data = []
            for permission in queryset[start:start + page_size]:
                kb = permission.knowledge_base
                applicant = permission.applicant
                approver = permission.approver
                is_applicant = str(permission.applicant_id) == user_id
                data.append({
                    'id': str(permission.id),
                    'knowledge_base': {
                        'id': str(kb.id),
                        'name': kb.name,
                        'type': kb.type,
                    },
                    'applicant': {
                        'id': str(applicant.id),
                        'username': applicant.username,
                        'name': applicant.name,
                        'department': applicant.department,
                    },
                    # Empty strings (not None) when no approver is assigned.
                    'approver': {
                        'id': str(approver.id) if approver else '',
                        'username': approver.username if approver else '',
                        'name': approver.name if approver else '',
                        'department': approver.department if approver else '',
                    },
                    'permissions': permission.permissions,
                    'status': permission.status,
                    'created_at': _format_datetime(permission.created_at),
                    'expires_at': _format_datetime(permission.expires_at),
                    'response_message': permission.response_message or '',
                    'role': 'applicant' if is_applicant else 'approver',
                })

            return Response({
                'code': 200,
                'message': '获取权限申请列表成功',
                'data': {
                    'total': total,
                    'page': page,
                    'page_size': page_size,
                    'results': data,
                },
            })
        except Exception as e:
            return _server_error('获取权限申请列表失败', e)

    def perform_create(self, serializer):
        """Create a permission request through ``PermissionService``.

        The created object is stored on ``serializer.instance``. Any
        exception is logged and re-raised.
        """
        try:
            permission = PermissionService().create_permission_request(
                self.request.user,
                serializer.validated_data,
                NotificationService(),
            )
            serializer.instance = permission
        except Exception as e:
            logger.error("创建权限申请失败: %s", e)
            raise

    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """Approve a permission request.

        Body: optional ``response_message`` (defaults to ``''``).
        Returns 404 when the request is not in the user's queryset.
        """
        try:
            permission = self.get_object()
            response_message = request.data.get('response_message', '')
            PermissionService().approve_permission(
                request.user,
                permission,
                response_message,
                NotificationService(),
            )
            return Response({
                'code': 200,
                'message': '权限申请已批准',
                'data': None,
            })
        except (Http404, Permission.DoesNotExist):
            # get_object() signals a missing row with Http404.
            return _not_found_response()
        except Exception as e:
            return _server_error('处理权限申请失败', e)

    @action(detail=True, methods=['post'])
    def reject(self, request, pk=None):
        """Reject a permission request and return the serialized result.

        Body: optional ``response_message`` (None when absent).
        Returns 404 when the request is not in the user's queryset.
        """
        try:
            permission = self.get_object()
            response_message = request.data.get('response_message')
            permission = PermissionService().reject_permission(
                request.user,
                permission,
                response_message,
                NotificationService(),
            )
            return Response({
                'code': 200,
                'message': '权限申请已拒绝',
                'data': PermissionSerializer(permission).data,
            })
        except (Http404, Permission.DoesNotExist):
            # get_object() signals a missing row with Http404.
            return _not_found_response()
        except Exception as e:
            return _server_error('拒绝权限申请失败', e)

    @action(detail=True, methods=['post'])
    def extend(self, request, pk=None):
        """Extend the expiry of a permission and return the serialized result.

        Body: ``expires_at``, passed unchanged to
        ``PermissionService.extend_permission``.
        Returns 404 when the request is not in the user's queryset.
        """
        try:
            permission = self.get_object()
            new_expires_at = request.data.get('expires_at')
            permission = PermissionService().extend_permission(
                request.user,
                permission,
                new_expires_at,
                NotificationService(),
            )
            return Response({
                'code': 200,
                'message': '权限有效期延长成功',
                'data': PermissionSerializer(permission).data,
            })
        except (Http404, Permission.DoesNotExist):
            # get_object() signals a missing row with Http404.
            return _not_found_response()
        except Exception as e:
            return _server_error('延长权限有效期失败', e)

    @action(detail=False, methods=['get'])
    def user_permissions(self, request):
        """Return all active knowledge-base permissions of a given user.

        Query parameter: ``username`` (required). Responds 400 when it is
        missing and 404 when no such user exists.
        """
        # NOTE(review): any authenticated caller can query any username here;
        # confirm whether this should be limited to admins or the user.
        try:
            username = request.query_params.get('username')
            if not username:
                return _error_response(
                    400, '请提供用户名', status.HTTP_400_BAD_REQUEST
                )

            try:
                target_user = User.objects.get(username=username)
            except User.DoesNotExist:
                return _error_response(
                    404, f'用户 {username} 不存在', status.HTTP_404_NOT_FOUND
                )

            permissions = KBPermissionModel.objects.filter(
                user=target_user, status='active'
            ).select_related('knowledge_base', 'granted_by')

            permissions_data = []
            for perm in permissions:
                kb = perm.knowledge_base
                granter = perm.granted_by
                permissions_data.append({
                    'id': str(perm.id),
                    'knowledge_base': {
                        'id': str(kb.id),
                        'name': kb.name,
                        'type': kb.type,
                        'department': kb.department,
                        'group': kb.group,
                    },
                    'permissions': {
                        'can_read': perm.can_read,
                        'can_edit': perm.can_edit,
                        'can_delete': perm.can_delete,
                    },
                    'granted_by': {
                        'id': str(granter.id) if granter else None,
                        'username': granter.username if granter else None,
                        'name': granter.name if granter else None,
                    },
                    'created_at': _format_datetime(perm.created_at),
                    'expires_at': _format_datetime(perm.expires_at),
                    'status': perm.status,
                })

            return Response({
                'code': 200,
                'message': '获取用户权限成功',
                'data': {
                    'user': {
                        'id': str(target_user.id),
                        'username': target_user.username,
                        'name': target_user.name,
                        'department': target_user.department,
                        'role': target_user.role,
                    },
                    'permissions': permissions_data,
                },
            })
        except Exception as e:
            return _server_error('获取用户权限失败', e)

    @action(detail=False, methods=['get'])
    def all_permissions(self, request):
        """Admin only: list knowledge-base permissions grouped by user.

        Permissions on knowledge bases of type ``'private'`` are excluded.
        Query parameters: ``page``, ``page_size``, ``status`` (``'active'``
        or ``'expired'``), ``department`` and ``kb_type``. Pagination is
        applied to the grouped users, not to individual permission rows.
        """
        try:
            if request.user.role != 'admin':
                return _error_response(
                    403, '只有管理员可以查看所有权限', status.HTTP_403_FORBIDDEN
                )

            try:
                page, page_size = _parse_pagination(request.query_params)
            except (TypeError, ValueError):
                return _bad_pagination_response()

            status_filter = request.query_params.get('status')
            department = request.query_params.get('department')
            kb_type = request.query_params.get('kb_type')

            queryset = KBPermissionModel.objects.filter(
                ~Q(knowledge_base__type='private')
            ).select_related('user', 'knowledge_base', 'granted_by')

            now = timezone.now()
            if status_filter == 'active':
                queryset = queryset.filter(
                    Q(expires_at__gt=now) | Q(expires_at__isnull=True),
                    status='active',
                )
            elif status_filter == 'expired':
                queryset = queryset.filter(
                    Q(expires_at__lte=now) | Q(status='inactive')
                )

            if department:
                queryset = queryset.filter(user__department=department)
            if kb_type:
                queryset = queryset.filter(knowledge_base__type=kb_type)

            grouped = {}
            # Cache creators by id so each is fetched at most once instead of
            # twice per permission row.
            creators = {}
            for perm in queryset:
                user = perm.user
                kb = perm.knowledge_base
                granter = perm.granted_by
                user_id = str(user.id)

                if user_id not in grouped:
                    grouped[user_id] = {
                        'user_info': {
                            'id': user_id,
                            'username': user.username,
                            'name': getattr(user, 'name', user.username),
                            'department': getattr(user, 'department', None),
                            'role': getattr(user, 'role', None),
                        },
                        'permissions': [],
                        'stats': {
                            'total': 0,
                            'by_type': {
                                'admin': 0,
                                'secret': 0,
                                'leader': 0,
                                'member': 0,
                            },
                            'by_permission': {
                                'read_only': 0,
                                'read_write': 0,
                                'full_access': 0,
                            },
                        },
                    }

                creator_id = kb.user_id
                if creator_id not in creators:
                    creators[creator_id] = User.objects.filter(
                        id=creator_id
                    ).first()
                creator = creators[creator_id]

                entry = grouped[user_id]
                entry['permissions'].append({
                    'id': str(perm.id),
                    'knowledge_base': {
                        'id': str(kb.id),
                        'name': kb.name,
                        'type': kb.type,
                        'department': kb.department,
                        'group': kb.group,
                        'creator': {
                            'id': str(creator_id),
                            'name': getattr(creator, 'name', None),
                            'username': getattr(creator, 'username', None),
                        },
                    },
                    'permissions': {
                        'can_read': perm.can_read,
                        'can_edit': perm.can_edit,
                        'can_delete': perm.can_delete,
                    },
                    'granted_by': {
                        'id': str(granter.id) if granter else None,
                        'username': granter.username if granter else None,
                        'name': getattr(granter, 'name', None) if granter else None,
                    },
                    # NOTE(review): user_permissions reads `created_at` on the
                    # same model while this reads `granted_at`; confirm both
                    # attributes exist.
                    'granted_at': _format_datetime(perm.granted_at),
                    'expires_at': _format_datetime(perm.expires_at),
                    'status': perm.status,
                })

                stats = entry['stats']
                stats['total'] += 1
                # Count types outside the pre-seeded four instead of raising
                # KeyError.
                by_type = stats['by_type']
                by_type[kb.type] = by_type.get(kb.type, 0) + 1

                # Classify by the strongest capability granted.
                by_permission = stats['by_permission']
                if perm.can_delete:
                    by_permission['full_access'] += 1
                elif perm.can_edit:
                    by_permission['read_write'] += 1
                elif perm.can_read:
                    by_permission['read_only'] += 1

            users_list = list(grouped.values())
            start = (page - 1) * page_size

            return Response({
                'code': 200,
                'message': '获取权限列表成功',
                'data': {
                    'total': len(users_list),
                    'page': page,
                    'page_size': page_size,
                    'results': users_list[start:start + page_size],
                },
            })
        except Exception as e:
            return _server_error('获取所有权限失败', e)

    @action(detail=False, methods=['post'])
    def update_permission(self, request):
        """Create or update a user's permission on a knowledge base.

        Body: ``user_id``, ``knowledge_base_id``, ``permissions`` and
        ``expires_at``, all passed unchanged to
        ``PermissionService.update_user_permission``. That call returns the
        permission and a flag telling whether it was newly created.
        """
        # NOTE(review): no role check happens in this view; presumably the
        # service enforces admin rights - confirm.
        try:
            permission, created = PermissionService().update_user_permission(
                request.user,
                request.data.get('user_id'),
                request.data.get('knowledge_base_id'),
                request.data.get('permissions'),
                request.data.get('expires_at'),
                NotificationService(),
            )
            target = permission.user
            kb = permission.knowledge_base

            return Response({
                'code': 200,
                'message': f"{'创建' if created else '更新'}权限成功",
                'data': {
                    'id': str(permission.id),
                    'user': {
                        'id': str(target.id),
                        'username': target.username,
                        'name': target.name,
                        'department': target.department,
                        'role': target.role,
                    },
                    'knowledge_base': {
                        'id': str(kb.id),
                        'name': kb.name,
                        'type': kb.type,
                        'department': kb.department,
                        'group': kb.group,
                    },
                    'permissions': {
                        'can_read': permission.can_read,
                        'can_edit': permission.can_edit,
                        'can_delete': permission.can_delete,
                    },
                    'granted_by': {
                        'id': str(request.user.id),
                        'username': request.user.username,
                        'name': request.user.name,
                    },
                    'expires_at': _format_datetime(permission.expires_at),
                    'created': created,
                },
            })
        except Exception as e:
            return _server_error('更新权限失败', e)