role_based_system/user_management/views.py

3578 lines
147 KiB
Python
Raw Normal View History

from rest_framework import viewsets, status
from rest_framework.decorators import action, api_view, permission_classes
from rest_framework.permissions import IsAuthenticated, AllowAny, IsAdminUser
from rest_framework.response import Response
from rest_framework.exceptions import APIException, PermissionDenied, ValidationError, NotFound
from rest_framework.authentication import TokenAuthentication
from django.utils import timezone
from django.db import connection
from django.db.models import Q, Max, Count, F
from datetime import timedelta, datetime
import mysql.connector
from django.contrib.auth import get_user_model, authenticate, login, logout
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from rest_framework.authtoken.models import Token
import requests
import json
from django.db import transaction
from django.core.exceptions import ObjectDoesNotExist
import sys
import random
import string
import time
import logging
import os
from rest_framework.test import APIRequestFactory
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericForeignKey
2025-03-26 12:26:20 +08:00
from django.http import Http404, HttpResponse
from django.db import IntegrityError
from channels.exceptions import ChannelFull
from django.conf import settings
from django.shortcuts import get_object_or_404
from django.db import models
from rest_framework.views import APIView
from django.core.validators import validate_email
# from django.core.exceptions import ValidationError
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
import uuid
from rest_framework import serializers
import traceback
2025-03-26 12:26:20 +08:00
import requests
import json
# 添加模型导入
from .models import (
User,
Data, # 替换原来的 AdminData, LeaderData, MemberData
Permission, # 替换原来的 DataPermission, TablePermission
ChatHistory,
KnowledgeBase,
Notification,
KnowledgeBasePermission as KBPermissionModel
)
from .serializers import (
UserSerializer,
DataSerializer, # 需要更新
PermissionSerializer, # 需要更新
ChatHistorySerializer,
KnowledgeBaseSerializer,
KnowledgePermissionSerializer, # 添加这个导入
NotificationSerializer
)
# 导入自定义权限类
from .permissions import ResourceCRUDPermission, PermissionRequestPermission, DataPermission, KnowledgeBasePermission as KBPermissionClass
from .exceptions import ExternalAPIError
# 获取正确的用户模型
User = get_user_model()
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.StreamHandler() # 输出到控制台
]
)
class ChatHistoryViewSet(viewsets.ModelViewSet):
permission_classes = [IsAuthenticated]
queryset = ChatHistory.objects.all()
def get_queryset(self):
"""确保用户只能看到自己的未删除的聊天记录"""
return ChatHistory.objects.filter(
user=self.request.user,
is_deleted=False
)
def list(self, request):
2025-03-26 12:26:20 +08:00
"""获取对话列表概览"""
try:
# 获取查询参数
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 10))
2025-03-26 12:26:20 +08:00
# 获取所有对话的概览
latest_chats = self.get_queryset().values(
'conversation_id'
).annotate(
latest_id=Max('id'),
message_count=Count('id'),
last_message=Max('created_at')
).order_by('-last_message')
2025-03-26 12:26:20 +08:00
# 计算分页
total = latest_chats.count()
start = (page - 1) * page_size
end = start + page_size
chats = latest_chats[start:end]
2025-03-26 12:26:20 +08:00
results = []
for chat in chats:
# 获取最新消息记录
latest_record = ChatHistory.objects.get(id=chat['latest_id'])
# 从metadata中获取完整的知识库信息
dataset_info = []
if latest_record.metadata:
dataset_id_list = latest_record.metadata.get('dataset_id_list', [])
dataset_names = latest_record.metadata.get('dataset_names', [])
# 如果有知识库ID列表
if dataset_id_list:
# 如果同时有名称列表且长度匹配
if dataset_names and len(dataset_names) == len(dataset_id_list):
dataset_info = [{
'id': str(id),
'name': name
} for id, name in zip(dataset_id_list, dataset_names)]
else:
# 如果没有名称列表则只返回ID
datasets = KnowledgeBase.objects.filter(id__in=dataset_id_list)
dataset_info = [{
'id': str(ds.id),
'name': ds.name
} for ds in datasets]
results.append({
'conversation_id': chat['conversation_id'],
'message_count': chat['message_count'],
'last_message': latest_record.content,
'last_time': chat['last_message'].strftime('%Y-%m-%d %H:%M:%S'),
'dataset_id_list': [ds['id'] for ds in dataset_info], # 添加完整的知识库ID列表
'datasets': dataset_info # 包含ID和名称的完整信息
})
2025-03-26 12:26:20 +08:00
return Response({
'code': 200,
'message': '获取成功',
'data': {
'total': total,
'page': page,
'page_size': page_size,
'results': results
}
})
except Exception as e:
logger.error(f"获取聊天记录失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'获取聊天记录失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
2025-03-26 12:26:20 +08:00
@action(detail=False, methods=['get'])
def conversation_detail(self, request):
"""获取特定对话的详细信息"""
try:
conversation_id = request.query_params.get('conversation_id')
if not conversation_id:
return Response({
'code': 400,
'message': '缺少conversation_id参数',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 获取对话历史
messages = self.get_queryset().filter(
conversation_id=conversation_id
).order_by('created_at')
if not messages.exists():
return Response({
'code': 404,
'message': '对话不存在',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
# 获取知识库信息
first_message = messages.first()
dataset_info = []
if first_message and first_message.metadata:
if 'dataset_id_list' in first_message.metadata:
datasets = KnowledgeBase.objects.filter(
id__in=first_message.metadata['dataset_id_list']
)
dataset_info = [{
'id': str(ds.id),
'name': ds.name,
'type': ds.type
} for ds in datasets]
return Response({
'code': 200,
'message': '获取成功',
'data': {
'conversation_id': conversation_id,
'datasets': dataset_info,
'messages': [{
'id': str(msg.id),
'role': msg.role,
'content': msg.content,
'created_at': msg.created_at.strftime('%Y-%m-%d %H:%M:%S')
} for msg in messages]
}
})
except Exception as e:
logger.error(f"获取对话详情失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'获取对话详情失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def available_datasets(self, request):
"""获取用户可访问的知识库列表"""
try:
user = request.user
# 获取用户有权限访问的知识库
accessible_datasets = []
# 1. 获取用户通过权限表直接授权的知识库
permission_datasets = KnowledgeBase.objects.filter(
id__in=KBPermissionModel.objects.filter(
user=user,
can_read=True,
status='active'
).values_list('knowledge_base_id', flat=True)
)
# 2. 获取用户根据角色可以访问的知识库
role_datasets = KnowledgeBase.objects.filter(
Q(type='member', department=user.department) | # 成员级知识库
Q(type='leader', department=user.department) | # 部门级知识库,组长可访问
Q(type='admin') # 管理级知识库,管理员可访问
).exclude(
Q(type='private') & ~Q(user_id=str(user.id)) # 排除不属于自己的私人知识库
)
# 3. 合并并去重
accessible_datasets = list(set(list(permission_datasets) + list(role_datasets)))
return Response({
'code': 200,
'message': '获取成功',
'data': [{
'id': str(ds.id),
'name': ds.name,
'type': ds.type,
'department': ds.department,
'description': ds.desc
} for ds in accessible_datasets]
})
except Exception as e:
logger.error(f"获取可用知识库列表失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'获取可用知识库列表失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def create(self, request):
"""创建聊天记录"""
try:
data = request.data
2025-03-26 12:26:20 +08:00
# 检查必填字段 - 支持单知识库或多知识库模式
if 'question' not in data:
return Response({
'code': 400,
'message': '缺少必填字段: question',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 检查知识库ID支持dataset_id或dataset_id_list格式
dataset_ids = []
if 'dataset_id' in data:
dataset_id = data['dataset_id']
# 直接使用标准UUID格式
dataset_ids.append(str(dataset_id))
elif 'dataset_id_list' in data and isinstance(data['dataset_id_list'], (list, str)):
# 处理可能的字符串格式
if isinstance(data['dataset_id_list'], str):
try:
# 尝试解析JSON字符串
dataset_list = json.loads(data['dataset_id_list'])
if isinstance(dataset_list, list):
dataset_ids = [str(id) for id in dataset_list]
except json.JSONDecodeError:
# 如果解析失败可能是单个ID
dataset_ids = [str(data['dataset_id_list'])]
else:
# 如果已经是列表直接使用标准UUID格式
dataset_ids = [str(id) for id in data['dataset_id_list']]
else:
return Response({
'code': 400,
'message': '缺少必填字段: dataset_id 或 dataset_id_list',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
if not dataset_ids:
return Response({
'code': 400,
'message': '至少需要提供一个知识库ID',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证所有知识库并收集external_ids
external_id_list = []
user = request.user
knowledge_bases = [] # 存储所有知识库对象
for kb_id in dataset_ids:
try:
knowledge_base = KnowledgeBase.objects.filter(id=kb_id).first()
if not knowledge_base:
return Response({
'code': 404,
'message': f'知识库不存在: {kb_id}',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
knowledge_bases.append(knowledge_base)
# 检查知识库权限
can_read = False
# 检查权限表
permission = KBPermissionModel.objects.filter(
knowledge_base=knowledge_base,
user=user,
can_read=True,
status='active'
).first()
if permission:
can_read = True
else:
can_read = self._can_read(
type=knowledge_base.type,
user=user,
department=knowledge_base.department,
group=knowledge_base.group,
creator_id=knowledge_base.user_id
)
if not can_read:
return Response({
'code': 403,
'message': f'无权访问知识库: {knowledge_base.name}',
'data': None
}, status=status.HTTP_403_FORBIDDEN)
# 添加知识库的external_id到列表
if knowledge_base.external_id:
external_id_list.append(knowledge_base.external_id)
else:
logger.warning(f"知识库 {knowledge_base.id} ({knowledge_base.name}) 没有external_id")
except Exception as e:
return Response({
'code': 400,
2025-03-26 12:26:20 +08:00
'message': f'处理知识库ID出错: {str(e)}',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
2025-03-26 12:26:20 +08:00
if not external_id_list:
return Response({
'code': 400,
2025-03-26 12:26:20 +08:00
'message': '没有有效的知识库external_id',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
2025-03-26 12:26:20 +08:00
# 获取或创建对话ID
conversation_id = data.get('conversation_id')
# 如果没有提供 conversation_id根据知识库组合生成新的ID
if not conversation_id:
# 对知识库ID列表排序以确保相同组合生成相同的hash
sorted_kb_ids = sorted(dataset_ids)
# 使用知识库ID组合生成唯一的conversation_id
conversation_id = str(uuid.uuid5(
uuid.NAMESPACE_DNS,
'-'.join(sorted_kb_ids)
))
logger.info(f"为知识库组合 {sorted_kb_ids} 生成新的conversation_id: {conversation_id}")
else:
logger.info(f"使用现有conversation_id: {conversation_id}")
# 调用外部API获取答案 (传递多个knowledge base的external_id)
answer = self._get_answer_from_external_api(
dataset_external_id_list=external_id_list,
question=data['question']
)
if not answer:
return Response({
'code': 500,
'message': '获取AI回答失败',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# 准备完整的metadata
metadata = {
'model_id': data.get('model_id', '58c5deb4-f2e2-11ef-9a1b-0242ac120009'),
'dataset_id_list': [str(id) for id in dataset_ids],
'dataset_external_id_list': [str(id) for id in external_id_list],
'dataset_names': [kb.name for kb in knowledge_bases] # 添加知识库名称列表
}
# 创建用户问题记录
question_record = ChatHistory.objects.create(
user=request.user,
2025-03-26 12:26:20 +08:00
knowledge_base=knowledge_bases[0], # 仍然需要一个主知识库,使用第一个
conversation_id=str(conversation_id),
role='user',
content=data['question'],
2025-03-26 12:26:20 +08:00
metadata=metadata
)
# 创建AI回答记录
answer_record = ChatHistory.objects.create(
user=request.user,
2025-03-26 12:26:20 +08:00
knowledge_base=knowledge_bases[0], # 仍然需要一个主知识库,使用第一个
conversation_id=str(conversation_id),
parent_id=str(question_record.id),
role='assistant',
2025-03-26 12:26:20 +08:00
content=answer,
metadata=metadata
)
2025-03-26 12:26:20 +08:00
# 返回完整的响应
return Response({
'code': 200,
'message': '创建成功',
'data': {
2025-03-26 12:26:20 +08:00
'id': str(answer_record.id),
'conversation_id': str(conversation_id),
'dataset_id_list': [str(id) for id in dataset_ids],
'dataset_names': [kb.name for kb in knowledge_bases], # 返回所有知识库名称
'role': 'assistant',
'content': answer_record.content,
'created_at': answer_record.created_at.strftime('%Y-%m-%d %H:%M:%S')
}
}, status=status.HTTP_201_CREATED)
except Exception as e:
logger.error(f"创建聊天记录失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'创建聊天记录失败: {str(e)}',
'data': None
2025-03-26 12:26:20 +08:00
}, status.HTTP_500_INTERNAL_SERVER_ERROR)
def _get_answer_from_external_api(self, dataset_external_id_list, question):
"""调用外部API获取AI回答"""
try:
# 确保所有ID都是字符串
dataset_external_ids = [str(id) if isinstance(id, uuid.UUID) else id for id in dataset_external_id_list]
logger.info(f"准备调用外部API知识库ID列表: {dataset_external_ids}")
# 第一个API调用创建聊天
chat_request_data = {
"id": "65031f4d-c86d-430e-8089-d8ff2731a837",
"model_id": "58c5deb4-f2e2-11ef-9a1b-0242ac120009",
"dataset_id_list": dataset_external_ids,
"multiple_rounds_dialogue": False,
"dataset_setting": {
"top_n": 10,
"similarity": "0.3",
"max_paragraph_char_number": 10000,
"search_mode": "blend",
"no_references_setting": {
"value": "{question}",
"status": "ai_questioning"
}
},
"model_setting": {
"prompt": "**相关文档内容**{data} **回答要求**:如果相关文档内容中没有可用信息,请回答\"没有在知识库中查找到相关信息,建议咨询相关技术支持或参考官方文档进行操作\"。请根据相关文档内容回答用户问题。不要输出与用户问题无关的内容。请使用中文回答客户问题。**用户问题**{question}"
},
"problem_optimization": False
}
logger.info(f"发送创建聊天请求:{settings.API_BASE_URL}/api/application/chat/open")
try:
# 测试JSON序列化提前捕获可能的错误
json_data = json.dumps(chat_request_data)
logger.debug(f"请求数据序列化成功,长度: {len(json_data)}")
except TypeError as e:
logger.error(f"JSON序列化失败: {str(e)}")
return None
chat_response = requests.post(
url=f"{settings.API_BASE_URL}/api/application/chat/open",
json=chat_request_data,
headers={"Content-Type": "application/json"},
timeout=30
)
logger.info(f"API响应状态码: {chat_response.status_code}")
if chat_response.status_code != 200:
logger.error(f"外部API调用失败: {chat_response.text}")
return None
chat_data = chat_response.json()
logger.debug(f"API响应数据: {chat_data}")
if chat_data.get('code') != 200 or not chat_data.get('data'):
logger.error(f"外部API返回错误: {chat_data}")
return None
chat_id = chat_data['data']
logger.info(f"聊天创建成功chat_id: {chat_id}")
# 第二个API调用发送消息
message_request_data = {
"message": question,
"re_chat": False,
"stream": True
}
logger.info(f"发送聊天消息请求: {settings.API_BASE_URL}/api/application/chat_message/{chat_id}")
message_response = requests.post(
url=f"{settings.API_BASE_URL}/api/application/chat_message/{chat_id}",
json=message_request_data,
headers={"Content-Type": "application/json"},
stream=True,
timeout=60
)
if message_response.status_code != 200:
logger.error(f"外部API聊天消息调用失败: {message_response.status_code}, {message_response.text}")
return None
# 拼接流式响应 - 修复SSE格式解析
full_content = ""
try:
for line in message_response.iter_lines():
if line:
line_text = line.decode('utf-8')
# 处理SSE格式 (data: {...})
if line_text.startswith('data: '):
json_str = line_text[6:] # 去掉 "data: " 前缀
logger.debug(f"处理SSE数据: {json_str}")
try:
chunk = json.loads(json_str)
if 'content' in chunk:
content_part = chunk['content']
full_content += content_part
logger.debug(f"追加内容: '{content_part}'")
if chunk.get('is_end', False):
logger.debug("收到结束标记")
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {str(e)}, 原始数据: {json_str}")
else:
logger.debug(f"收到非SSE格式数据: {line_text}")
except Exception as e:
logger.error(f"处理流式响应出错: {str(e)}")
if full_content:
logger.info(f"已接收部分内容: {len(full_content)} 字符")
return full_content.strip()
return None
logger.info(f"聊天回答拼接完成,总长度: {len(full_content)}")
return full_content.strip() if full_content else "未能获取到有效回答"
except Exception as e:
logger.error(f"调用外部API获取回答失败: {str(e)}")
logger.error(traceback.format_exc())
return None
def update(self, request, pk=None):
"""更新聊天记录"""
try:
record = self.get_queryset().filter(id=pk).first()
if not record:
return Response({
'code': 404,
'message': '记录不存在或无权限',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
data = request.data
updateable_fields = ['content', 'metadata']
if 'content' in data:
record.content = data['content']
if 'metadata' in data:
current_metadata = record.metadata or {}
current_metadata.update(data['metadata'])
record.metadata = current_metadata
record.save()
return Response({
'code': 200,
'message': '更新成功',
'data': {
'id': record.id,
'conversation_id': record.conversation_id,
'role': record.role,
'content': record.content,
'metadata': record.metadata,
'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
})
except Exception as e:
logger.error(f"更新聊天记录失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'更新聊天记录失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def destroy(self, request, pk=None):
"""删除聊天记录(软删除)"""
try:
record = self.get_queryset().filter(id=pk).first()
if not record:
return Response({
'code': 404,
'message': '记录不存在或无权限',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
record.soft_delete()
return Response({
'code': 200,
'message': '删除成功',
'data': None
})
except Exception as e:
logger.error(f"删除聊天记录失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'删除聊天记录失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def search(self, request):
"""搜索聊天记录"""
try:
# 获取查询参数
keyword = request.query_params.get('keyword', '').strip()
dataset_id = request.query_params.get('dataset_id')
start_date = request.query_params.get('start_date')
end_date = request.query_params.get('end_date')
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 10))
# 基础查询
query = self.get_queryset()
# 添加过滤条件
if keyword:
query = query.filter(
Q(content__icontains=keyword) |
Q(knowledge_base__name__icontains=keyword)
)
if dataset_id:
query = query.filter(knowledge_base__id=dataset_id)
if start_date:
query = query.filter(created_at__gte=start_date)
if end_date:
query = query.filter(created_at__lte=end_date)
# 计算分页
total = query.count()
start = (page - 1) * page_size
end = start + page_size
# 获取分页数据
records = query.order_by('-created_at')[start:end]
# 序列化数据
results = []
for record in records:
result = {
'id': record.id,
'conversation_id': record.conversation_id,
'dataset_id': str(record.knowledge_base.id),
'dataset_name': record.knowledge_base.name,
'role': record.role,
'content': record.content,
'created_at': record.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'metadata': record.metadata
}
if keyword:
result['highlights'] = {
'content': self._highlight_keyword(record.content, keyword)
}
results.append(result)
return Response({
'code': 200,
'message': '搜索成功',
'data': {
'total': total,
'page': page,
'page_size': page_size,
'results': results
}
})
except Exception as e:
logger.error(f"搜索聊天记录失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'搜索失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
2025-03-26 12:26:20 +08:00
@action(detail=False, methods=['get'])
def export(self, request):
"""导出聊天记录为Excel文件"""
try:
# 获取查询参数
conversation_id = request.query_params.get('conversation_id')
dataset_id = request.query_params.get('dataset_id')
history_days = request.query_params.get('history_days', '7') # 默认导出最近7天
# 至少需要一个筛选条件
if not conversation_id and not dataset_id:
return Response({
'code': 400,
'message': '需要提供conversation_id或dataset_id参数',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证权限
user = request.user
if dataset_id:
knowledge_base = KnowledgeBase.objects.filter(id=dataset_id).first()
if not knowledge_base:
return Response({
'code': 404,
'message': '知识库不存在',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
# 检查是否有读取权限
can_read = False
permission = KBPermissionModel.objects.filter(
knowledge_base=knowledge_base,
user=user,
can_read=True,
status='active'
).first()
if permission:
can_read = True
else:
can_read = self._can_read(
type=knowledge_base.type,
user=user,
department=knowledge_base.department,
group=knowledge_base.group,
creator_id=knowledge_base.user_id
)
if not can_read:
return Response({
'code': 403,
'message': '无权访问该知识库',
'data': None
}, status=status.HTTP_403_FORBIDDEN)
# 查询确认有聊天记录存在
query = self.get_queryset()
if conversation_id:
records = query.filter(conversation_id=conversation_id)
elif dataset_id:
records = query.filter(knowledge_base__id=dataset_id)
if not records.exists():
return Response({
'code': 404,
'message': '未找到相关对话记录',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
# 调用外部API导出Excel文件 - 使用GET请求
application_id = "65031f4d-c86d-430e-8089-d8ff2731a837" # 固定值
export_url = f"{settings.API_BASE_URL}/api/application/{application_id}/chat/export?history_day={history_days}"
logger.info(f"发送导出请求:{export_url}")
export_response = requests.get(
url=export_url,
timeout=60,
stream=True # 使用流式传输处理大文件
)
# 检查响应状态
if export_response.status_code != 200:
logger.error(f"导出API调用失败: {export_response.status_code}, {export_response.text}")
return Response({
'code': 500,
'message': '导出失败,外部服务返回错误',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# 创建响应对象并设置文件下载头
response = HttpResponse(
content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
response['Content-Disposition'] = 'attachment; filename="data.xlsx"'
# 将API响应内容写入响应对象
for chunk in export_response.iter_content(chunk_size=8192):
if chunk:
response.write(chunk)
logger.info("导出成功完成")
return response
except Exception as e:
logger.error(f"导出聊天记录失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'导出聊天记录失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def chat_list(self, request):
"""获取对话列表"""
try:
# 获取查询参数
history_days = request.query_params.get('history_days', '7') # 默认7天
# 构建API请求
application_id = "65031f4d-c86d-430e-8089-d8ff2731a837"
api_url = f"{settings.API_BASE_URL}/api/application/{application_id}/chat"
# 添加查询参数
params = {
'history_day': history_days
}
logger.info(f"发送获取对话列表请求:{api_url}")
# 调用外部API
response = requests.get(
url=api_url,
params=params,
timeout=30
)
if response.status_code != 200:
logger.error(f"获取对话列表失败: {response.status_code}, {response.text}")
return Response({
'code': 500,
'message': '获取对话列表失败,外部服务返回错误',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# 解析响应数据
try:
result = response.json()
if result.get('code') != 200:
logger.error(f"外部API返回错误: {result}")
return Response({
'code': result.get('code', 500),
'message': result.get('message', '获取对话列表失败'),
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# 处理返回的数据
chat_list = result.get('data', [])
# 格式化返回数据
formatted_chats = []
for chat in chat_list:
formatted_chat = {
'id': chat['id'],
'chat_id': chat['chat_id'],
'abstract': chat['abstract'],
'message_count': chat['chat_record_count'],
'created_at': datetime.fromisoformat(chat['create_time'].replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.fromisoformat(chat['update_time'].replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S'),
'star_count': chat['star_num'],
'trample_count': chat['trample_num'],
'mark_sum': chat['mark_sum'],
'is_deleted': chat['is_deleted']
}
formatted_chats.append(formatted_chat)
return Response({
'code': 200,
'message': '获取成功',
'data': {
'total': len(formatted_chats),
'results': formatted_chats
}
})
except json.JSONDecodeError as e:
logger.error(f"解析响应数据失败: {str(e)}")
return Response({
'code': 500,
'message': '解析响应数据失败',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
except Exception as e:
logger.error(f"获取对话列表失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'获取对话列表失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def _highlight_keyword(self, text, keyword):
"""高亮关键词"""
if not keyword or not text:
return text
return text.replace(
keyword,
f'<em class="highlight">{keyword}</em>'
)
class KnowledgeBaseViewSet(viewsets.ModelViewSet):
serializer_class = KnowledgeBaseSerializer
permission_classes = [IsAuthenticated]
def list(self, request, *args, **kwargs):
try:
queryset = self.get_queryset()
2025-03-10 17:03:58 +08:00
# 获取搜索关键字
keyword = request.query_params.get('keyword', '')
# 如果有关键字,构建搜索条件
if keyword:
query = Q(name__icontains=keyword) | \
Q(desc__icontains=keyword) | \
Q(department__icontains=keyword) | \
Q(group__icontains=keyword)
queryset = queryset.filter(query)
# 获取分页参数
try:
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 10))
except ValueError:
page = 1
page_size = 10
# 获取用户所有有效的知识库权限
user = request.user
active_permissions = KBPermissionModel.objects.filter(
user=user,
status='active',
expires_at__gt=timezone.now()
).select_related('knowledge_base')
# 创建权限映射字典
permission_map = {
str(perm.knowledge_base.id): {
'can_read': perm.can_read,
'can_edit': perm.can_edit,
'can_delete': perm.can_delete
}
for perm in active_permissions
}
# 计算总数量
total = queryset.count()
# 分页处理
start = (page - 1) * page_size
end = start + page_size
paginated_queryset = queryset[start:end]
# 序列化知识库数据
serializer = self.get_serializer(paginated_queryset, many=True)
data = serializer.data
# 为每个知识库添加权限信息
for item in data:
kb_id = item['id']
permissions = {
'can_read': False,
'can_edit': False,
'can_delete': False
}
# 检查知识库特定权限
if kb_id in permission_map:
permissions.update(permission_map[kb_id])
# 如果没有特定权限使用_can_read/_can_edit/_can_delete方法设置默认权限
else:
# 获取必要的知识库属性
kb_type = item['type']
department = item.get('department')
group = item.get('group')
creator_id = item.get('user_id')
# 使用方法判断权限
permissions['can_read'] = self._can_read(kb_type, user, department, group, creator_id)
permissions['can_edit'] = self._can_edit(kb_type, user, department, group, creator_id)
permissions['can_delete'] = self._can_delete(kb_type, user, department, group, creator_id)
# 添加权限信息到知识库数据
item['permissions'] = permissions
# 如果有关键字,添加高亮信息
if keyword:
# 处理name高亮
if 'name' in item and keyword.lower() in item['name'].lower():
highlighted = item['name'].replace(
keyword, f'<em class="highlight">{keyword}</em>'
)
item['highlighted_name'] = highlighted
# 处理desc高亮注意处理None值
if 'desc' in item and item.get('desc') is not None:
desc_text = str(item['desc'])
if keyword.lower() in desc_text.lower():
highlighted = desc_text.replace(
keyword, f'<em class="highlight">{keyword}</em>'
)
item['highlighted_desc'] = highlighted
return Response({
"code": 200,
"message": "获取知识库列表成功",
2025-03-10 17:03:58 +08:00
"data": {
"total": total,
"page": page,
"page_size": page_size,
"keyword": keyword if keyword else None,
"items": data
}
})
except Exception as e:
logger.error(f"获取知识库列表失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
"code": 500,
"message": f"获取知识库列表失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def get_queryset(self):
"""获取用户有权限查看的知识库列表"""
user = self.request.user
2025-03-10 17:03:58 +08:00
all_knowledge_bases = KnowledgeBase.objects.all()
2025-03-10 17:03:58 +08:00
# 首先过滤出用户创建的或有显式读权限的知识库
explicit_permission_kbs = KnowledgeBase.objects.filter(
Q(user_id=user.id) | # 是创建者
Q( # 或在权限表中有读权限
id__in=KBPermissionModel.objects.filter(
user=user,
can_read=True,
status='active'
).values_list('knowledge_base_id', flat=True)
)
2025-03-10 17:03:58 +08:00
)
# 获取显式权限知识库的ID集合
explicit_kb_ids = set(explicit_permission_kbs.values_list('id', flat=True))
# 对于没有显式权限的知识库使用_can_read方法判断
result_kbs = list(explicit_permission_kbs)
# 如果用户是管理员,可以查看所有知识库
if user.role == 'admin':
return all_knowledge_bases
# 否则,根据角色和部门/组筛选
for kb in all_knowledge_bases:
if kb.id not in explicit_kb_ids:
has_read_permission = self._can_read(
type=kb.type,
user=user,
department=kb.department,
group=kb.group,
creator_id=kb.user_id
)
if has_read_permission:
result_kbs.append(kb)
# 转换为queryset并去重
kb_ids = [kb.id for kb in result_kbs]
return KnowledgeBase.objects.filter(id__in=kb_ids).distinct()
def create(self, request, *args, **kwargs):
try:
# 1. 验证知识库名称
name = request.data.get('name')
if not name:
return Response({
'code': 400,
'message': '知识库名称不能为空',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
if KnowledgeBase.objects.filter(name=name).exists():
return Response({
'code': 400,
'message': f'知识库名称 "{name}" 已存在',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 2. 验证用户权限和必填字段
user = request.user
type = request.data.get('type', 'private')
department = request.data.get('department')
group = request.data.get('group')
2025-03-26 12:26:20 +08:00
# 修改权限验证
if type == 'admin':
2025-03-26 12:26:20 +08:00
# 移除管理员权限检查,允许所有用户创建
department = None
group = None
elif type == 'secret':
if user.role != 'admin':
return Response({
'code': 403,
'message': '只有管理员可以创建保密级知识库',
'data': None
}, status=status.HTTP_403_FORBIDDEN)
2025-03-13 13:39:02 +08:00
department = None
group = None
elif type == 'leader':
if user.role != 'admin':
return Response({
'code': 403,
'message': '只有管理员可以创建组长级知识库',
'data': None
}, status=status.HTTP_403_FORBIDDEN)
if not department:
return Response({
'code': 400,
'message': '创建组长级知识库时必须指定部门',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
elif type == 'member':
if user.role not in ['admin', 'leader']:
return Response({
'code': 403,
'message': '只有管理员和组长可以创建成员级知识库',
'data': None
}, status=status.HTTP_403_FORBIDDEN)
if user.role == 'admin' and not department:
return Response({
'code': 400,
'message': '管理员创建成员知识库时必须指定部门',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
elif user.role == 'leader':
department = user.department
if not group:
return Response({
'code': 400,
'message': '创建成员知识库时必须指定组',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
2025-03-13 13:39:02 +08:00
elif type == 'private':
# 对于private类型不保存department和group
department = None
group = None
# 3. 验证请求数据
data = request.data.copy()
data['department'] = department
data['group'] = group
# 不需要手动设置 user_id由序列化器自动处理
serializer = self.get_serializer(data=data)
if not serializer.is_valid():
logger.error(f"数据验证失败: {serializer.errors}")
return Response({
'code': 400,
'message': '数据验证失败',
'data': serializer.errors
}, status=status.HTTP_400_BAD_REQUEST)
with transaction.atomic():
# 4. 创建知识库
try:
knowledge_base = serializer.save()
logger.info(f"知识库创建成功: id={knowledge_base.id}, name={knowledge_base.name}, user_id={knowledge_base.user_id}")
except Exception as e:
logger.error(f"知识库创建失败: {str(e)}")
raise
# 5. 调用外部API创建知识库
try:
external_response = self._create_external_dataset(knowledge_base)
logger.info(f"外部知识库创建响应: {external_response}")
# 处理外部API响应
if isinstance(external_response, str):
knowledge_base.external_id = external_response
knowledge_base.save()
logger.info(f"更新knowledge_base的external_id为: {external_response}")
else:
if external_response.get('code') == 200:
external_id = external_response.get('data', {}).get('id')
if external_id:
knowledge_base.external_id = external_id
knowledge_base.save()
logger.info(f"更新knowledge_base的external_id为: {external_id}")
else:
raise ValueError("外部API响应中未找到知识库ID")
else:
raise ValueError(f"外部API调用失败: {external_response.get('message', '未知错误')}")
except Exception as e:
logger.error(f"外部知识库创建失败: {str(e)}")
logger.error(f"外部API响应内容: {external_response if locals().get('external_response') else 'No response'}")
raise ExternalAPIError(f"外部知识库创建失败: {str(e)}")
# 6. 创建权限记录
try:
# 创建者权限
KBPermissionModel.objects.create(
knowledge_base=knowledge_base,
user=request.user,
can_read=True,
can_edit=True,
can_delete=True,
granted_by=request.user,
status='active'
)
logger.info(f"创建者权限创建成功")
# 根据类型批量创建其他用户权限
if type == 'admin':
users_query = User.objects.exclude(id=request.user.id)
elif type == 'secret':
users_query = User.objects.filter(role='admin').exclude(id=request.user.id)
elif type == 'leader':
users_query = User.objects.filter(
Q(role='admin') |
Q(role='leader', department=department)
).exclude(id=request.user.id)
elif type == 'member':
users_query = User.objects.filter(
Q(role='admin') |
Q(department=department, role='leader') |
Q(department=department, group=group, role='member')
).exclude(id=request.user.id)
else: # private
users_query = User.objects.none()
if users_query.exists():
permissions = [
KBPermissionModel(
knowledge_base=knowledge_base,
user=user,
can_read=True,
can_edit=self._can_edit(type, user),
can_delete=self._can_delete(type, user),
granted_by=request.user,
status='active'
) for user in users_query
]
KBPermissionModel.objects.bulk_create(permissions)
logger.info(f"{type}类型权限创建完成: {len(permissions)}条记录")
except Exception as e:
logger.error(f"权限创建失败: {str(e)}")
logger.error(traceback.format_exc())
raise
return Response({
'code': 200,
'message': '知识库创建成功',
'data': {
'knowledge_base': serializer.data,
'external_id': knowledge_base.external_id
}
})
except Exception as e:
logger.error(f"创建知识库失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'创建知识库失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
2025-03-10 17:03:58 +08:00
def _can_edit(self, type, user, department=None, group=None, creator_id=None):
"""判断用户是否有编辑权限"""
2025-03-10 17:03:58 +08:00
if user.role == 'admin':
return True # 管理员对所有知识库都有编辑权限
if type == 'secret':
return False # 除管理员外其他人无权编辑secret类型知识库
if type == 'leader':
# 同部门组长可编辑leader知识库
return user.role == 'leader' and user.department == department
if type == 'member':
# 同部门组长可编辑所有member知识库
if user.role == 'leader' and user.department == department:
return True
# 成员只能编辑同部门同组的member知识库
return user.role == 'member' and user.department == department and user.group == group
if type == 'private':
# 私有知识库只有创建者可编辑
return str(user.id) == str(creator_id)
return False
2025-03-10 17:03:58 +08:00
def _can_delete(self, type, user, department=None, group=None, creator_id=None):
"""判断用户是否有删除权限"""
2025-03-10 17:03:58 +08:00
if user.role == 'admin':
return True # 管理员对所有知识库都有删除权限
if type in ['admin', 'secret']:
2025-03-10 17:03:58 +08:00
return False # 除管理员外其他人无权删除admin和secret类型知识库
if type == 'leader':
# 同部门组长可删除leader知识库
return user.role == 'leader' and user.department == department
if type == 'member':
# 只有组长可以删除成员知识库(组员不能删除)
return user.role == 'leader' and user.department == department
if type == 'private':
# 私有知识库只有创建者可删除
return str(user.id) == str(creator_id)
return False
def _can_read(self, type, user, department=None, group=None, creator_id=None):
"""判断用户是否有读取权限"""
if user.role == 'admin':
return True # 管理员可以读取所有知识库
if type == 'secret':
return False # 除管理员外secret类型知识库对其他人不可读
if type == 'leader':
# 同部门的leader和member都可以读取
return user.department == department
if type == 'member':
# 同部门组长可以读取所有member知识库
if user.role == 'leader' and user.department == department:
return True
# 成员可以读取同部门同组的member知识库
return user.role == 'member' and user.department == department and user.group == group
if type == 'private':
# 私有知识库只有创建者可读
return str(user.id) == str(creator_id)
return False
def _create_external_dataset(self, instance):
"""创建外部知识库"""
try:
api_data = {
"name": instance.name,
"desc": instance.desc,
"type": "0", # 添加必要的type字段
"meta": {}, # 添加必要的meta字段
"documents": [] # 初始化为空列表
}
response = requests.post(
f'{settings.API_BASE_URL}/api/dataset',
json=api_data,
headers={'Content-Type': 'application/json'},
timeout=30
)
if response.status_code != 200:
raise ExternalAPIError(f"创建失败,状态码: {response.status_code}, 响应: {response.text}")
api_response = response.json()
if not api_response.get('code') == 200:
raise ExternalAPIError(f"业务处理失败: {api_response.get('message', '未知错误')}")
dataset_id = api_response.get('data', {}).get('id')
if not dataset_id:
raise ExternalAPIError("响应数据中缺少dataset id")
return dataset_id
except requests.exceptions.Timeout:
raise ExternalAPIError("请求超时,请稍后重试")
except requests.exceptions.RequestException as e:
raise ExternalAPIError(f"API请求失败: {str(e)}")
except Exception as e:
raise ExternalAPIError(f"创建外部知识库失败: {str(e)}")
def _delete_external_dataset(self, external_id):
"""删除外部知识库"""
try:
if not external_id:
raise ExternalAPIError("外部知识库ID不能为空")
response = requests.delete(
f'{settings.API_BASE_URL}/api/dataset/{external_id}',
headers={'Content-Type': 'application/json'},
timeout=30
)
logger.info(f"删除外部知识库响应: status_code={response.status_code}, response={response.text}")
# 检查响应状态码
if response.status_code == 404:
logger.warning(f"外部知识库不存在: {external_id}")
return True # 如果知识库不存在,也视为删除成功
elif response.status_code not in [200, 204]:
raise ExternalAPIError(f"删除失败,状态码: {response.status_code}, 响应: {response.text}")
# 如果是 204 状态码,说明删除成功但无返回内容
if response.status_code == 204:
logger.info(f"外部知识库删除成功: {external_id}")
return True
# 如果是 200 状态码,检查响应内容
try:
api_response = response.json()
if api_response.get('code') != 200:
raise ExternalAPIError(f"业务处理失败: {api_response.get('message', '未知错误')}")
logger.info(f"外部知识库删除成功: {external_id}")
return True
except ValueError:
# 如果无法解析 JSON但状态码是 200也认为成功
logger.warning(f"外部知识库删除响应无法解析JSON但状态码为200视为成功: {external_id}")
return True
except requests.exceptions.Timeout:
logger.error(f"删除外部知识库超时: {external_id}")
raise ExternalAPIError("请求超时,请稍后重试")
except requests.exceptions.RequestException as e:
logger.error(f"删除外部知识库请求异常: {external_id}, error={str(e)}")
raise ExternalAPIError(f"API请求失败: {str(e)}")
except Exception as e:
logger.error(f"删除外部知识库其他错误: {external_id}, error={str(e)}")
raise ExternalAPIError(f"删除外部知识库失败: {str(e)}")
def update(self, request, *args, **kwargs):
"""更新知识库"""
try:
instance = self.get_object()
2025-03-10 17:03:58 +08:00
user = request.user
2025-03-10 17:03:58 +08:00
# 首先检查权限表中是否有显式权限
permission = KBPermissionModel.objects.filter(
knowledge_base=instance,
2025-03-10 17:03:58 +08:00
user=user,
can_edit=True,
status='active'
).first()
2025-03-10 17:03:58 +08:00
# 如果权限表中没有记录则使用_can_edit方法判断
has_permission = bool(permission)
if not has_permission:
# 根据知识库类型和用户角色判断权限
has_permission = self._can_edit(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
)
if not has_permission:
return Response({
"code": 403,
"message": "没有编辑权限",
"data": None
}, status=status.HTTP_403_FORBIDDEN)
with transaction.atomic():
# 执行本地更新
serializer = self.get_serializer(instance, data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
# 更新外部知识库
if instance.external_id:
try:
api_data = {
"name": serializer.validated_data.get('name', instance.name),
"desc": serializer.validated_data.get('desc', instance.desc),
"type": "0", # 保持与创建时一致
"meta": {}, # 保持与创建时一致
"documents": [] # 保持与创建时一致
}
response = requests.put(
f'{settings.API_BASE_URL}/api/dataset/{instance.external_id}',
json=api_data,
headers={'Content-Type': 'application/json'},
timeout=30
)
if response.status_code != 200:
raise ExternalAPIError(f"更新外部知识库失败,状态码: {response.status_code}, 响应: {response.text}")
api_response = response.json()
if not api_response.get('code') == 200:
raise ExternalAPIError(f"更新外部知识库失败: {api_response.get('message', '未知错误')}")
logger.info(f"外部知识库更新成功: {instance.external_id}")
except requests.exceptions.Timeout:
raise ExternalAPIError("请求超时,请稍后重试")
except requests.exceptions.RequestException as e:
raise ExternalAPIError(f"API请求失败: {str(e)}")
except Exception as e:
raise ExternalAPIError(f"更新外部知识库失败: {str(e)}")
return Response({
"code": 200,
"message": "知识库更新成功",
"data": serializer.data
})
except Http404:
return Response({
"code": 404,
"message": "知识库不存在",
"data": None
}, status=status.HTTP_404_NOT_FOUND)
except ExternalAPIError as e:
logger.error(f"更新外部知识库失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
"code": 500,
"message": str(e),
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
except Exception as e:
logger.error(f"更新知识库失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
"code": 500,
"message": f"更新知识库失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def destroy(self, request, *args, **kwargs):
"""删除知识库"""
try:
instance = self.get_object()
2025-03-10 17:03:58 +08:00
user = request.user
# 检查删除权限
permission = KBPermissionModel.objects.filter(
knowledge_base=instance,
2025-03-10 17:03:58 +08:00
user=user,
can_delete=True,
status='active'
).first()
2025-03-10 17:03:58 +08:00
# 如果权限表中没有记录则使用_can_delete方法判断
has_permission = bool(permission)
if not has_permission:
# 根据知识库类型和用户角色判断权限
has_permission = self._can_delete(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
)
if not has_permission:
return Response({
"code": 403,
"message": "没有删除权限",
"data": None
}, status=status.HTTP_403_FORBIDDEN)
with transaction.atomic():
# 删除外部知识库
if instance.external_id:
try:
self._delete_external_dataset(instance.external_id)
logger.info(f"外部知识库删除成功: {instance.external_id}")
except ExternalAPIError as e:
logger.error(f"删除外部知识库失败: {str(e)}")
return Response({
"code": 500,
"message": f"删除外部知识库失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# 删除本地知识库
self.perform_destroy(instance)
logger.info(f"本地知识库删除成功: id={instance.id}, name={instance.name}")
return Response({
"code": 200,
"message": "知识库删除成功",
"data": None
})
except Http404:
return Response({
"code": 404,
"message": "知识库不存在",
"data": None
}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"删除知识库失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
"code": 500,
"message": f"删除知识库失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['get'])
def permissions(self, request, pk=None):
"""获取用户对特定知识库的权限"""
try:
instance = self.get_object()
2025-03-10 17:03:58 +08:00
user = request.user
2025-03-10 17:03:58 +08:00
# 构建默认权限数据
permissions_data = {
"can_read": False,
"can_edit": False,
"can_delete": False
}
2025-03-10 17:03:58 +08:00
# 从权限表获取权限信息
permission = KBPermissionModel.objects.filter(
knowledge_base=instance,
user=user,
status='active'
).first()
2025-03-10 17:03:58 +08:00
# 如果在权限表中有记录,则使用表中的权限
if permission:
permissions_data.update({
"can_read": permission.can_read,
"can_edit": permission.can_edit,
"can_delete": permission.can_delete
})
2025-03-10 17:03:58 +08:00
# 否则使用_can_read/_can_edit/_can_delete方法判断权限
else:
permissions_data.update({
"can_read": self._can_read(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
),
"can_edit": self._can_edit(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
),
"can_delete": self._can_delete(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
)
})
return Response({
"code": 200,
"message": "获取权限信息成功",
"data": {
"knowledge_base_id": instance.id,
"knowledge_base_name": instance.name,
"permissions": permissions_data
}
})
except Http404:
return Response({
"code": 404,
"message": "知识库不存在",
"data": None
}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"获取权限信息失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
"code": 500,
"message": f"获取权限信息失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def summary(self, request):
"""获取所有可见知识库的概要信息除了secret类型"""
try:
user = request.user
# 基础查询排除secret类型的知识库
queryset = KnowledgeBase.objects.exclude(type='secret')
# 获取用户所有有效的知识库权限
active_permissions = KBPermissionModel.objects.filter(
user=user,
status='active',
expires_at__gt=timezone.now()
).select_related('knowledge_base')
# 创建权限映射字典
permission_map = {
str(perm.knowledge_base.id): {
'can_read': perm.can_read,
'can_edit': perm.can_edit,
'can_delete': perm.can_delete
}
for perm in active_permissions
}
summaries = []
for kb in queryset:
# 获取基础权限
permissions = {
'can_read': False,
'can_edit': False,
'can_delete': False
}
# 检查知识库特定权限
kb_id = str(kb.id)
if kb_id in permission_map:
permissions.update(permission_map[kb_id])
# 如果没有特定权限,根据角色和部门设置默认权限
else:
if user.role == 'admin':
permissions.update({
'can_read': True,
'can_edit': True,
'can_delete': True
})
elif kb.type == 'leader':
if user.role == 'leader' and user.department == kb.department:
permissions.update({
'can_read': True,
'can_edit': True,
'can_delete': True
})
elif user.role == 'member' and user.department == kb.department:
permissions.update({
'can_read': True,
'can_edit': False,
'can_delete': False
})
elif kb.type == 'member':
if user.role == 'leader' and user.department == kb.department:
permissions.update({
'can_read': True,
'can_edit': True,
'can_delete': True
})
elif user.role == 'member' and user.department == kb.department:
permissions.update({
'can_read': True,
'can_edit': False,
'can_delete': False
})
elif kb.type == 'private' and str(kb.user_id) == str(user.id):
permissions.update({
'can_read': True,
'can_edit': True,
'can_delete': True
})
# 只返回概要信息
summary = {
'id': str(kb.id),
'name': kb.name,
'desc': kb.desc,
'type': kb.type,
'department': kb.department,
'permissions': permissions
}
summaries.append(summary)
return Response({
'code': 200,
'message': '获取知识库概要信息成功',
'data': summaries
})
except Exception as e:
return Response({
'code': 500,
'message': f'获取知识库概要信息失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
2025-03-10 17:03:58 +08:00
def retrieve(self, request, *args, **kwargs):
try:
# 获取知识库对象
instance = self.get_object()
serializer = self.get_serializer(instance)
data = serializer.data
# 获取用户
user = request.user
# 默认权限设置
permissions = {
'can_read': False,
'can_edit': False,
'can_delete': False
}
# 尝试获取特定权限记录
try:
permission_record = KBPermissionModel.objects.filter(
knowledge_base=instance,
user=user,
status='active',
expires_at__gt=timezone.now()
).first()
if permission_record:
permissions.update({
'can_read': permission_record.can_read,
'can_edit': permission_record.can_edit,
'can_delete': permission_record.can_delete
})
else:
# 使用_can_read/_can_edit/_can_delete方法判断权限
permissions.update({
'can_read': self._can_read(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
),
'can_edit': self._can_edit(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
),
'can_delete': self._can_delete(
type=instance.type,
user=user,
department=instance.department,
group=instance.group,
creator_id=instance.user_id
)
})
except Exception as e:
logger.error(f"获取权限信息失败: {str(e)}")
# 权限获取失败时使用默认权限
# 添加权限信息到返回数据
data['permissions'] = permissions
return Response({
'code': 200,
'message': '获取知识库详情成功',
'data': data
})
except Exception as e:
logger.error(f"获取知识库详情失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'获取知识库详情失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def search(self, request):
"""搜索知识库功能"""
try:
# 获取搜索关键字
keyword = request.query_params.get('keyword', '')
if not keyword:
return Response({
"code": 400,
"message": "搜索关键字不能为空",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 获取分页参数
try:
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 10))
except ValueError:
page = 1
page_size = 10
# 构建搜索条件
query = Q(name__icontains=keyword) | \
Q(desc__icontains=keyword) | \
Q(department__icontains=keyword) | \
Q(group__icontains=keyword)
# 排除 secret 类型的知识库
queryset = KnowledgeBase.objects.filter(query).exclude(type='secret')
# 获取用户
user = request.user
# 获取用户所有有效的知识库权限
active_permissions = KBPermissionModel.objects.filter(
user=user,
status='active',
expires_at__gt=timezone.now()
).select_related('knowledge_base')
# 创建权限映射字典
permission_map = {
str(perm.knowledge_base.id): {
'can_read': perm.can_read,
'can_edit': perm.can_edit,
'can_delete': perm.can_delete
}
for perm in active_permissions
}
# 计算总数量
total = queryset.count()
# 分页处理
start = (page - 1) * page_size
end = start + page_size
paginated_queryset = queryset[start:end]
# 序列化知识库数据
serializer = self.get_serializer(paginated_queryset, many=True)
data = serializer.data
# 处理每个知识库项的权限和返回内容
result_items = []
for item in data:
kb_id = item['id']
kb_permissions = {
'can_read': False,
'can_edit': False,
'can_delete': False
}
# 检查知识库特定权限
if kb_id in permission_map:
kb_permissions.update(permission_map[kb_id])
# 如果没有特定权限使用_can方法判断默认权限
else:
kb_type = item['type']
department = item.get('department')
group = item.get('group')
creator_id = item.get('user_id')
kb_permissions.update({
'can_read': self._can_read(kb_type, user, department, group, creator_id),
'can_edit': self._can_edit(kb_type, user, department, group, creator_id),
'can_delete': self._can_delete(kb_type, user, department, group, creator_id)
})
# 添加权限信息
item['permissions'] = kb_permissions
# 根据权限返回不同级别的信息
if kb_permissions['can_read']:
# 有读取权限,返回完整信息
result_items.append(item)
else:
# 无读取权限,只返回概要信息
summary_info = {
'id': item['id'],
'name': item['name'],
'type': item['type'],
'department': item.get('department'),
'permissions': kb_permissions
}
result_items.append(summary_info)
# 高亮搜索关键字
for item in result_items:
if 'name' in item and keyword.lower() in item['name'].lower():
highlighted = item['name'].replace(
keyword, f'<em class="highlight">{keyword}</em>'
)
item['highlighted_name'] = highlighted
# 确保desc不为None并且是字符串
if 'desc' in item and item.get('desc') is not None:
desc_text = str(item['desc']) # 转换为字符串以确保安全
if keyword.lower() in desc_text.lower():
highlighted = desc_text.replace(
keyword, f'<em class="highlight">{keyword}</em>'
)
item['highlighted_desc'] = highlighted
return Response({
"code": 200,
"message": "搜索知识库成功",
"data": {
"total": total,
"page": page,
"page_size": page_size,
"keyword": keyword,
"items": result_items
}
})
except Exception as e:
logger.error(f"搜索知识库失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
"code": 500,
"message": f"搜索知识库失败: {str(e)}",
"data": None
2025-03-13 13:39:02 +08:00
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['post'])
def change_type(self, request, pk=None):
"""修改知识库类型"""
try:
instance = self.get_object()
user = request.user
# 判断角色和权限
is_creator = str(user.id) == str(instance.user_id)
is_admin = user.role == 'admin'
is_leader = user.role == 'leader'
is_member = user.role == 'member' or user.role == 'user' # 组员或普通用户
# 组员无权修改知识库类型
if is_member and not (is_admin or is_leader):
return Response({
"code": 403,
"message": "组员无权修改知识库类型只能使用private类型",
"data": None
}, status=status.HTTP_403_FORBIDDEN)
# 权限检查
if not is_creator:
# 非创建者无法修改知识库类型
return Response({
"code": 403,
"message": "只有知识库创建者可以修改知识库类型",
"data": None
}, status=status.HTTP_403_FORBIDDEN)
# 获取新类型
new_type = request.data.get('type')
if not new_type:
return Response({
"code": 400,
"message": "新类型不能为空",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证类型是否有效
valid_types = ['private', 'admin', 'secret', 'leader', 'member']
if new_type not in valid_types:
return Response({
"code": 400,
"message": f"无效的知识库类型,可选值: {', '.join(valid_types)}",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 角色特定的类型限制
if is_leader and not is_admin: # 组长且不是管理员
# 组长只能在private和member类型之间切换
if new_type not in ['private', 'member']:
return Response({
"code": 403,
"message": "组长只能将知识库设置为private或member类型",
"data": None
}, status=status.HTTP_403_FORBIDDEN)
# 处理department和group字段
department = request.data.get('department')
group = request.data.get('group')
# 组长只能设置自己部门
if is_leader and not is_admin and new_type == 'member':
if department and department != user.department:
return Response({
"code": 403,
"message": "组长只能为本部门设置知识库",
"data": None
}, status=status.HTTP_403_FORBIDDEN)
# 如果未指定部门,强制设置为组长的部门
department = user.department
# 根据类型验证必填字段
if new_type == 'leader':
if not department:
return Response({
"code": 400,
"message": "组长级知识库必须指定部门",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
if new_type == 'member':
if not department:
return Response({
"code": 400,
"message": "成员级知识库必须指定部门",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
if not group:
return Response({
"code": 400,
"message": "成员级知识库必须指定组",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 如果是admin或secret类型清除department和group
if new_type in ['admin', 'secret']:
department = None
group = None
# 如果是private类型但未指定department和group使用原值
if new_type == 'private':
if department is None:
department = instance.department
if group is None:
group = instance.group
# 更新知识库类型和相关字段
instance.type = new_type
instance.department = department
instance.group = group
instance.save()
return Response({
"code": 200,
"message": f"知识库类型已更新为{new_type}",
"data": {
"id": instance.id,
"name": instance.name,
"type": instance.type,
"department": instance.department,
"group": instance.group
}
})
except Http404:
return Response({
"code": 404,
"message": "知识库不存在",
"data": None
}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"修改知识库类型失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
"code": 500,
"message": f"修改知识库类型失败: {str(e)}",
"data": None
2025-03-10 17:03:58 +08:00
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class PermissionViewSet(viewsets.ModelViewSet):
serializer_class = PermissionSerializer
permission_classes = [IsAuthenticated]
def can_manage_knowledge_base(self, user, knowledge_base):
"""检查用户是否是知识库的创建者"""
return str(knowledge_base.user_id) == str(user.id)
def get_queryset(self):
"""
获取权限申请列表:
2025-03-26 12:26:20 +08:00
1. applicant_id 是当前用户 (看到自己发起的申请)
2. approver_id 是当前用户 (看到自己需要审批的申请)
"""
2025-03-26 12:26:20 +08:00
user_id = str(self.request.user.id)
2025-03-26 12:26:20 +08:00
# 构建查询条件:申请人是自己 或 审批人是自己
query = Q(applicant_id=user_id) | Q(approver_id=user_id)
2025-03-26 12:26:20 +08:00
return Permission.objects.filter(query).select_related(
'knowledge_base',
'applicant',
'approver'
)
2025-03-26 12:26:20 +08:00
def list(self, request, *args, **kwargs):
"""获取权限申请列表,包含详细信息"""
try:
queryset = self.get_queryset()
user_id = str(request.user.id)
# 获取分页参数
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 10))
# 计算总数
total = queryset.count()
# 手动分页
start = (page - 1) * page_size
end = start + page_size
permissions = queryset[start:end]
# 构建响应数据
data = []
for permission in permissions:
# 检查当前用户是否是申请人或审批人
if user_id not in [str(permission.applicant_id), str(permission.approver_id)]:
continue
# 构建响应数据
permission_data = {
'id': str(permission.id),
'knowledge_base': {
'id': str(permission.knowledge_base.id),
'name': permission.knowledge_base.name,
'type': permission.knowledge_base.type,
},
'applicant': {
'id': str(permission.applicant.id),
'username': permission.applicant.username,
'name': permission.applicant.name,
'department': permission.applicant.department,
},
'approver': {
'id': str(permission.approver.id) if permission.approver else '',
'username': permission.approver.username if permission.approver else '',
'name': permission.approver.name if permission.approver else '',
'department': permission.approver.department if permission.approver else '',
},
'permissions': permission.permissions,
'status': permission.status,
'created_at': permission.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'expires_at': permission.expires_at.strftime('%Y-%m-%d %H:%M:%S') if permission.expires_at else None,
'response_message': permission.response_message or '',
# 添加角色标识,用于前端展示
'role': 'applicant' if str(permission.applicant_id) == user_id else 'approver'
}
data.append(permission_data)
return Response({
'code': 200,
'message': '获取权限申请列表成功',
'data': {
'total': len(data), # 使用过滤后的实际数量
'page': page,
'page_size': page_size,
'results': data
}
})
except Exception as e:
logger.error(f"获取权限申请列表失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'获取权限申请列表失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def perform_create(self, serializer):
"""创建权限申请并发送通知给知识库创建者"""
# 获取知识库
2025-03-26 12:26:20 +08:00
# 获取知识库
knowledge_base = serializer.validated_data['knowledge_base']
2025-03-26 12:26:20 +08:00
# 检查是否是申请访问自己的知识库
if str(knowledge_base.user_id) == str(self.request.user.id):
raise ValidationError({
"code": 400,
"message": "您是此知识库的创建者,无需申请权限",
"data": None
})
# 获取知识库创建者作为审批者
approver = User.objects.get(id=knowledge_base.user_id)
# 验证权限请求
requested_permissions = serializer.validated_data.get('permissions', {})
expires_at = serializer.validated_data.get('expires_at')
if not any([requested_permissions.get('can_read'),
requested_permissions.get('can_edit'),
requested_permissions.get('can_delete')]):
raise ValidationError("至少需要申请一种权限(读/改/删)")
if not expires_at:
raise ValidationError("请指定权限到期时间")
# 检查是否已有未过期的权限申请
existing_request = Permission.objects.filter(
knowledge_base=knowledge_base,
applicant=self.request.user,
status='pending'
).first()
if existing_request:
raise ValidationError("您已有一个待处理的权限申请")
# 检查是否已有有效的权限
existing_permission = Permission.objects.filter(
knowledge_base=knowledge_base,
applicant=self.request.user,
status='approved',
expires_at__gt=timezone.now()
).first()
if existing_permission:
raise ValidationError("您已有此知识库的访问权限")
2025-03-26 12:26:20 +08:00
# 保存权限申请,设置审批者
permission = serializer.save(
applicant=self.request.user,
2025-03-26 12:26:20 +08:00
status='pending',
approver=approver # 创建时就设置审批者
)
# 获取权限类型字符串
permission_types = []
if requested_permissions.get('can_read'):
permission_types.append('读取')
if requested_permissions.get('can_edit'):
permission_types.append('编辑')
if requested_permissions.get('can_delete'):
permission_types.append('删除')
permission_str = ''.join(permission_types)
# 发送通知给知识库创建者
owner = User.objects.get(id=knowledge_base.user_id)
self.send_notification(
user=owner,
title="新的权限申请",
content=f"用户 {self.request.user.name} 申请了知识库 '{knowledge_base.name}'{permission_str}权限",
notification_type="permission_request",
related_object_id=permission.id
)
def send_notification(self, user, title, content, notification_type, related_object_id):
"""发送通知"""
try:
notification = Notification.objects.create(
sender=self.request.user,
receiver=user,
title=title,
content=content,
type=notification_type,
related_resource=related_object_id,
)
# 通过WebSocket发送实时通知
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f"notification_user_{user.id}",
{
"type": "notification",
"data": {
"id": str(notification.id),
"title": notification.title,
"content": notification.content,
"type": notification.type,
"created_at": notification.created_at.isoformat(),
"sender": {
"id": str(notification.sender.id),
"name": notification.sender.name
}
}
}
)
except Exception as e:
logger.error(f"发送通知时发生错误: {str(e)}")
@action(detail=True, methods=['post'])
def approve(self, request, pk=None):
try:
# 获取权限申请记录
permission = self.get_object()
# 只检查是否是知识库创建者
if not self.can_manage_knowledge_base(request.user, permission.knowledge_base):
logger.warning(f"用户 {request.user.username} 尝试审批知识库 {permission.knowledge_base.name} 的权限申请,但不是创建者")
return Response({
'code': 403,
'message': '只有知识库创建者可以审批此申请',
'data': None
}, status=status.HTTP_403_FORBIDDEN)
# 获取审批意见
response_message = request.data.get('response_message', '')
with transaction.atomic():
# 更新权限申请状态
permission.status = 'approved'
permission.approver = request.user
permission.response_message = response_message
permission.save()
# 检查是否已存在权限记录
kb_permission = KBPermissionModel.objects.filter(
knowledge_base=permission.knowledge_base,
user=permission.applicant
).first()
if kb_permission:
# 更新现有权限
kb_permission.can_read = permission.permissions.get('can_read', False)
kb_permission.can_edit = permission.permissions.get('can_edit', False)
kb_permission.can_delete = permission.permissions.get('can_delete', False)
kb_permission.granted_by = request.user
kb_permission.status = 'active'
kb_permission.expires_at = permission.expires_at
kb_permission.save()
logger.info(f"更新知识库权限记录: {kb_permission.id}")
else:
# 创建新的权限记录
kb_permission = KBPermissionModel.objects.create(
knowledge_base=permission.knowledge_base,
user=permission.applicant,
can_read=permission.permissions.get('can_read', False),
can_edit=permission.permissions.get('can_edit', False),
can_delete=permission.permissions.get('can_delete', False),
granted_by=request.user,
status='active',
expires_at=permission.expires_at
)
logger.info(f"创建新的知识库权限记录: {kb_permission.id}")
# 发送通知给申请人
self.send_notification(
user=permission.applicant,
title="权限申请已通过",
content=f"您对知识库 '{permission.knowledge_base.name}' 的权限申请已通过",
notification_type="permission_approved",
related_object_id=permission.id
)
return Response({
'code': 200,
'message': '权限申请已批准',
'data': None
})
except Permission.DoesNotExist:
return Response({
'code': 404,
'message': '权限申请不存在',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"处理权限申请失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'处理权限申请失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['post'])
def reject(self, request, pk=None):
"""拒绝权限申请"""
permission = self.get_object()
# 检查是否是知识库创建者
if str(permission.knowledge_base.user_id) != str(request.user.id):
return Response({
'code': 403,
'message': '只有知识库创建者可以审批此申请',
'data': None
}, status=status.HTTP_403_FORBIDDEN)
# 检查申请是否已被处理
if permission.status != 'pending':
return Response({
'code': 400,
'message': '该申请已被处理',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证拒绝原因
response_message = request.data.get('response_message')
if not response_message:
return Response({
'code': 400,
'message': '请填写拒绝原因',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 更新权限状态
permission.status = 'rejected'
permission.approver = request.user
permission.response_message = response_message
permission.save()
# 发送通知给申请人
self.send_notification(
user=permission.applicant,
title="权限申请已拒绝",
content=f"您对知识库 '{permission.knowledge_base.name}' 的权限申请已被拒绝\n"
f"拒绝原因:{response_message}",
notification_type="permission_rejected",
related_object_id=permission.id
)
return Response({
'code': 200,
'message': '权限申请已拒绝',
'data': PermissionSerializer(permission).data
})
@action(detail=True, methods=['post'])
def extend(self, request, pk=None):
"""延长权限有效期"""
instance = self.get_object()
user = request.user
# 检查是否有权限延长
if not self.check_extend_permission(instance, user):
return Response({
"code": 403,
"message": "您没有权限延长此权限",
"data": None
}, status=status.HTTP_403_FORBIDDEN)
new_expires_at = request.data.get('expires_at')
if not new_expires_at:
return Response({
"code": 400,
"message": "请设置新的过期时间",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
try:
with transaction.atomic():
# 更新权限申请表的过期时间
instance.expires_at = new_expires_at
instance.save()
# 同步更新知识库权限表的过期时间
kb_permission = KBPermissionModel.objects.get(
knowledge_base=instance.knowledge_base,
user=instance.applicant
)
kb_permission.expires_at = new_expires_at
kb_permission.save()
return Response({
"code": 200,
"message": "权限有效期延长成功",
"data": PermissionSerializer(instance).data
})
except Exception as e:
return Response({
"code": 500,
"message": f"延长权限有效期失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def check_extend_permission(self, permission, user):
"""检查是否有权限延长权限有效期"""
knowledge_base = permission.knowledge_base
# 私人知识库只有拥有者能延长
if knowledge_base.type == 'private':
return knowledge_base.owner == user
# 组长知识库只有管理员能延长
if knowledge_base.type == 'leader':
return user.role == 'admin'
# 组员知识库可以由管理员或本部门组长延长
if knowledge_base.type == 'member':
return (
user.role == 'admin' or
(user.role == 'leader' and user.department == knowledge_base.department)
)
return False
2025-03-26 12:26:20 +08:00
@action(detail=False, methods=['get'])
def user_permissions(self, request):
"""获取指定用户的所有知识库权限"""
try:
# 获取用户名参数
username = request.query_params.get('username')
if not username:
return Response({
'code': 400,
'message': '请提供用户名',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 获取用户
try:
target_user = User.objects.get(username=username)
except User.DoesNotExist:
return Response({
'code': 404,
'message': f'用户 {username} 不存在',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
# 获取该用户的所有权限记录
permissions = KBPermissionModel.objects.filter(
user=target_user,
status='active'
).select_related('knowledge_base', 'granted_by')
# 构建响应数据
permissions_data = []
for perm in permissions:
perm_data = {
'id': str(perm.id),
'knowledge_base': {
'id': str(perm.knowledge_base.id),
'name': perm.knowledge_base.name,
'type': perm.knowledge_base.type,
'department': perm.knowledge_base.department,
'group': perm.knowledge_base.group
},
'permissions': {
'can_read': perm.can_read,
'can_edit': perm.can_edit,
'can_delete': perm.can_delete
},
'granted_by': {
'id': str(perm.granted_by.id) if perm.granted_by else None,
'username': perm.granted_by.username if perm.granted_by else None,
'name': perm.granted_by.name if perm.granted_by else None
},
'created_at': perm.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'expires_at': perm.expires_at.strftime('%Y-%m-%d %H:%M:%S') if perm.expires_at else None,
'status': perm.status
}
permissions_data.append(perm_data)
return Response({
'code': 200,
'message': '获取用户权限成功',
'data': {
'user': {
'id': str(target_user.id),
'username': target_user.username,
'name': target_user.name,
'department': target_user.department,
'role': target_user.role
},
'permissions': permissions_data
}
})
except Exception as e:
logger.error(f"获取用户权限失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'获取用户权限失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def all_permissions(self, request):
"""管理员获取所有用户的知识库权限(不包括私有知识库)"""
try:
# 检查是否是管理员
if request.user.role != 'admin':
return Response({
'code': 403,
'message': '只有管理员可以查看所有权限',
'data': None
}, status=status.HTTP_403_FORBIDDEN)
# 获取查询参数
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 10))
status_filter = request.query_params.get('status')
department = request.query_params.get('department')
kb_type = request.query_params.get('kb_type')
# 构建基础查询
queryset = KBPermissionModel.objects.filter(
~Q(knowledge_base__type='private')
).select_related(
'user',
'knowledge_base',
'granted_by'
)
# 应用过滤条件
if status_filter == 'active':
queryset = queryset.filter(
Q(expires_at__gt=timezone.now()) | Q(expires_at__isnull=True),
status='active'
)
elif status_filter == 'expired':
queryset = queryset.filter(
Q(expires_at__lte=timezone.now()) | Q(status='inactive')
)
if department:
queryset = queryset.filter(user__department=department)
if kb_type:
queryset = queryset.filter(knowledge_base__type=kb_type)
# 按用户分组处理数据
user_permissions = {}
for perm in queryset:
user_id = str(perm.user.id)
if user_id not in user_permissions:
user_permissions[user_id] = {
'user_info': {
'id': user_id,
'username': perm.user.username,
'name': getattr(perm.user, 'name', perm.user.username),
'department': getattr(perm.user, 'department', None),
'role': getattr(perm.user, 'role', None)
},
'permissions': [],
'stats': {
'total': 0,
'by_type': {
'admin': 0,
'secret': 0,
'leader': 0,
'member': 0
},
'by_permission': {
'read_only': 0,
'read_write': 0,
'full_access': 0
}
}
}
# 添加权限信息
perm_data = {
'id': str(perm.id),
'knowledge_base': {
'id': str(perm.knowledge_base.id),
'name': perm.knowledge_base.name,
'type': perm.knowledge_base.type,
'department': perm.knowledge_base.department,
'group': perm.knowledge_base.group,
'creator': {
'id': str(perm.knowledge_base.user_id),
'name': getattr(User.objects.filter(id=perm.knowledge_base.user_id).first(), 'name', None),
'username': getattr(User.objects.filter(id=perm.knowledge_base.user_id).first(), 'username', None)
}
},
'permissions': {
'can_read': perm.can_read,
'can_edit': perm.can_edit,
'can_delete': perm.can_delete
},
'granted_by': {
'id': str(perm.granted_by.id) if perm.granted_by else None,
'username': perm.granted_by.username if perm.granted_by else None,
'name': getattr(perm.granted_by, 'name', None) if perm.granted_by else None
},
'granted_at': perm.granted_at.strftime('%Y-%m-%d %H:%M:%S'),
'expires_at': perm.expires_at.strftime('%Y-%m-%d %H:%M:%S') if perm.expires_at else None,
'status': perm.status
}
user_permissions[user_id]['permissions'].append(perm_data)
# 更新统计信息
stats = user_permissions[user_id]['stats']
stats['total'] += 1
stats['by_type'][perm.knowledge_base.type] += 1
# 统计权限级别
if perm.can_delete:
stats['by_permission']['full_access'] += 1
elif perm.can_edit:
stats['by_permission']['read_write'] += 1
elif perm.can_read:
stats['by_permission']['read_only'] += 1
# 转换为列表并分页
users_list = list(user_permissions.values())
total = len(users_list)
start = (page - 1) * page_size
end = start + page_size
paginated_users = users_list[start:end]
return Response({
'code': 200,
'message': '获取权限列表成功',
'data': {
'total': total,
'page': page,
'page_size': page_size,
'results': paginated_users
}
})
except Exception as e:
logger.error(f"获取所有权限失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'获取所有权限失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['post'])
def update_permission(self, request):
"""管理员更新用户的知识库权限"""
try:
# 检查是否是管理员
if request.user.role != 'admin':
return Response({
'code': 403,
'message': '只有管理员可以直接修改权限',
'data': None
}, status=status.HTTP_403_FORBIDDEN)
# 验证必要参数
user_id = request.data.get('user_id')
knowledge_base_id = request.data.get('knowledge_base_id')
permissions = request.data.get('permissions')
expires_at_str = request.data.get('expires_at')
if not all([user_id, knowledge_base_id, permissions]):
return Response({
'code': 400,
'message': '缺少必要参数',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证权限参数格式
required_permission_fields = ['can_read', 'can_edit', 'can_delete']
if not all(field in permissions for field in required_permission_fields):
return Response({
'code': 400,
'message': '权限参数格式错误,必须包含 can_read、can_edit、can_delete',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 获取用户和知识库
try:
user = User.objects.get(id=user_id)
knowledge_base = KnowledgeBase.objects.get(id=knowledge_base_id)
except User.DoesNotExist:
return Response({
'code': 404,
'message': f'用户ID {user_id} 不存在',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
except KnowledgeBase.DoesNotExist:
return Response({
'code': 404,
'message': f'知识库ID {knowledge_base_id} 不存在',
'data': None
}, status=status.HTTP_404_NOT_FOUND)
# 检查知识库类型和用户角色的匹配
if knowledge_base.type == 'private' and str(knowledge_base.user_id) != str(user.id):
return Response({
'code': 403,
'message': '不能修改其他用户的私有知识库权限',
'data': None
}, status=status.HTTP_403_FORBIDDEN)
# 处理过期时间
expires_at = None
if expires_at_str:
try:
# 将字符串转换为datetime对象
expires_at = timezone.datetime.strptime(
expires_at_str,
'%Y-%m-%dT%H:%M:%SZ'
)
# 确保时区感知
expires_at = timezone.make_aware(expires_at)
# 检查是否早于当前时间
if expires_at <= timezone.now():
return Response({
'code': 400,
'message': '过期时间不能早于或等于当前时间',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
except ValueError:
return Response({
'code': 400,
'message': '过期时间格式错误,应为 ISO 格式 (YYYY-MM-DDThh:mm:ssZ)',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 根据用户角色限制权限
if user.role == 'member' and permissions.get('can_delete'):
return Response({
'code': 400,
'message': '普通成员不能获得删除权限',
'data': None
}, status=status.HTTP_400_BAD_REQUEST)
# 更新或创建权限记录
try:
with transaction.atomic():
permission, created = KBPermissionModel.objects.update_or_create(
user=user,
knowledge_base=knowledge_base,
defaults={
'can_read': permissions.get('can_read', False),
'can_edit': permissions.get('can_edit', False),
'can_delete': permissions.get('can_delete', False),
'granted_by': request.user,
'status': 'active',
'expires_at': expires_at
}
)
# 发送通知给用户
self.send_notification(
user=user,
title="知识库权限更新",
content=f"管理员已{created and '授予' or '更新'}您对知识库 '{knowledge_base.name}' 的权限",
notification_type="permission_updated",
related_object_id=permission.id
)
except IntegrityError as e:
return Response({
'code': 500,
'message': f'数据库操作失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return Response({
'code': 200,
'message': f"{'创建' if created else '更新'}权限成功",
'data': {
'id': str(permission.id),
'user': {
'id': str(user.id),
'username': user.username,
'name': user.name,
'department': user.department,
'role': user.role
},
'knowledge_base': {
'id': str(knowledge_base.id),
'name': knowledge_base.name,
'type': knowledge_base.type,
'department': knowledge_base.department,
'group': knowledge_base.group
},
'permissions': {
'can_read': permission.can_read,
'can_edit': permission.can_edit,
'can_delete': permission.can_delete
},
'granted_by': {
'id': str(request.user.id),
'username': request.user.username,
'name': request.user.name
},
'expires_at': permission.expires_at.strftime('%Y-%m-%d %H:%M:%S') if permission.expires_at else None,
'created': created
}
})
except Exception as e:
logger.error(f"更新权限失败: {str(e)}")
logger.error(traceback.format_exc())
return Response({
'code': 500,
'message': f'更新权限失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class NotificationViewSet(viewsets.ModelViewSet):
"""通知视图集"""
queryset = Notification.objects.all()
serializer_class = NotificationSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""只返回用户自己的通知"""
return Notification.objects.filter(receiver=self.request.user)
@action(detail=True, methods=['post'])
def mark_as_read(self, request, pk=None):
"""标记通知为已读"""
notification = self.get_object()
notification.is_read = True
notification.save()
return Response({'status': 'marked as read'})
@action(detail=False, methods=['post'])
def mark_all_as_read(self, request):
"""标记所有通知为已读"""
self.get_queryset().update(is_read=True)
return Response({'status': 'all marked as read'})
@action(detail=False, methods=['get'])
def unread_count(self, request):
"""获取未读通知数量"""
count = self.get_queryset().filter(is_read=False).count()
return Response({'unread_count': count})
@action(detail=False, methods=['get'])
def latest(self, request):
"""获取最新通知"""
notifications = self.get_queryset().filter(
is_read=False
).order_by('-created_at')[:5]
serializer = self.get_serializer(notifications, many=True)
return Response(serializer.data)
def perform_create(self, serializer):
"""创建通知时自动设置发送者"""
serializer.save(sender=self.request.user)
@method_decorator(csrf_exempt, name='dispatch')
class LoginView(APIView):
"""用户登录视图"""
authentication_classes = [] # 清空认证类
permission_classes = [AllowAny]
def post(self, request):
try:
username = request.data.get('username')
password = request.data.get('password')
# 参数验证
if not username or not password:
return Response({
"code": 400,
"message": "请提供用户名和密码",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证用户
user = authenticate(request, username=username, password=password)
if user is not None:
# 获取或创建token
token, _ = Token.objects.get_or_create(user=user)
return Response({
"code": 200,
"message": "登录成功",
"data": {
"id": str(user.id),
"username": user.username,
"email": user.email,
"role": user.role,
"department": user.department,
"name": user.name,
"group": user.group,
"token": token.key
}
})
else:
return Response({
"code": 401,
"message": "用户名或密码错误",
"data": None
}, status=status.HTTP_401_UNAUTHORIZED)
except Exception as e:
import traceback
logger.error(f"登录失败: {str(e)}")
logger.error(f"错误类型: {type(e)}")
logger.error(f"错误堆栈: {traceback.format_exc()}")
return Response({
"code": 500,
"message": "登录失败,请稍后重试",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@method_decorator(csrf_exempt, name='dispatch')
class RegisterView(APIView):
"""用户注册视图"""
permission_classes = [AllowAny]
def post(self, request):
try:
data = request.data
# 检查必填字段
required_fields = ['username', 'password', 'email', 'role', 'department', 'name']
for field in required_fields:
if not data.get(field):
return Response({
"code": 400,
"message": f"缺少必填字段: {field}",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证角色
valid_roles = ['admin', 'leader', 'member']
roles_str = ', '.join(valid_roles) # 先构造角色字符串
if data['role'] not in valid_roles:
return Response({
"code": 400,
"message": f"无效的角色,必须是: {roles_str}",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证部门是否存在
if data['department'] not in settings.DEPARTMENT_GROUPS:
return Response({
"code": 400,
"message": f"无效的部门,可选部门: {', '.join(settings.DEPARTMENT_GROUPS.keys())}",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 如果是组员,验证小组
if data['role'] == 'member':
if not data.get('group'):
return Response({
"code": 400,
"message": "组员必须指定所属小组",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证小组是否存在且属于指定部门
valid_groups = settings.DEPARTMENT_GROUPS.get(data['department'], [])
if data['group'] not in valid_groups:
return Response({
"code": 400,
"message": f"无效的小组,{data['department']}的可选小组: {', '.join(valid_groups)}",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 检查用户名是否已存在
if User.objects.filter(username=data['username']).exists():
return Response({
"code": 400,
"message": "用户名已存在",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 检查邮箱是否已存在
if User.objects.filter(email=data['email']).exists():
return Response({
"code": 400,
"message": "邮箱已被注册",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证密码强度
if len(data['password']) < 8:
return Response({
"code": 400,
"message": "密码长度必须至少为8位",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证邮箱格式
try:
validate_email(data['email'])
except ValidationError:
return Response({
"code": 400,
"message": "邮箱格式不正确",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 创建用户
user = User.objects.create_user(
username=data['username'],
email=data['email'],
password=data['password'],
role=data['role'],
department=data['department'],
name=data['name'],
group=data.get('group') if data['role'] == 'member' else None,
is_staff=False,
is_superuser=False
)
# 生成认证令牌
token, _ = Token.objects.get_or_create(user=user)
return Response({
"code": 200,
"message": "注册成功",
"data": {
"id": user.id,
"username": user.username,
"email": user.email,
"role": user.role,
"department": user.department,
"name": user.name,
"group": user.group,
"token": token.key,
"created_at": user.date_joined.strftime('%Y-%m-%d %H:%M:%S')
}
}, status=status.HTTP_201_CREATED)
except Exception as e:
print(f"注册失败: {str(e)}")
print(f"错误类型: {type(e)}")
print(f"错误堆栈: {traceback.format_exc()}")
return Response({
"code": 500,
"message": f"注册失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@method_decorator(csrf_exempt, name='dispatch')
class LogoutView(APIView):
"""用户登出视图"""
permission_classes = [IsAuthenticated]
def post(self, request):
try:
# 删除用户的token
request.user.auth_token.delete()
# 执行django的登出
logout(request)
return Response({
"code": 200,
"message": "登出成功",
"data": None
})
except Exception as e:
return Response({
"code": 500,
"message": f"登出失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET', 'PUT'])
@permission_classes([IsAuthenticated])
def user_profile(request):
"""获取或更新用户信息"""
if request.method == 'GET':
data = {
'id': request.user.id,
'username': request.user.username,
'email': request.user.email,
'role': request.user.role,
'department': request.user.department,
'phone': request.user.phone,
'date_joined': request.user.date_joined
}
return Response(data)
elif request.method == 'PUT':
user = request.user
# 只允许更新特定字段
allowed_fields = ['email', 'phone', 'department']
for field in allowed_fields:
if field in request.data:
setattr(user, field, request.data[field])
user.save()
return Response({'message': '用户信息更新成功'})
@csrf_exempt
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def change_password(request):
"""修改密码"""
try:
old_password = request.data.get('old_password')
new_password = request.data.get('new_password')
# 验证参数
if not old_password or not new_password:
return Response({
"code": 400,
"message": "请提供旧密码和新密码",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证旧密码
user = request.user
if not user.check_password(old_password):
return Response({
"code": 400,
"message": "旧密码错误",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 验证新密码长度
if len(new_password) < 8:
return Response({
"code": 400,
"message": "新密码长度必须至少为8位",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
# 修改密码
user.set_password(new_password)
user.save()
# 更新token
user.auth_token.delete()
token, _ = Token.objects.get_or_create(user=user)
return Response({
"code": 200,
"message": "密码修改成功",
"data": {
"token": token.key
}
})
except Exception as e:
return Response({
"code": 500,
"message": f"密码修改失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['POST'])
@permission_classes([AllowAny])
def user_register(request):
"""用户注册"""
try:
data = request.data
# 检查必填字段
required_fields = ['username', 'password', 'email', 'role', 'department', 'name']
for field in required_fields:
if not data.get(field):
return Response({
'error': f'缺少必填字段: {field}'
}, status=status.HTTP_400_BAD_REQUEST)
# 验证角色
valid_roles = ['admin', 'leader', 'member']
if data['role'] not in valid_roles:
return Response({
'error': f'无效的角色,必须是: {", ".join(valid_roles)}'
}, status=status.HTTP_400_BAD_REQUEST)
# 如果是组员,必须指定小组
if data['role'] == 'member' and not data.get('group'):
return Response({
'error': '组员必须指定所属小组'
}, status=status.HTTP_400_BAD_REQUEST)
# 检查用户名是否已存在
if User.objects.filter(username=data['username']).exists():
return Response({
'error': '用户名已存在'
}, status=status.HTTP_400_BAD_REQUEST)
# 检查邮箱是否已存在
if User.objects.filter(email=data['email']).exists():
return Response({
'error': '邮箱已被注册'
}, status=status.HTTP_400_BAD_REQUEST)
# 验证密码强度
if len(data['password']) < 8:
return Response({
'error': '密码长度必须至少为8位'
}, status=status.HTTP_400_BAD_REQUEST)
# 验证邮箱格式
try:
validate_email(data['email'])
except ValidationError:
return Response({
'error': '邮箱格式不正确'
}, status=status.HTTP_400_BAD_REQUEST)
# 创建用户
user = User.objects.create_user(
username=data['username'],
email=data['email'],
password=data['password'],
role=data['role'],
department=data['department'],
name=data['name'],
group=data.get('group') if data['role'] == 'member' else None,
is_staff=False,
is_superuser=False
)
# 生成认证令牌
token, _ = Token.objects.get_or_create(user=user)
return Response({
'message': '注册成功',
'data': {
'id': user.id,
'username': user.username,
'email': user.email,
'role': user.role,
'department': user.department,
'name': user.name,
'group': user.group,
'token': token.key,
'created_at': user.date_joined.strftime('%Y-%m-%d %H:%M:%S')
}
}, status=status.HTTP_201_CREATED)
except Exception as e:
print(f"注册失败: {str(e)}")
print(f"错误类型: {type(e)}")
print(f"错误堆栈: {traceback.format_exc()}")
return Response({
'error': f'注册失败: {str(e)}',
'data': None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@csrf_exempt
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def verify_token(request):
"""验证令牌有效性"""
try:
return Response({
"code": 200,
"message": "令牌有效",
"data": {
"is_valid": True,
"user": {
"id": request.user.id,
"username": request.user.username,
"email": request.user.email,
"role": request.user.role,
"department": request.user.department,
"name": request.user.name,
"group": request.user.group
}
}
})
except Exception as e:
return Response({
"code": 500,
"message": f"验证失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def user_list(request):
"""获取用户列表"""
user = request.user
if user.role == 'admin':
users = User.objects.all()
elif user.role == 'leader':
users = User.objects.filter(department=user.department)
else:
users = User.objects.filter(id=user.id)
data = [{
'id': u.id,
'username': u.username,
'email': u.email,
'role': u.role,
'department': u.department,
'is_active': u.is_active,
'date_joined': u.date_joined
} for u in users]
return Response(data)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def user_detail(request, pk):
"""获取用户详情"""
try:
# 尝试转换为 UUID
if not isinstance(pk, uuid.UUID):
try:
pk = uuid.UUID(pk)
except ValueError:
return Response({
"code": 400,
"message": "无效的用户ID格式",
"data": None
}, status=status.HTTP_400_BAD_REQUEST)
user = get_object_or_404(User, pk=pk)
return Response({
"code": 200,
"message": "获取用户信息成功",
"data": {
"id": str(user.id),
"username": user.username,
"email": user.email,
"name": user.name,
"role": user.role,
"department": user.department,
"group": user.group
}
})
except Exception as e:
return Response({
"code": 500,
"message": f"获取用户信息失败: {str(e)}",
"data": None
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['PUT'])
@permission_classes([IsAdminUser])
def user_update(request, pk):
"""更新用户信息"""
try:
user = User.objects.get(pk=pk)
# 只允许更新特定字段
allowed_fields = ['email', 'role', 'department', 'is_active', 'phone']
for field in allowed_fields:
if field in request.data:
setattr(user, field, request.data[field])
user.save()
return Response({'message': '用户信息更新成功'})
except User.DoesNotExist:
return Response({'message': '用户不存在'}, status=404)
@api_view(['DELETE'])
@permission_classes([IsAdminUser])
def user_delete(request, pk):
"""删除用户"""
try:
user = User.objects.get(pk=pk)
user.delete()
return Response({'message': '用户删除成功'})
except User.DoesNotExist:
return Response({'message': '用户不存在'}, status=404)