优化权限判断

This commit is contained in:
wanjia 2025-03-10 17:03:58 +08:00
parent 07a33dbf54
commit af5aed7c0f
3 changed files with 558 additions and 38 deletions

59
deploy.ps1 Normal file
View File

@ -0,0 +1,59 @@
# 确保在项目根目录下执行此脚本
# 7-Zip路径 - 请根据实际安装位置修改
$7zipPath = "D:\7zip\7-Zip\7z.exe" # 原始路径
# $7zipPath = "C:\Program Files\7-Zip\7z.exe" # 修改后的路径
# 创建临时部署目录
New-Item -Path "deploy_temp" -ItemType Directory -Force
# 复制整个项目目录到临时目录
Copy-Item -Path "role_based_system" -Destination "deploy_temp\" -Recurse -Force
Copy-Item -Path "user_management" -Destination "deploy_temp\" -Recurse -Force
Copy-Item -Path "manage.py" -Destination "deploy_temp\" -Force
Copy-Item -Path "requirements.txt" -Destination "deploy_temp\" -Force
Copy-Item -Path "*.md" -Destination "deploy_temp\" -Force -ErrorAction SilentlyContinue
# 移除不需要的文件和目录
Get-ChildItem -Path "deploy_temp" -Recurse -Filter "__pycache__" -Directory | Remove-Item -Recurse -Force
Get-ChildItem -Path "deploy_temp" -Recurse -Filter "*.pyc" | Remove-Item -Force
# 特别排除不需要的目录
$excludeDirs = @(
".git", ".idea", ".vscode",
"venv", ".venv", "env", "__pycache__", "migrations"
)
foreach ($dir in $excludeDirs) {
Get-ChildItem -Path "deploy_temp" -Recurse -Directory -Filter $dir |
Where-Object { $_.FullName -notmatch "\\migrations\\__pycache__" } |
Remove-Item -Recurse -Force -ErrorAction SilentlyContinue
}
# 保留migrations目录但删除其中的pyc文件
if (Test-Path "deploy_temp\user_management\migrations") {
Get-ChildItem -Path "deploy_temp\user_management\migrations" -Filter "*.pyc" | Remove-Item -Force
Get-ChildItem -Path "deploy_temp\user_management\migrations" -Directory -Filter "__pycache__" | Remove-Item -Recurse -Force
}
# 排除不需要的文件
$excludeFiles = @(
"*.pyc", "*.pyo", "*.pyd", "*.so", "*.dll",
"*.db", "*.sqlite3", "*.log", "*.zip", "*.tar.gz",
"local_settings.py", "*.bak"
)
foreach ($pattern in $excludeFiles) {
Get-ChildItem -Path "deploy_temp" -Recurse -Filter $pattern | Remove-Item -Force -ErrorAction SilentlyContinue
}
# 使用7-Zip打包
& $7zipPath a -ttar knowledge_system.tar ".\deploy_temp\*"
& $7zipPath a -tgzip knowledge_system.tar.gz knowledge_system.tar
# 清理临时文件
Remove-Item -Path "knowledge_system.tar" -Force
Remove-Item -Path "deploy_temp" -Recurse -Force
Write-Host "部署包已创建: knowledge_system.tar.gz" -ForegroundColor Green

View File

