operations_project/apps/permissions/views.py

481 lines
20 KiB
Python
Raw Permalink Normal View History

2025-05-07 22:24:02 +08:00
# apps/permissions/views.py
import logging

from django.db.models import Q
from django.http import Http404
from django.utils import timezone
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from apps.accounts.models import User
from apps.common.services.notification_service import NotificationService
from apps.common.services.permission_service import PermissionService
from apps.knowledge_base.models import KnowledgeBase
from apps.permissions.models import Permission, KnowledgeBasePermission as KBPermissionModel
from apps.permissions.serializers import PermissionSerializer
2025-05-07 18:01:48 +08:00
2025-05-07 22:24:02 +08:00
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class PermissionViewSet(viewsets.ModelViewSet):
    """Endpoints for permission requests and knowledge-base permission grants.

    Every custom action answers with the envelope
    ``{'code': ..., 'message': ..., 'data': ...}``.

    The default queryset only contains requests in which the current user is
    the applicant or the approver, so the detail actions (``approve``,
    ``reject``, ``extend``) resolve their target through that restricted set.
    """

    serializer_class = PermissionSerializer
    permission_classes = [IsAuthenticated]

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _respond(code, message, data=None, http_status=status.HTTP_200_OK):
        """Build the ``code`` / ``message`` / ``data`` response envelope."""
        return Response(
            {'code': code, 'message': message, 'data': data},
            status=http_status,
        )

    @classmethod
    def _not_found(cls):
        """Build the 404 reply used when a permission request is missing."""
        return cls._respond(
            404, '权限申请不存在', http_status=status.HTTP_404_NOT_FOUND
        )

    @classmethod
    def _server_error(cls, summary, exc):
        """Log the active exception with its traceback and build a 500 reply.

        Must be called from inside an ``except`` block so that
        ``logger.exception`` can attach the current traceback.
        """
        logger.exception("%s: %s", summary, exc)
        return cls._respond(
            500,
            f'{summary}: {exc}',
            http_status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )

    @staticmethod
    def _parse_pagination(request):
        """Return ``(page, page_size)`` read from the query string.

        Defaults are page 1 and 10 items per page.

        Raises:
            ValueError: if either value is not an integer or is below 1.
        """
        page = int(request.query_params.get('page', 1))
        page_size = int(request.query_params.get('page_size', 10))
        if page < 1 or page_size < 1:
            raise ValueError('page and page_size must be positive integers')
        return page, page_size

    @staticmethod
    def _format_dt(value):
        """Format a datetime as ``YYYY-MM-DD HH:MM:SS``; ``None`` if unset."""
        return value.strftime('%Y-%m-%d %H:%M:%S') if value else None

    @staticmethod
    def _serialize_kb(knowledge_base):
        """Serialize the knowledge-base fields shared by the grant payloads."""
        return {
            'id': str(knowledge_base.id),
            'name': knowledge_base.name,
            'type': knowledge_base.type,
            'department': knowledge_base.department,
            'group': knowledge_base.group,
        }

    @staticmethod
    def _serialize_flags(grant):
        """Serialize the read / edit / delete flags of a grant."""
        return {
            'can_read': grant.can_read,
            'can_edit': grant.can_edit,
            'can_delete': grant.can_delete,
        }

    @staticmethod
    def _serialize_granter(granter):
        """Serialize the granting user; every field is ``None`` if absent."""
        return {
            'id': str(granter.id) if granter else None,
            'username': granter.username if granter else None,
            'name': getattr(granter, 'name', None) if granter else None,
        }

    @classmethod
    def _serialize_request(cls, permission, user_id):
        """Serialize one permission request for the list endpoint.

        ``user_id`` is the current user's id as a string; it decides whether
        the ``role`` field reads ``'applicant'`` or ``'approver'``.
        """
        knowledge_base = permission.knowledge_base
        applicant = permission.applicant
        approver = permission.approver
        return {
            'id': str(permission.id),
            'knowledge_base': {
                'id': str(knowledge_base.id),
                'name': knowledge_base.name,
                'type': knowledge_base.type,
            },
            'applicant': {
                'id': str(applicant.id),
                'username': applicant.username,
                'name': applicant.name,
                'department': applicant.department,
            },
            # All approver fields come from the approver (the name used to be
            # read from the applicant by mistake); empty strings when unset.
            'approver': {
                'id': str(approver.id) if approver else '',
                'username': approver.username if approver else '',
                'name': approver.name if approver else '',
                'department': approver.department if approver else '',
            },
            'permissions': permission.permissions,
            'status': permission.status,
            'created_at': cls._format_dt(permission.created_at),
            'expires_at': cls._format_dt(permission.expires_at),
            'response_message': permission.response_message or '',
            'role': (
                'applicant'
                if str(permission.applicant_id) == user_id
                else 'approver'
            ),
        }

    # ------------------------------------------------------------------
    # Standard ModelViewSet hooks
    # ------------------------------------------------------------------
    def get_queryset(self):
        """Return requests where the current user is applicant or approver."""
        user_id = str(self.request.user.id)
        involved = Q(applicant_id=user_id) | Q(approver_id=user_id)
        return Permission.objects.filter(involved).select_related(
            'knowledge_base', 'applicant', 'approver'
        )

    def list(self, request, *args, **kwargs):
        """List the current user's permission requests, one page at a time.

        Query parameters: ``page`` (default 1) and ``page_size`` (default 10).
        ``total`` in the reply is the number of matching requests across all
        pages, not the size of the returned page. Invalid paging values are
        answered with a 400 reply.
        """
        try:
            try:
                page, page_size = self._parse_pagination(request)
            except ValueError:
                return self._respond(
                    400, '分页参数无效',
                    http_status=status.HTTP_400_BAD_REQUEST,
                )

            queryset = self.get_queryset()
            user_id = str(request.user.id)
            total = queryset.count()
            start = (page - 1) * page_size
            # get_queryset() already restricts rows to the current user, so
            # no per-row membership check is needed here.
            results = [
                self._serialize_request(permission, user_id)
                for permission in queryset[start:start + page_size]
            ]
            return self._respond(200, '获取权限申请列表成功', {
                'total': total,
                'page': page,
                'page_size': page_size,
                'results': results,
            })
        except Exception as e:
            return self._server_error('获取权限申请列表失败', e)

    def perform_create(self, serializer):
        """Create a permission request through ``PermissionService``.

        The created object is attached to ``serializer.instance``. Any
        exception is logged and re-raised unchanged.
        """
        try:
            permission_service = PermissionService()
            notification_service = NotificationService()
            permission = permission_service.create_permission_request(
                self.request.user, serializer.validated_data, notification_service
            )
            serializer.instance = permission
        except Exception as e:
            logger.error(f"创建权限申请失败: {str(e)}")
            raise

    # ------------------------------------------------------------------
    # Request workflow actions
    # ------------------------------------------------------------------
    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """Approve a permission request.

        Reads the optional ``response_message`` from the request body
        (defaults to an empty string). Replies 404 when the request cannot
        be resolved, 500 on any other failure.
        """
        try:
            permission = self.get_object()
            permission_service = PermissionService()
            notification_service = NotificationService()
            response_message = request.data.get('response_message', '')
            permission_service.approve_permission(
                request.user, permission, response_message, notification_service
            )
            # NOTE(review): reject/extend return the serialized request while
            # approve returns no data; kept as-is, confirm this is intended.
            return self._respond(200, '权限申请已批准')
        except (Http404, Permission.DoesNotExist):
            # get_object() signals a missing row with Http404.
            return self._not_found()
        except Exception as e:
            return self._server_error('处理权限申请失败', e)

    @action(detail=True, methods=['post'])
    def reject(self, request, pk=None):
        """Reject a permission request and return its serialized form.

        Reads ``response_message`` from the request body (``None`` when
        omitted). Replies 404 when the request cannot be resolved, 500 on
        any other failure.
        """
        try:
            permission = self.get_object()
            permission_service = PermissionService()
            notification_service = NotificationService()
            response_message = request.data.get('response_message')
            permission = permission_service.reject_permission(
                request.user, permission, response_message, notification_service
            )
            return self._respond(
                200, '权限申请已拒绝', PermissionSerializer(permission).data
            )
        except (Http404, Permission.DoesNotExist):
            # get_object() signals a missing row with Http404.
            return self._not_found()
        except Exception as e:
            return self._server_error('拒绝权限申请失败', e)

    @action(detail=True, methods=['post'])
    def extend(self, request, pk=None):
        """Extend a permission's expiry and return its serialized form.

        Reads the new ``expires_at`` value from the request body and passes
        it to ``PermissionService.extend_permission`` unchanged. Replies 404
        when the request cannot be resolved, 500 on any other failure.
        """
        try:
            permission = self.get_object()
            permission_service = PermissionService()
            notification_service = NotificationService()
            new_expires_at = request.data.get('expires_at')
            permission = permission_service.extend_permission(
                request.user, permission, new_expires_at, notification_service
            )
            return self._respond(
                200, '权限有效期延长成功', PermissionSerializer(permission).data
            )
        except (Http404, Permission.DoesNotExist):
            # get_object() signals a missing row with Http404.
            return self._not_found()
        except Exception as e:
            return self._server_error('延长权限有效期失败', e)

    # ------------------------------------------------------------------
    # Grant inspection / administration actions
    # ------------------------------------------------------------------
    @action(detail=False, methods=['get'])
    def user_permissions(self, request):
        """Return the active knowledge-base grants of one user.

        Query parameter: ``username`` (required). Replies 400 when it is
        missing and 404 when no such user exists.
        """
        # NOTE(review): this view only requires authentication, so any
        # logged-in user can look up any username; confirm this is intended.
        try:
            username = request.query_params.get('username')
            if not username:
                return self._respond(
                    400, '请提供用户名',
                    http_status=status.HTTP_400_BAD_REQUEST,
                )

            try:
                target_user = User.objects.get(username=username)
            except User.DoesNotExist:
                return self._respond(
                    404, f'用户 {username} 不存在',
                    http_status=status.HTTP_404_NOT_FOUND,
                )

            grants = KBPermissionModel.objects.filter(
                user=target_user,
                status='active'
            ).select_related('knowledge_base', 'granted_by')

            permissions_data = [
                {
                    'id': str(grant.id),
                    'knowledge_base': self._serialize_kb(grant.knowledge_base),
                    'permissions': self._serialize_flags(grant),
                    'granted_by': self._serialize_granter(grant.granted_by),
                    # NOTE(review): all_permissions reports granted_at
                    # instead of created_at; confirm which is intended.
                    'created_at': self._format_dt(grant.created_at),
                    'expires_at': self._format_dt(grant.expires_at),
                    'status': grant.status,
                }
                for grant in grants
            ]

            return self._respond(200, '获取用户权限成功', {
                'user': {
                    'id': str(target_user.id),
                    'username': target_user.username,
                    'name': target_user.name,
                    'department': target_user.department,
                    'role': target_user.role,
                },
                'permissions': permissions_data,
            })
        except Exception as e:
            return self._server_error('获取用户权限失败', e)

    @action(detail=False, methods=['get'])
    def all_permissions(self, request):
        """Return all non-private knowledge-base grants, grouped by user.

        Only users whose ``role`` is ``'admin'`` may call this (403
        otherwise). Query parameters: ``page`` / ``page_size`` (paging is
        applied to the grouped users, not to individual grants),
        ``status`` (``'active'`` or ``'expired'``), ``department`` and
        ``kb_type``. Invalid paging values are answered with a 400 reply.
        """
        try:
            if request.user.role != 'admin':
                return self._respond(
                    403, '只有管理员可以查看所有权限',
                    http_status=status.HTTP_403_FORBIDDEN,
                )

            try:
                page, page_size = self._parse_pagination(request)
            except ValueError:
                return self._respond(
                    400, '分页参数无效',
                    http_status=status.HTTP_400_BAD_REQUEST,
                )

            status_filter = request.query_params.get('status')
            department = request.query_params.get('department')
            kb_type = request.query_params.get('kb_type')

            queryset = KBPermissionModel.objects.filter(
                ~Q(knowledge_base__type='private')
            ).select_related('user', 'knowledge_base', 'granted_by')

            if status_filter == 'active':
                queryset = queryset.filter(
                    Q(expires_at__gt=timezone.now()) | Q(expires_at__isnull=True),
                    status='active'
                )
            elif status_filter == 'expired':
                queryset = queryset.filter(
                    Q(expires_at__lte=timezone.now()) | Q(status='inactive')
                )
            if department:
                queryset = queryset.filter(user__department=department)
            if kb_type:
                queryset = queryset.filter(knowledge_base__type=kb_type)

            user_permissions = {}
            # Creator lookups are cached per request so each distinct creator
            # costs one query instead of two queries per grant.
            creators = {}

            for perm in queryset:
                user_id = str(perm.user.id)
                if user_id not in user_permissions:
                    user_permissions[user_id] = {
                        'user_info': {
                            'id': user_id,
                            'username': perm.user.username,
                            'name': getattr(perm.user, 'name', perm.user.username),
                            'department': getattr(perm.user, 'department', None),
                            'role': getattr(perm.user, 'role', None),
                        },
                        'permissions': [],
                        'stats': {
                            'total': 0,
                            'by_type': {
                                'admin': 0,
                                'secret': 0,
                                'leader': 0,
                                'member': 0,
                            },
                            'by_permission': {
                                'read_only': 0,
                                'read_write': 0,
                                'full_access': 0,
                            },
                        },
                    }

                knowledge_base = perm.knowledge_base
                creator_id = knowledge_base.user_id
                if creator_id not in creators:
                    creators[creator_id] = User.objects.filter(id=creator_id).first()
                creator = creators[creator_id]

                kb_data = self._serialize_kb(knowledge_base)
                kb_data['creator'] = {
                    'id': str(creator_id),
                    'name': getattr(creator, 'name', None),
                    'username': getattr(creator, 'username', None),
                }

                bucket = user_permissions[user_id]
                bucket['permissions'].append({
                    'id': str(perm.id),
                    'knowledge_base': kb_data,
                    'permissions': self._serialize_flags(perm),
                    'granted_by': self._serialize_granter(perm.granted_by),
                    # NOTE(review): user_permissions reports created_at
                    # instead of granted_at; confirm which is intended.
                    'granted_at': self._format_dt(perm.granted_at),
                    'expires_at': self._format_dt(perm.expires_at),
                    'status': perm.status,
                })

                stats = bucket['stats']
                stats['total'] += 1
                # Types outside the preset keys are counted too instead of
                # raising KeyError.
                type_counts = stats['by_type']
                type_counts[knowledge_base.type] = (
                    type_counts.get(knowledge_base.type, 0) + 1
                )
                # Each grant is counted once, under its highest capability.
                if perm.can_delete:
                    stats['by_permission']['full_access'] += 1
                elif perm.can_edit:
                    stats['by_permission']['read_write'] += 1
                elif perm.can_read:
                    stats['by_permission']['read_only'] += 1

            users_list = list(user_permissions.values())
            start = (page - 1) * page_size
            return self._respond(200, '获取权限列表成功', {
                'total': len(users_list),
                'page': page,
                'page_size': page_size,
                'results': users_list[start:start + page_size],
            })
        except Exception as e:
            return self._server_error('获取所有权限失败', e)

    @action(detail=False, methods=['post'])
    def update_permission(self, request):
        """Create or update one user's grant on a knowledge base.

        Body fields ``user_id``, ``knowledge_base_id``, ``permissions`` and
        ``expires_at`` are forwarded unchanged to
        ``PermissionService.update_user_permission``, which returns the grant
        and a flag telling whether it was newly created.
        """
        # NOTE(review): no role check happens in this view; presumably the
        # service enforces that the caller is an admin. Verify.
        try:
            permission_service = PermissionService()
            notification_service = NotificationService()
            permission, created = permission_service.update_user_permission(
                request.user,
                request.data.get('user_id'),
                request.data.get('knowledge_base_id'),
                request.data.get('permissions'),
                request.data.get('expires_at'),
                notification_service
            )
            grantee = permission.user
            return self._respond(
                200,
                f"{'创建' if created else '更新'}权限成功",
                {
                    'id': str(permission.id),
                    'user': {
                        'id': str(grantee.id),
                        'username': grantee.username,
                        'name': grantee.name,
                        'department': grantee.department,
                        'role': grantee.role,
                    },
                    'knowledge_base': self._serialize_kb(permission.knowledge_base),
                    'permissions': self._serialize_flags(permission),
                    # The caller is reported as the granter.
                    'granted_by': {
                        'id': str(request.user.id),
                        'username': request.user.username,
                        'name': request.user.name,
                    },
                    'expires_at': self._format_dt(permission.expires_at),
                    'created': created,
                },
            )
        except Exception as e:
            return self._server_error('更新权限失败', e)