# # apps/common/services/permission_service.py # import logging # from django.db import transaction # from django.db.models import Q # from django.utils import timezone # from rest_framework.exceptions import ValidationError # from apps.user.models import User # from apps.knowledge_base.models import KnowledgeBase # from apps.permissions.models import Permission, KnowledgeBasePermission as KBPermissionModel # logger = logging.getLogger(__name__) # class PermissionService: # def can_manage_knowledge_base(self, user, knowledge_base): # """检查用户是否是知识库的创建者""" # return str(knowledge_base.user_id) == str(user.id) # def check_extend_permission(self, permission, user): # """检查是否有权限延长权限有效期""" # knowledge_base = permission.knowledge_base # if knowledge_base.type == 'private': # return knowledge_base.user_id == user.id # if knowledge_base.type == 'leader': # return user.role == 'admin' # if knowledge_base.type == 'member': # return user.role == 'admin' or ( # user.role == 'leader' and user.department == knowledge_base.department # ) # return False # def create_permission_request(self, user, validated_data, notification_service): # """创建权限申请并发送通知""" # knowledge_base = validated_data['knowledge_base'] # if str(knowledge_base.user_id) == str(user.id): # raise ValidationError({ # "code": 400, # "message": "您是此知识库的创建者,无需申请权限", # "data": None # }) # approver = User.objects.get(id=knowledge_base.user_id) # requested_permissions = validated_data.get('permissions', {}) # expires_at = validated_data.get('expires_at') # if not any([requested_permissions.get('can_read'), # requested_permissions.get('can_edit'), # requested_permissions.get('can_delete')]): # raise ValidationError("至少需要申请一种权限(读/改/删)") # if not expires_at: # raise ValidationError("请指定权限到期时间") # existing_request = Permission.objects.filter( # knowledge_base=knowledge_base, # applicant=user, # status='pending' # ).first() # if existing_request: # raise ValidationError("您已有一个待处理的权限申请") # existing_permission = Permission.objects.filter( # knowledge_base=knowledge_base, # applicant=user, # status='approved', # expires_at__gt=timezone.now() # ).first() # if existing_permission: # raise ValidationError("您已有此知识库的访问权限") # with transaction.atomic(): # permission = Permission.objects.create( # knowledge_base=knowledge_base, # applicant=user, # approver=approver, # permissions=requested_permissions, # expires_at=expires_at, # status='pending' # ) # permission_types = [] # if requested_permissions.get('can_read'): # permission_types.append('读取') # if requested_permissions.get('can_edit'): # permission_types.append('编辑') # if requested_permissions.get('can_delete'): # permission_types.append('删除') # permission_str = '、'.join(permission_types) # notification_service.send_notification( # user=approver, # title="新的权限申请", # content=f"用户 {user.name} 申请了知识库 '{knowledge_base.name}' 的{permission_str}权限", # notification_type="permission_request", # related_object_id=str(permission.id) # ) # return permission # def approve_permission(self, user, permission, response_message, notification_service): # """审批权限申请""" # if not self.can_manage_knowledge_base(user, permission.knowledge_base): # raise ValidationError({ # 'code': 403, # 'message': '只有知识库创建者可以审批此申请', # 'data': None # }) # with transaction.atomic(): # permission.status = 'approved' # permission.approver = user # permission.response_message = response_message # permission.save() # kb_permission = KBPermissionModel.objects.filter( # knowledge_base=permission.knowledge_base, # user=permission.applicant # ).first() # if kb_permission: # kb_permission.can_read = permission.permissions.get('can_read', False) # kb_permission.can_edit = permission.permissions.get('can_edit', False) # kb_permission.can_delete = permission.permissions.get('can_delete', False) # kb_permission.granted_by = user # kb_permission.status = 'active' # kb_permission.expires_at = permission.expires_at # kb_permission.save() # logger.info(f"更新知识库权限记录: {kb_permission.id}") # else: # kb_permission = KBPermissionModel.objects.create( # knowledge_base=permission.knowledge_base, # user=permission.applicant, # can_read=permission.permissions.get('can_read', False), # can_edit=permission.permissions.get('can_edit', False), # can_delete=permission.permissions.get('can_delete', False), # granted_by=user, # status='active', # expires_at=permission.expires_at # ) # logger.info(f"创建新的知识库权限记录: {kb_permission.id}") # notification_service.send_notification( # user=permission.applicant, # title="权限申请已通过", # content=f"您对知识库 '{permission.knowledge_base.name}' 的权限申请已通过", # notification_type="permission_approved", # related_object_id=str(permission.id) # ) # return permission # def reject_permission(self, user, permission, response_message, notification_service): # """拒绝权限申请""" # if not self.can_manage_knowledge_base(user, permission.knowledge_base): # raise ValidationError({ # 'code': 403, # 'message': '只有知识库创建者可以审批此申请', # 'data': None # }) # if permission.status != 'pending': # raise ValidationError({ # 'code': 400, # 'message': '该申请已被处理', # 'data': None # }) # if not response_message: # raise ValidationError({ # 'code': 400, # 'message': '请填写拒绝原因', # 'data': None # }) # with transaction.atomic(): # permission.status = 'rejected' # permission.approver = user # permission.response_message = response_message # permission.save() # notification_service.send_notification( # user=permission.applicant, # title="权限申请已拒绝", # content=f"您对知识库 '{permission.knowledge_base.name}' 的权限申请已被拒绝\n拒绝原因:{response_message}", # notification_type="permission_rejected", # related_object_id=str(permission.id) # ) # return permission # def extend_permission(self, user, permission, new_expires_at, notification_service): # """延长权限有效期""" # if not self.check_extend_permission(permission, user): # raise ValidationError({ # "code": 403, # "message": "您没有权限延长此权限", # "data": None # }) # if not new_expires_at: # raise ValidationError({ # "code": 400, # "message": "请设置新的过期时间", # "data": None # }) # try: # new_expires_at = timezone.datetime.strptime(new_expires_at, '%Y-%m-%dT%H:%M:%SZ') # new_expires_at = timezone.make_aware(new_expires_at) # if new_expires_at <= timezone.now(): # raise ValidationError({ # "code": 400, # "message": "过期时间不能早于或等于当前时间", # "data": None # }) # except ValueError: # raise ValidationError({ # "code": 400, # "message": "过期时间格式错误,应为 ISO 格式 (YYYY-MM-DDThh:mm:ssZ)", # "data": None # }) # with transaction.atomic(): # permission.expires_at = new_expires_at # permission.save() # kb_permission = KBPermissionModel.objects.get( # knowledge_base=permission.knowledge_base, # user=permission.applicant # ) # kb_permission.expires_at = new_expires_at # kb_permission.save() # notification_service.send_notification( # user=permission.applicant, # title="权限有效期延长", # content=f"您对知识库 '{permission.knowledge_base.name}' 的权限有效期已延长至 {new_expires_at.strftime('%Y-%m-%d %H:%M:%S')}", # notification_type="permission_extended", # related_object_id=str(permission.id) # ) # return permission # def update_user_permission(self, admin_user, user_id, knowledge_base_id, permissions, expires_at_str, notification_service): # """管理员更新用户权限""" # if admin_user.role != 'admin': # raise ValidationError({ # 'code': 403, # 'message': '只有管理员可以直接修改权限', # 'data': None # }) # if not all([user_id, knowledge_base_id, permissions]): # raise ValidationError({ # 'code': 400, # 'message': '缺少必要参数', # 'data': None # }) # required_permission_fields = ['can_read', 'can_edit', 'can_delete'] # if not all(field in permissions for field in required_permission_fields): # raise ValidationError({ # 'code': 400, # 'message': '权限参数格式错误,必须包含 can_read、can_edit、can_delete', # 'data': None # }) # try: # user = User.objects.get(id=user_id) # knowledge_base = KnowledgeBase.objects.get(id=knowledge_base_id) # except User.DoesNotExist: # raise ValidationError({ # 'code': 404, # 'message': f'用户ID {user_id} 不存在', # 'data': None # }) # except KnowledgeBase.DoesNotExist: # raise ValidationError({ # 'code': 404, # 'message': f'知识库ID {knowledge_base_id} 不存在', # 'data': None # }) # if knowledge_base.type == 'private' and str(knowledge_base.user_id) != str(user.id): # raise ValidationError({ # 'code': 403, # 'message': '不能修改其他用户的私有知识库权限', # 'data': None # }) # if user.role == 'member' and permissions.get('can_delete'): # raise ValidationError({ # 'code': 400, # 'message': '普通成员不能获得删除权限', # 'data': None # }) # expires_at = None # if expires_at_str: # try: # expires_at = timezone.datetime.strptime(expires_at_str, '%Y-%m-%dT%H:%M:%SZ') # expires_at = timezone.make_aware(expires_at) # if expires_at <= timezone.now(): # raise ValidationError({ # 'code': 400, # 'message': '过期时间不能早于或等于当前时间', # 'data': None # }) # except ValueError: # raise ValidationError({ # 'code': 400, # 'message': '过期时间格式错误,应为 ISO 格式 (YYYY-MM-DDThh:mm:ssZ)', # 'data': None # }) # with transaction.atomic(): # permission, created = KBPermissionModel.objects.update_or_create( # user=user, # knowledge_base=knowledge_base, # defaults={ # 'can_read': permissions.get('can_read', False), # 'can_edit': permissions.get('can_edit', False), # 'can_delete': permissions.get('can_delete', False), # 'granted_by': admin_user, # 'status': 'active', # 'expires_at': expires_at # } # ) # notification_service.send_notification( # user=user, # title="知识库权限更新", # content=f"管理员已{created and '授予' or '更新'}您对知识库 '{knowledge_base.name}' 的权限", # notification_type="permission_updated", # related_object_id=str(permission.id) # ) # return permission, created