@ -65,7 +65,7 @@ MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
# 'django.middleware.csrf.CsrfViewMiddleware', # WebSocket不需要CSRF
'django.middleware.csrf.CsrfViewMiddleware', # 确保这行没有被注释
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
@ -176,8 +176,10 @@ CORS_ALLOW_CREDENTIALS = True
CORS_ALLOWED_ORIGINS = [
"http://localhost:8000",
"http://127.0.0.1:8000",
"http://124.222.236.141:8000",
"ws://localhost:8000", # 添加 WebSocket
"ws://127.0.0.1:8000" # 添加 WebSocket
"ws://127.0.0.1:8000", # 添加 WebSocket
"ws://124.222.236.141:8000", # 添加 WebSocket
]
# 允许的请求头
CORS_ALLOWED_HEADERS = [
@ -208,8 +210,10 @@ CORS_ALLOWED_METHODS = [
CSRF_TRUSTED_ORIGINS = [
'http://localhost:8000',
'http://127.0.0.1:8000',
'http://124.222.236.141:8000',
'ws://localhost:8000', # 添加 WebSocket
'ws://127.0.0.1:8000', # 添加 WebSocket
'ws://124.222.236.141:8000', # 添加 WebSocket
]
# 服务器配置
# 静态文件配置
@ -241,6 +245,13 @@ LOGGING = {
'handlers': ['console', 'file'],
'level': 'DEBUG',
},
'loggers': {
'django.security.csrf': {
'handlers': ['file'],
'level': 'WARNING',
'propagate': True,
},
},
}
# CSRF 配置

View File

@ -420,12 +420,112 @@ class KnowledgeBaseViewSet(viewsets.ModelViewSet):
def list(self, request, *args, **kwargs):
try:
queryset = self.get_queryset()
serializer = self.get_serializer(queryset, many=True)
# 获取搜索关键字
keyword = request.query_params.get('keyword', '')
# 如果有关键字,构建搜索条件
if keyword:
query = Q(name__icontains=keyword) | \
Q(desc__icontains=keyword) | \
Q(department__icontains=keyword) | \
Q(group__icontains=keyword)
queryset = queryset.filter(query)
# 获取分页参数
try:
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 10))
except ValueError:
page = 1
page_size = 10
# 获取用户所有有效的知识库权限
user = request.user
active_permissions = KBPermissionModel.objects.filter(
user=user,
status='active',
expires_at__gt=timezone.now()
).select_related('knowledge_base')
# 创建权限映射字典
permission_map = {
str(perm.knowledge_base.id): {
'can_read': perm.can_read,
'can_edit': perm.can_edit,
'can_delete': perm.can_delete
}
for perm in active_permissions
}
# 计算总数量
total = queryset.count()
# 分页处理
start = (page - 1) * page_size
end = start + page_size
paginated_queryset = queryset[start:end]
# 序列化知识库数据
serializer = self.get_serializer(paginated_queryset, many=True)
data = serializer.data
# 为每个知识库添加权限信息
for item in data:
kb_id = item['id']
permissions = {
'can_read': False,
'can_edit': False,
'can_delete': False
}
# 检查知识库特定权限
if kb_id in permission_map:
permissions.update(permission_map[kb_id])
# 如果没有特定权限使用_can_read/_can_edit/_can_delete方法设置默认权限
else:
# 获取必要的知识库属性
kb_type = item['type']
department = item.get('department')
group = item.get('group')
creator_id = item.get('user_id')
# 使用方法判断权限
permissions['can_read'] = self._can_read(kb_type, user, department, group, creator_id)
permissions['can_edit'] = self._can_edit(kb_type, user, department, group, creator_id)
permissions['can_delete'] = self._can_delete(kb_type, user, department, group, creator_id)
# 添加权限信息到知识库数据
item['permissions'] = permissions
# 如果有关键字,添加高亮信息
if keyword:
# 处理name高亮
if 'name' in item and keyword.lower() in item['name'].lower():
highlighted = item['name'].replace(
keyword, f'<em class="highlight">{keyword}</em>'
)
item['highlighted_name'] = highlighted
# 处理desc高亮注意处理None值
if 'desc' in item and item.get('desc') is not None:
desc_text = str(item['desc'])
if keyword.lower() in desc_text.lower():
highlighted = desc_text.replace(
keyword, f'<em class="highlight">{keyword}</em>'
)
item['highlighted_desc'] = highlighted
return Response({
"code": 200,
"message": "获取知识库列表成功",
"data": serializer.data
"data": {
"total": total,
"page": page,
"page_size": page_size,
"keyword": keyword if keyword else None,
"items": data
}
})
except Exception as e:
logger.error(f"获取知识库列表失败: {str(e)}")
@ -439,9 +539,10 @@ class KnowledgeBaseViewSet(viewsets.ModelViewSet):
def get_queryset(self):
"""获取用户有权限查看的知识库列表"""
user = self.request.user
all_knowledge_bases = KnowledgeBase.objects.all()
# 返回用户创建的或有读权限的知识库
return KnowledgeBase.objects.filter(
# 首先过滤出用户创建的或有显式读权限的知识库
explicit_permission_kbs = KnowledgeBase.objects.filter(
Q(user_id=user.id) | # 是创建者
Q( # 或在权限表中有读权限
id__in=KBPermissionModel.objects.filter(
@ -450,7 +551,34 @@ class KnowledgeBaseViewSet(viewsets.ModelViewSet):
status='active'
).values_list('knowledge_base_id', flat=True)
)
).distinct()
)
# 获取显式权限知识库的ID集合
explicit_kb_ids = set(explicit_permission_kbs.values_list('id', flat=True))
# 对于没有显式权限的知识库使用_can_read方法判断
result_kbs = list(explicit_permission_kbs)
# 如果用户是管理员,可以查看所有知识库
if user.role == 'admin':
return all_knowledge_bases
# 否则,根据角色和部门/组筛选
for kb in all_knowledge_bases:
if kb.id not in explicit_kb_ids:
has_read_permission = self._can_read(
type=kb.type,
user=user,
department=kb.department,
group=kb.group,
creator_id=kb.user_id
)
if has_read_permission:
result_kbs.append(kb)
# 转换为queryset并去重
kb_ids = [kb.id for kb in result_kbs]
return KnowledgeBase.objects.filter(id__in=kb_ids).distinct()
def create(self, request, *args, **kwargs):
try:
@ -655,22 +783,76 @@ class KnowledgeBaseViewSet(viewsets.ModelViewSet):
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def _can_edit(self, type, user):
def _can_edit(self, type, user, department=None, group=None, creator_id=None):
"""判断用户是否有编辑权限"""
if type == 'admin':
return True
elif type == 'secret':
return user.role == 'admin'
elif type in ['leader', 'member']:
return user.role in ['admin', 'leader']
if user.role == 'admin':
return True # 管理员对所有知识库都有编辑权限
if type == 'secret':
return False # 除管理员外其他人无权编辑secret类型知识库
if type == 'leader':
# 同部门组长可编辑leader知识库
return user.role == 'leader' and user.department == department
if type == 'member':
# 同部门组长可编辑所有member知识库
if user.role == 'leader' and user.department == department:
return True
# 成员只能编辑同部门同组的member知识库
return user.role == 'member' and user.department == department and user.group == group
if type == 'private':
# 私有知识库只有创建者可编辑
return str(user.id) == str(creator_id)
return False
def _can_delete(self, type, user):
def _can_delete(self, type, user, department=None, group=None, creator_id=None):
"""判断用户是否有删除权限"""
if user.role == 'admin':
return True # 管理员对所有知识库都有删除权限
if type in ['admin', 'secret']:
return user.role == 'admin'
elif type in ['leader', 'member']:
return user.role in ['admin', 'leader']
return False # 除管理员外其他人无权删除admin和secret类型知识库
if type == 'leader':
# 同部门组长可删除leader知识库
return user.role == 'leader' and user.department == department
if type == 'member':
# 只有组长可以删除成员知识库(组员不能删除)
return user.role == 'leader' and user.department == department
if type == 'private':
# 私有知识库只有创建者可删除
return str(user.id) == str(creator_id)
return False
def _can_read(self, type, user, department=None, group=None, creator_id=None):
"""判断用户是否有读取权限"""
if user.role == 'admin':
return True # 管理员可以读取所有知识库
if type == 'secret':
return False # 除管理员外secret类型知识库对其他人不可读
if type == 'leader':
# 同部门的leader和member都可以读取
return user.department == department
if type == 'member':
# 同部门组长可以读取所有member知识库
if user.role == 'leader' and user.department == department:
return True
# 成员可以读取同部门同组的member知识库
return user.role == 'member' and user.department == department and user.group == group
if type == 'private':
# 私有知识库只有创建者可读
return str(user.id) == str(creator_id)
return False
def _create_external_dataset(self, instance):
@ -763,16 +945,29 @@ class KnowledgeBaseViewSet(viewsets.ModelViewSet):
"""更新知识库"""
try:
instance = self.get_object()
user = request.user
# 检查编辑权限
# 首先检查权限表中是否有显式权限
permission = KBPermissionModel.objects.filter(
knowledge_base=instance,
user=request.user,
user=user,
can_edit=True,
status='active'
).first()
if not permission:
# 如果权限表中没有记录则使用_can_edit方法判断
has_permission = bool(permission)
if not has_permission:
# 根据知识库类型和用户角色判断权限
has_permission = self._can_edit(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
)
if not has_permission:
return Response({
"code": 403,
"message": "没有编辑权限",
@ -852,16 +1047,29 @@ class KnowledgeBaseViewSet(viewsets.ModelViewSet):
"""删除知识库"""
try:
instance = self.get_object()
user = request.user
# 检查删除权限
permission = KBPermissionModel.objects.filter(
knowledge_base=instance,
user=request.user,
user=user,
can_delete=True,
status='active'
).first()
if not permission:
# 如果权限表中没有记录则使用_can_delete方法判断
has_permission = bool(permission)
if not has_permission:
# 根据知识库类型和用户角色判断权限
has_permission = self._can_delete(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
)
if not has_permission:
return Response({
"code": 403,
"message": "没有删除权限",
@ -912,27 +1120,54 @@ class KnowledgeBaseViewSet(viewsets.ModelViewSet):
"""获取用户对特定知识库的权限"""
try:
instance = self.get_object()
user = request.user
# 从权限表获取权限信息
permission = KBPermissionModel.objects.filter(
knowledge_base=instance,
user=request.user,
status='active'
).first()
# 构建权限数据
# 构建默认权限数据
permissions_data = {
"can_read": False,
"can_edit": False,
"can_delete": False
}
# 从权限表获取权限信息
permission = KBPermissionModel.objects.filter(
knowledge_base=instance,
user=user,
status='active'
).first()
# 如果在权限表中有记录,则使用表中的权限
if permission:
permissions_data.update({
"can_read": permission.can_read,
"can_edit": permission.can_edit,
"can_delete": permission.can_delete
})
# 否则使用_can_read/_can_edit/_can_delete方法判断权限
else:
permissions_data.update({
"can_read": self._can_read(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
),
"can_edit": self._can_edit(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
),
"can_delete": self._can_delete(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
)
})
return Response({
"code": 200,
@ -958,7 +1193,6 @@ class KnowledgeBaseViewSet(viewsets.ModelViewSet):
"message": f"获取权限信息失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def summary(self, request):
"""获取所有可见知识库的概要信息除了secret类型"""
@ -1013,12 +1247,6 @@ class KnowledgeBaseViewSet(viewsets.ModelViewSet):
'can_edit': True,
'can_delete': True
})
elif user.role == 'leader' and user.department == kb.department:
permissions.update({
'can_read': True,
'can_edit': True,
'can_delete': True
})
elif user.role == 'member' and user.department == kb.department:
permissions.update({
'can_read': True,
@ -1069,6 +1297,228 @@ class KnowledgeBaseViewSet(viewsets.ModelViewSet):
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def retrieve(self, request, *args, **kwargs):
try:
# 获取知识库对象
instance = self.get_object()
serializer = self.get_serializer(instance)
data = serializer.data
# 获取用户
user = request.user
# 默认权限设置
permissions = {
'can_read': False,
'can_edit': False,
'can_delete': False
}
# 尝试获取特定权限记录
try:
permission_record = KBPermissionModel.objects.filter(
knowledge_base=instance,
user=user,
status='active',
expires_at__gt=timezone.now()
).first()
if permission_record:
permissions.update({
'can_read': permission_record.can_read,
'can_edit': permission_record.can_edit,
'can_delete': permission_record.can_delete
})
else:
# 使用_can_read/_can_edit/_can_delete方法判断权限
permissions.update({
'can_read': self._can_read(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
),
'can_edit': self._can_edit(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
),
'can_delete': self._can_delete(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
)
})
except Exception as e:
logger.error(f"获取权限信息失败: {str(e)}")
# 权限获取失败时使用默认权限
# 添加权限信息到返回数据
data['permissions'] = permissions
return Response({
'code': 200,
'message': '获取知识库详情成功',
'data': data
})
except Exception as e:
logger.error(f"获取知识库详情失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'获取知识库详情失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def search(self, request):
"""搜索知识库功能"""
try:
# 获取搜索关键字
keyword = request.query_params.get('keyword', '')
if not keyword:
return Response({
"code": 400,
"message": "搜索关键字不能为空",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 获取分页参数
try:
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 10))
except ValueError:
page = 1
page_size = 10
# 构建搜索条件
query = Q(name__icontains=keyword) | \
Q(desc__icontains=keyword) | \
Q(department__icontains=keyword) | \
Q(group__icontains=keyword)
# 排除 secret 类型的知识库
queryset = KnowledgeBase.objects.filter(query).exclude(type='secret')
# 获取用户
user = request.user
# 获取用户所有有效的知识库权限
active_permissions = KBPermissionModel.objects.filter(
user=user,
status='active',
expires_at__gt=timezone.now()
).select_related('knowledge_base')
# 创建权限映射字典
permission_map = {
str(perm.knowledge_base.id): {
'can_read': perm.can_read,
'can_edit': perm.can_edit,
'can_delete': perm.can_delete
}
for perm in active_permissions
}
# 计算总数量
total = queryset.count()
# 分页处理
start = (page - 1) * page_size
end = start + page_size
paginated_queryset = queryset[start:end]
# 序列化知识库数据
serializer = self.get_serializer(paginated_queryset, many=True)
data = serializer.data
# 处理每个知识库项的权限和返回内容
result_items = []
for item in data:
kb_id = item['id']
kb_permissions = {
'can_read': False,
'can_edit': False,
'can_delete': False
}
# 检查知识库特定权限
if kb_id in permission_map:
kb_permissions.update(permission_map[kb_id])
# 如果没有特定权限使用_can方法判断默认权限
else:
kb_type = item['type']
department = item.get('department')
group = item.get('group')
creator_id = item.get('user_id')
kb_permissions.update({
'can_read': self._can_read(kb_type, user, department, group, creator_id),
'can_edit': self._can_edit(kb_type, user, department, group, creator_id),
'can_delete': self._can_delete(kb_type, user, department, group, creator_id)
})
# 添加权限信息
item['permissions'] = kb_permissions
# 根据权限返回不同级别的信息
if kb_permissions['can_read']:
# 有读取权限,返回完整信息
result_items.append(item)
else:
# 无读取权限,只返回概要信息
summary_info = {
'id': item['id'],
'name': item['name'],
'type': item['type'],
'department': item.get('department'),
'permissions': kb_permissions
}
result_items.append(summary_info)
# 高亮搜索关键字
for item in result_items:
if 'name' in item and keyword.lower() in item['name'].lower():
highlighted = item['name'].replace(
keyword, f'<em class="highlight">{keyword}</em>'
)
item['highlighted_name'] = highlighted
# 确保desc不为None并且是字符串
if 'desc' in item and item.get('desc') is not None:
desc_text = str(item['desc']) # 转换为字符串以确保安全
if keyword.lower() in desc_text.lower():
highlighted = desc_text.replace(
keyword, f'<em class="highlight">{keyword}</em>'
)
item['highlighted_desc'] = highlighted
return Response({
"code": 200,
"message": "搜索知识库成功",
"data": {
"total": total,
"page": page,
"page_size": page_size,
"keyword": keyword,
"items": result_items
}
})
except Exception as e:
logger.error(f"搜索知识库失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
"code": 500,
"message": f"搜索知识库失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class PermissionViewSet(viewsets.ModelViewSet):
serializer_class = PermissionSerializer
permission_classes = [IsAuthenticated]