daren/apps/common/services/permission_service.py
2025-05-29 17:21:16 +08:00

338 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

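# # apps/common/services/permission_service.py
# # NOTE(review): every line of this module visible here is commented out, so none of the code below is defined or executed on import.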
# import logging
# from django.db import transaction
# from django.db.models import Q
# from django.utils import timezone
# from rest_framework.exceptions import ValidationError
# from apps.user.models import User
# from apps.knowledge_base.models import KnowledgeBase
# from apps.permissions.models import Permission, KnowledgeBasePermission as KBPermissionModel
# logger = logging.getLogger(__name__)
# class PermissionService:
# def can_manage_knowledge_base(self, user, knowledge_base):
# """检查用户是否是知识库的创建者"""
# return str(knowledge_base.user_id) == str(user.id)
# def check_extend_permission(self, permission, user):
# """检查是否有权限延长权限有效期"""
# knowledge_base = permission.knowledge_base
# if knowledge_base.type == 'private':
# return knowledge_base.user_id == user.id
# if knowledge_base.type == 'leader':
# return user.role == 'admin'
# if knowledge_base.type == 'member':
# return user.role == 'admin' or (
# user.role == 'leader' and user.department == knowledge_base.department
# )
# return False
# def create_permission_request(self, user, validated_data, notification_service):
# """创建权限申请并发送通知"""
# knowledge_base = validated_data['knowledge_base']
# if str(knowledge_base.user_id) == str(user.id):
# raise ValidationError({
# "code": 400,
# "message": "您是此知识库的创建者,无需申请权限",
# "data": None
# })
# approver = User.objects.get(id=knowledge_base.user_id)
# requested_permissions = validated_data.get('permissions', {})
# expires_at = validated_data.get('expires_at')
# if not any([requested_permissions.get('can_read'),
# requested_permissions.get('can_edit'),
# requested_permissions.get('can_delete')]):
# raise ValidationError("至少需要申请一种权限(读/改/删)")
# if not expires_at:
# raise ValidationError("请指定权限到期时间")
# existing_request = Permission.objects.filter(
# knowledge_base=knowledge_base,
# applicant=user,
# status='pending'
# ).first()
# if existing_request:
# raise ValidationError("您已有一个待处理的权限申请")
# existing_permission = Permission.objects.filter(
# knowledge_base=knowledge_base,
# applicant=user,
# status='approved',
# expires_at__gt=timezone.now()
# ).first()
# if existing_permission:
# raise ValidationError("您已有此知识库的访问权限")
# with transaction.atomic():
# permission = Permission.objects.create(
# knowledge_base=knowledge_base,
# applicant=user,
# approver=approver,
# permissions=requested_permissions,
# expires_at=expires_at,
# status='pending'
# )
# permission_types = []
# if requested_permissions.get('can_read'):
# permission_types.append('读取')
# if requested_permissions.get('can_edit'):
# permission_types.append('编辑')
# if requested_permissions.get('can_delete'):
# permission_types.append('删除')
# permission_str = '、'.join(permission_types)
# notification_service.send_notification(
# user=approver,
# title="新的权限申请",
# content=f"用户 {user.name} 申请了知识库 '{knowledge_base.name}' 的{permission_str}权限",
# notification_type="permission_request",
# related_object_id=str(permission.id)
# )
# return permission
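# def approve_permission(self, user, permission, response_message, notification_service):
# """审批权限申请"""
# # NOTE(review): unlike reject_permission, this path never checks that permission.status is still 'pending', so an already-processed request could be approved again -- confirm whether that is intended before re-enabling.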
# if not self.can_manage_knowledge_base(user, permission.knowledge_base):
# raise ValidationError({
# 'code': 403,
# 'message': '只有知识库创建者可以审批此申请',
# 'data': None
# })
# with transaction.atomic():
# permission.status = 'approved'
# permission.approver = user
# permission.response_message = response_message
# permission.save()
# kb_permission = KBPermissionModel.objects.filter(
# knowledge_base=permission.knowledge_base,
# user=permission.applicant
# ).first()
# if kb_permission:
# kb_permission.can_read = permission.permissions.get('can_read', False)
# kb_permission.can_edit = permission.permissions.get('can_edit', False)
# kb_permission.can_delete = permission.permissions.get('can_delete', False)
# kb_permission.granted_by = user
# kb_permission.status = 'active'
# kb_permission.expires_at = permission.expires_at
# kb_permission.save()
# logger.info(f"更新知识库权限记录: {kb_permission.id}")
# else:
# kb_permission = KBPermissionModel.objects.create(
# knowledge_base=permission.knowledge_base,
# user=permission.applicant,
# can_read=permission.permissions.get('can_read', False),
# can_edit=permission.permissions.get('can_edit', False),
# can_delete=permission.permissions.get('can_delete', False),
# granted_by=user,
# status='active',
# expires_at=permission.expires_at
# )
# logger.info(f"创建新的知识库权限记录: {kb_permission.id}")
# notification_service.send_notification(
# user=permission.applicant,
# title="权限申请已通过",
# content=f"您对知识库 '{permission.knowledge_base.name}' 的权限申请已通过",
# notification_type="permission_approved",
# related_object_id=str(permission.id)
# )
# return permission
# def reject_permission(self, user, permission, response_message, notification_service):
# """拒绝权限申请"""
# if not self.can_manage_knowledge_base(user, permission.knowledge_base):
# raise ValidationError({
# 'code': 403,
# 'message': '只有知识库创建者可以审批此申请',
# 'data': None
# })
# if permission.status != 'pending':
# raise ValidationError({
# 'code': 400,
# 'message': '该申请已被处理',
# 'data': None
# })
# if not response_message:
# raise ValidationError({
# 'code': 400,
# 'message': '请填写拒绝原因',
# 'data': None
# })
# with transaction.atomic():
# permission.status = 'rejected'
# permission.approver = user
# permission.response_message = response_message
# permission.save()
# notification_service.send_notification(
# user=permission.applicant,
# title="权限申请已拒绝",
# content=f"您对知识库 '{permission.knowledge_base.name}' 的权限申请已被拒绝\n拒绝原因: {response_message}",
# notification_type="permission_rejected",
# related_object_id=str(permission.id)
# )
# return permission
# def extend_permission(self, user, permission, new_expires_at, notification_service):
# """延长权限有效期"""
# if not self.check_extend_permission(permission, user):
# raise ValidationError({
# "code": 403,
# "message": "您没有权限延长此权限",
# "data": None
# })
# if not new_expires_at:
# raise ValidationError({
# "code": 400,
# "message": "请设置新的过期时间",
# "data": None
# })
# try:
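# new_expires_at = timezone.datetime.strptime(new_expires_at, '%Y-%m-%dT%H:%M:%SZ')
# # NOTE(review): the literal 'Z' suffix is the ISO 8601 marker for UTC, but make_aware() with no timezone argument applies Django's current time zone -- the parsed value is shifted unless that zone is UTC; confirm before re-enabling.
# new_expires_at = timezone.make_aware(new_expires_at)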
# if new_expires_at <= timezone.now():
# raise ValidationError({
# "code": 400,
# "message": "过期时间不能早于或等于当前时间",
# "data": None
# })
# except ValueError:
# raise ValidationError({
# "code": 400,
# "message": "过期时间格式错误,应为 ISO 格式 (YYYY-MM-DDThh:mm:ssZ)",
# "data": None
# })
# with transaction.atomic():
# permission.expires_at = new_expires_at
# permission.save()
# kb_permission = KBPermissionModel.objects.get(
# knowledge_base=permission.knowledge_base,
# user=permission.applicant
# )
# kb_permission.expires_at = new_expires_at
# kb_permission.save()
# notification_service.send_notification(
# user=permission.applicant,
# title="权限有效期延长",
# content=f"您对知识库 '{permission.knowledge_base.name}' 的权限有效期已延长至 {new_expires_at.strftime('%Y-%m-%d %H:%M:%S')}",
# notification_type="permission_extended",
# related_object_id=str(permission.id)
# )
# return permission
# def update_user_permission(self, admin_user, user_id, knowledge_base_id, permissions, expires_at_str, notification_service):
# """管理员更新用户权限"""
# if admin_user.role != 'admin':
# raise ValidationError({
# 'code': 403,
# 'message': '只有管理员可以直接修改权限',
# 'data': None
# })
# if not all([user_id, knowledge_base_id, permissions]):
# raise ValidationError({
# 'code': 400,
# 'message': '缺少必要参数',
# 'data': None
# })
# required_permission_fields = ['can_read', 'can_edit', 'can_delete']
# if not all(field in permissions for field in required_permission_fields):
# raise ValidationError({
# 'code': 400,
# 'message': '权限参数格式错误,必须包含 can_read、can_edit、can_delete',
# 'data': None
# })
# try:
# user = User.objects.get(id=user_id)
# knowledge_base = KnowledgeBase.objects.get(id=knowledge_base_id)
# except User.DoesNotExist:
# raise ValidationError({
# 'code': 404,
# 'message': f'用户ID {user_id} 不存在',
# 'data': None
# })
# except KnowledgeBase.DoesNotExist:
# raise ValidationError({
# 'code': 404,
# 'message': f'知识库ID {knowledge_base_id} 不存在',
# 'data': None
# })
# if knowledge_base.type == 'private' and str(knowledge_base.user_id) != str(user.id):
# raise ValidationError({
# 'code': 403,
# 'message': '不能修改其他用户的私有知识库权限',
# 'data': None
# })
# if user.role == 'member' and permissions.get('can_delete'):
# raise ValidationError({
# 'code': 400,
# 'message': '普通成员不能获得删除权限',
# 'data': None
# })
# expires_at = None
# if expires_at_str:
# try:
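# expires_at = timezone.datetime.strptime(expires_at_str, '%Y-%m-%dT%H:%M:%SZ')
# # NOTE(review): the literal 'Z' suffix is the ISO 8601 marker for UTC, but make_aware() with no timezone argument applies Django's current time zone -- the parsed value is shifted unless that zone is UTC; confirm before re-enabling.
# expires_at = timezone.make_aware(expires_at)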
# if expires_at <= timezone.now():
# raise ValidationError({
# 'code': 400,
# 'message': '过期时间不能早于或等于当前时间',
# 'data': None
# })
# except ValueError:
# raise ValidationError({
# 'code': 400,
# 'message': '过期时间格式错误,应为 ISO 格式 (YYYY-MM-DDThh:mm:ssZ)',
# 'data': None
# })
# with transaction.atomic():
# permission, created = KBPermissionModel.objects.update_or_create(
# user=user,
# knowledge_base=knowledge_base,
# defaults={
# 'can_read': permissions.get('can_read', False),
# 'can_edit': permissions.get('can_edit', False),
# 'can_delete': permissions.get('can_delete', False),
# 'granted_by': admin_user,
# 'status': 'active',
# 'expires_at': expires_at
# }
# )
# notification_service.send_notification(
# user=user,
# title="知识库权限更新",
# content=f"管理员已{created and '授予' or '更新'}您对知识库 '{knowledge_base.name}' 的权限",
# notification_type="permission_updated",
# related_object_id=str(permission.id)
# )
# return permission, created