# daren/apps/common/services/permission_service.py
# 2025-05-29 10:11:19 +08:00
#
# 338 lines
# 13 KiB
# Python

# apps/common/services/permission_service.py
import logging
from django.db import transaction
from django.db.models import Q
from django.utils import timezone
from rest_framework.exceptions import ValidationError
from apps.user.models import User
from apps.knowledge_base.models import KnowledgeBase
logger = logging.getLogger(__name__)
class PermissionService:
    """Business rules for requesting, approving, rejecting and editing
    knowledge-base permissions.

    Every public method that changes state also calls
    ``notification_service.send_notification`` to inform the affected user.

    NOTE(review): ``Permission`` and ``KBPermissionModel`` are referenced by
    this class but no import for them is visible in this module; confirm they
    are in scope, otherwise the methods using them raise ``NameError``.

    NOTE(review): notifications are issued inside the ``transaction.atomic()``
    blocks, so an exception raised while notifying rolls back the database
    writes of that call. Confirm this matches the intended transaction scope.
    """

    # Wire format accepted for expiry timestamps that arrive as strings.
    _EXPIRES_AT_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    @staticmethod
    def _api_error(code, message):
        """Return a ``ValidationError`` carrying the ``{code, message, data}`` payload."""
        return ValidationError({
            'code': code,
            'message': message,
            'data': None
        })

    def _parse_expires_at(self, raw_value):
        """Turn a ``YYYY-MM-DDThh:mm:ssZ`` string into an aware datetime in the future.

        Raises:
            ValidationError: code 400 when the value is not a string in the
                expected format, or when it is not later than the current time.
        """
        try:
            # NOTE(review): the trailing "Z" is matched as a literal character
            # and make_aware() is called without an explicit zone, so the value
            # is interpreted in Django's current time zone rather than UTC.
            # Confirm that this is what API clients expect.
            parsed = timezone.make_aware(
                timezone.datetime.strptime(raw_value, self._EXPIRES_AT_FORMAT)
            )
        except (TypeError, ValueError):
            # TypeError covers non-string input, which would otherwise escape
            # as an unhandled exception instead of a validation error.
            raise self._api_error(
                400, '过期时间格式错误,应为 ISO 格式 (YYYY-MM-DDThh:mm:ssZ)'
            ) from None
        if parsed <= timezone.now():
            raise self._api_error(400, '过期时间不能早于或等于当前时间')
        return parsed

    def can_manage_knowledge_base(self, user, knowledge_base):
        """Return True when ``user`` is the creator of ``knowledge_base``.

        Both ids are compared as strings so differing id types still match.
        """
        return str(knowledge_base.user_id) == str(user.id)

    def check_extend_permission(self, permission, user):
        """Return True when ``user`` may extend the validity of ``permission``.

        The rule depends on the type of the permission's knowledge base:
        ``private`` allows only the creator, ``leader`` allows admins, and
        ``member`` allows admins and leaders of the knowledge base's
        department. Any other type is denied.
        """
        knowledge_base = permission.knowledge_base
        kb_type = knowledge_base.type
        if kb_type == 'private':
            # Same string-normalised ownership test as everywhere else in the
            # class, so ids of different types do not cause a false denial.
            return self.can_manage_knowledge_base(user, knowledge_base)
        if kb_type == 'leader':
            return user.role == 'admin'
        if kb_type == 'member':
            if user.role == 'admin':
                return True
            return user.role == 'leader' and user.department == knowledge_base.department
        return False

    def create_permission_request(self, user, validated_data, notification_service):
        """Create a pending permission request and notify the knowledge-base creator.

        Args:
            user: The applicant.
            validated_data: Mapping with ``knowledge_base`` and, optionally,
                ``permissions`` (``can_read`` / ``can_edit`` / ``can_delete``
                flags) and ``expires_at``.
            notification_service: Object exposing ``send_notification``.

        Returns:
            The newly created ``Permission`` record.

        Raises:
            ValidationError: when the applicant created the knowledge base, no
                permission flag is requested, no expiry is given, a pending
                request already exists, or an unexpired approved request
                already exists.
        """
        knowledge_base = validated_data['knowledge_base']
        if self.can_manage_knowledge_base(user, knowledge_base):
            raise self._api_error(400, "您是此知识库的创建者,无需申请权限")

        approver = User.objects.get(id=knowledge_base.user_id)
        # Treat an explicit None the same as a missing key.
        requested_permissions = validated_data.get('permissions') or {}
        expires_at = validated_data.get('expires_at')

        labelled_flags = (
            ('can_read', '读取'),
            ('can_edit', '编辑'),
            ('can_delete', '删除'),
        )
        permission_types = [
            label for flag, label in labelled_flags if requested_permissions.get(flag)
        ]
        if not permission_types:
            raise ValidationError("至少需要申请一种权限(读/改/删)")
        if not expires_at:
            raise ValidationError("请指定权限到期时间")

        own_requests = Permission.objects.filter(
            knowledge_base=knowledge_base,
            applicant=user
        )
        if own_requests.filter(status='pending').exists():
            raise ValidationError("您已有一个待处理的权限申请")
        if own_requests.filter(status='approved', expires_at__gt=timezone.now()).exists():
            raise ValidationError("您已有此知识库的访问权限")

        with transaction.atomic():
            permission = Permission.objects.create(
                knowledge_base=knowledge_base,
                applicant=user,
                approver=approver,
                permissions=requested_permissions,
                expires_at=expires_at,
                status='pending'
            )
            # Separate the labels so several requested permissions stay readable.
            permission_str = '、'.join(permission_types)
            notification_service.send_notification(
                user=approver,
                title="新的权限申请",
                content=f"用户 {user.name} 申请了知识库 '{knowledge_base.name}'{permission_str}权限",
                notification_type="permission_request",
                related_object_id=str(permission.id)
            )
        return permission

    def approve_permission(self, user, permission, response_message, notification_service):
        """Approve a pending request and grant the requested flags to the applicant.

        The applicant's ``KBPermissionModel`` row for the knowledge base is
        created, or overwritten when it already exists.

        Returns:
            The updated ``permission`` request.

        Raises:
            ValidationError: code 403 when ``user`` did not create the
                knowledge base; code 400 when the request is no longer pending.
        """
        knowledge_base = permission.knowledge_base
        if not self.can_manage_knowledge_base(user, knowledge_base):
            raise self._api_error(403, '只有知识库创建者可以审批此申请')
        # Same guard as reject_permission: a request is decided only once.
        if permission.status != 'pending':
            raise self._api_error(400, '该申请已被处理')

        requested = permission.permissions
        with transaction.atomic():
            permission.status = 'approved'
            permission.approver = user
            permission.response_message = response_message
            permission.save()

            kb_permission, created = KBPermissionModel.objects.update_or_create(
                knowledge_base=knowledge_base,
                user=permission.applicant,
                defaults={
                    'can_read': requested.get('can_read', False),
                    'can_edit': requested.get('can_edit', False),
                    'can_delete': requested.get('can_delete', False),
                    'granted_by': user,
                    'status': 'active',
                    'expires_at': permission.expires_at
                }
            )
            if created:
                logger.info("创建新的知识库权限记录: %s", kb_permission.id)
            else:
                logger.info("更新知识库权限记录: %s", kb_permission.id)

            notification_service.send_notification(
                user=permission.applicant,
                title="权限申请已通过",
                content=f"您对知识库 '{knowledge_base.name}' 的权限申请已通过",
                notification_type="permission_approved",
                related_object_id=str(permission.id)
            )
        return permission

    def reject_permission(self, user, permission, response_message, notification_service):
        """Reject a pending request and tell the applicant why.

        Returns:
            The updated ``permission`` request.

        Raises:
            ValidationError: code 403 when ``user`` did not create the
                knowledge base; code 400 when the request is no longer pending
                or ``response_message`` is empty.
        """
        if not self.can_manage_knowledge_base(user, permission.knowledge_base):
            raise self._api_error(403, '只有知识库创建者可以审批此申请')
        if permission.status != 'pending':
            raise self._api_error(400, '该申请已被处理')
        if not response_message:
            raise self._api_error(400, '请填写拒绝原因')

        with transaction.atomic():
            permission.status = 'rejected'
            permission.approver = user
            permission.response_message = response_message
            permission.save()
            notification_service.send_notification(
                user=permission.applicant,
                title="权限申请已拒绝",
                content=f"您对知识库 '{permission.knowledge_base.name}' 的权限申请已被拒绝\n拒绝原因:{response_message}",
                notification_type="permission_rejected",
                related_object_id=str(permission.id)
            )
        return permission

    def extend_permission(self, user, permission, new_expires_at, notification_service):
        """Move the expiry of a request and of the matching grant to a later time.

        Args:
            user: The user performing the extension.
            permission: The request whose validity is extended.
            new_expires_at: New expiry as a ``YYYY-MM-DDThh:mm:ssZ`` string.
            notification_service: Object exposing ``send_notification``.

        Returns:
            The updated ``permission`` request.

        Raises:
            ValidationError: code 403 when ``check_extend_permission`` denies
                ``user``; code 400 when the expiry is missing, malformed or not
                in the future; code 404 when the applicant has no grant row for
                the knowledge base.
        """
        if not self.check_extend_permission(permission, user):
            raise self._api_error(403, "您没有权限延长此权限")
        if not new_expires_at:
            raise self._api_error(400, "请设置新的过期时间")
        new_expires_at = self._parse_expires_at(new_expires_at)

        with transaction.atomic():
            try:
                kb_permission = KBPermissionModel.objects.get(
                    knowledge_base=permission.knowledge_base,
                    user=permission.applicant
                )
            except KBPermissionModel.DoesNotExist:
                # Report a missing grant in the same error shape as the other
                # failures instead of letting DoesNotExist escape.
                raise self._api_error(404, '未找到对应的知识库权限记录') from None

            permission.expires_at = new_expires_at
            permission.save()
            kb_permission.expires_at = new_expires_at
            kb_permission.save()

            notification_service.send_notification(
                user=permission.applicant,
                title="权限有效期延长",
                content=f"您对知识库 '{permission.knowledge_base.name}' 的权限有效期已延长至 {new_expires_at.strftime('%Y-%m-%d %H:%M:%S')}",
                notification_type="permission_extended",
                related_object_id=str(permission.id)
            )
        return permission

    def update_user_permission(self, admin_user, user_id, knowledge_base_id, permissions, expires_at_str, notification_service):
        """Let an admin set a user's permission flags on a knowledge base directly.

        Args:
            admin_user: The acting user; must have role ``admin``.
            user_id: Id of the user whose grant is written.
            knowledge_base_id: Id of the target knowledge base.
            permissions: Mapping that must contain ``can_read``, ``can_edit``
                and ``can_delete``.
            expires_at_str: Optional expiry as ``YYYY-MM-DDThh:mm:ssZ``; a
                falsy value stores ``None`` as the expiry.
            notification_service: Object exposing ``send_notification``.

        Returns:
            ``(permission, created)``: the grant row and whether it was newly
            created.

        Raises:
            ValidationError: code 403 for a non-admin caller or a private
                knowledge base created by someone other than the target user;
                code 400 for missing or malformed arguments, or a delete grant
                for a ``member``; code 404 for an unknown user or knowledge
                base.
        """
        if admin_user.role != 'admin':
            raise self._api_error(403, '只有管理员可以直接修改权限')
        if not all([user_id, knowledge_base_id, permissions]):
            raise self._api_error(400, '缺少必要参数')

        required_permission_fields = ('can_read', 'can_edit', 'can_delete')
        if any(field not in permissions for field in required_permission_fields):
            raise self._api_error(400, '权限参数格式错误,必须包含 can_read、can_edit、can_delete')

        try:
            user = User.objects.get(id=user_id)
            knowledge_base = KnowledgeBase.objects.get(id=knowledge_base_id)
        except User.DoesNotExist:
            raise self._api_error(404, f'用户ID {user_id} 不存在') from None
        except KnowledgeBase.DoesNotExist:
            raise self._api_error(404, f'知识库ID {knowledge_base_id} 不存在') from None

        if knowledge_base.type == 'private' and not self.can_manage_knowledge_base(user, knowledge_base):
            raise self._api_error(403, '不能修改其他用户的私有知识库权限')
        if user.role == 'member' and permissions.get('can_delete'):
            raise self._api_error(400, '普通成员不能获得删除权限')

        expires_at = self._parse_expires_at(expires_at_str) if expires_at_str else None

        with transaction.atomic():
            permission, created = KBPermissionModel.objects.update_or_create(
                user=user,
                knowledge_base=knowledge_base,
                defaults={
                    'can_read': permissions.get('can_read', False),
                    'can_edit': permissions.get('can_edit', False),
                    'can_delete': permissions.get('can_delete', False),
                    'granted_by': admin_user,
                    'status': 'active',
                    'expires_at': expires_at
                }
            )
            action = '授予' if created else '更新'
            notification_service.send_notification(
                user=user,
                title="知识库权限更新",
                content=f"管理员已{action}您对知识库 '{knowledge_base.name}' 的权限",
                notification_type="permission_updated",
                related_object_id=str(permission.id)
            )
        return permission, created