operations_project/apps/knowledge_base/views.py

1270 lines
52 KiB
Python
Raw Permalink Normal View History

2025-05-07 18:01:48 +08:00
# apps/knowledge_base/views.py
import logging
import json
import traceback
from django.db.models import Q
from django.db import transaction
from django.utils import timezone
from django.http import Http404
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework.decorators import action
import requests
from apps.accounts.models import User
from apps.knowledge_base.models import KnowledgeBase, KnowledgeBaseDocument
from apps.permissions.models import KnowledgeBasePermission as KBPermissionModel
from apps.permissions.services.permission_service import KnowledgeBasePermissionMixin
from apps.knowledge_base.serializers import KnowledgeBaseSerializer, KnowledgeBaseDocumentSerializer
from apps.common.services.external_api_service import (
create_external_dataset, delete_external_dataset, call_split_api_multiple,
call_upload_api, call_delete_document_api, ExternalAPIError,
get_external_document_list, get_external_document_paragraphs,
call_delete_document_api
)
from daren_project import settings
logger = logging.getLogger(__name__)
# ViewSet exposing CRUD, search, type-change and document-management endpoints
# for knowledge bases. All responses use a {"code", "message", "data"} envelope.
# NOTE(review): the permission helpers called by the methods of this class
# (check_knowledge_base_permission, _can_read, _can_edit, _can_delete) are not
# defined here; presumably they come from KnowledgeBasePermissionMixin — confirm.
class KnowledgeBaseViewSet(KnowledgeBasePermissionMixin, viewsets.ModelViewSet):
# Serializer used for validating input and rendering knowledge bases.
serializer_class = KnowledgeBaseSerializer
# Every endpoint requires an authenticated user.
permission_classes = [IsAuthenticated]
def list(self, request, *args, **kwargs):
    """Return a paginated, optionally keyword-filtered list of knowledge bases.

    Query parameters:
        keyword: optional substring matched case-insensitively against
            name, desc, department and group.
        page, page_size: 1-based pagination. Non-numeric values fall back to
            1 / 10; values below 1 are replaced by 1 / 10 so the queryset is
            never sliced with a negative index.

    Every item is annotated with ``permissions`` and ``expires_at``. When a
    keyword is supplied, ``highlighted_name`` / ``highlighted_desc`` contain
    the HTML-escaped text with each case-insensitive match wrapped in
    ``<em class="highlight">``.

    Returns:
        Response using the ``{"code", "message", "data"}`` envelope; HTTP 500
        with the error text when anything unexpected is raised.
    """
    import html
    import re

    def highlight(text, pattern):
        """Return escaped ``text`` with matches wrapped in <em>, or None if nothing matched."""
        pieces = []
        cursor = 0
        for match in pattern.finditer(text):
            pieces.append(html.escape(text[cursor:match.start()]))
            pieces.append(f'<em class="highlight">{html.escape(match.group(0))}</em>')
            cursor = match.end()
        if not pieces:
            return None
        pieces.append(html.escape(text[cursor:]))
        return ''.join(pieces)

    try:
        queryset = self.get_queryset()
        keyword = request.query_params.get('keyword', '')
        if keyword:
            query = Q(name__icontains=keyword) | Q(desc__icontains=keyword) | \
                Q(department__icontains=keyword) | Q(group__icontains=keyword)
            queryset = queryset.filter(query)

        try:
            page = int(request.query_params.get('page', 1))
            page_size = int(request.query_params.get('page_size', 10))
        except ValueError:
            page = 1
            page_size = 10
        # Negative offsets would make Django reject the slice and turn a bad
        # query parameter into a 500.
        if page < 1:
            page = 1
        if page_size < 1:
            page_size = 10

        total = queryset.count()
        start = (page - 1) * page_size
        end = start + page_size
        serializer = self.get_serializer(queryset[start:end], many=True)
        data = serializer.data
        user = request.user

        # Load the explicit permissions for the whole page in one query
        # instead of one query per item. Mirror QuerySet.first(): fall back
        # to primary-key order when the model defines no ordering, and keep
        # the first row per knowledge base.
        explicit_qs = KBPermissionModel.objects.filter(
            knowledge_base_id__in=[item['id'] for item in data],
            user=user,
            status='active'
        )
        if not explicit_qs.ordered:
            explicit_qs = explicit_qs.order_by('pk')
        explicit_by_kb = {}
        for perm in explicit_qs:
            # Serialized ids may be strings while DB ids may be UUIDs.
            explicit_by_kb.setdefault(str(perm.knowledge_base_id), perm)

        keyword_pattern = re.compile(re.escape(keyword), re.IGNORECASE) if keyword else None

        for item in data:
            kb_type = item['type']
            department = item.get('department')
            group = item.get('group')
            creator_id = item.get('user_id')
            kb_id = item['id']

            explicit_permission = explicit_by_kb.get(str(kb_id))
            if explicit_permission is not None:
                item['permissions'] = {
                    'can_read': explicit_permission.can_read,
                    'can_edit': explicit_permission.can_edit,
                    'can_delete': explicit_permission.can_delete
                }
                item['expires_at'] = (
                    explicit_permission.expires_at.strftime("%Y-%m-%d %H:%M:%S")
                    if explicit_permission.expires_at else None
                )
            else:
                item['permissions'] = {
                    'can_read': self._can_read(kb_type, user, department, group, creator_id, kb_id),
                    'can_edit': self._can_edit(kb_type, user, department, group, creator_id, kb_id),
                    'can_delete': self._can_delete(kb_type, user, department, group, creator_id, kb_id)
                }
                # Role-derived access has no expiry.
                item['expires_at'] = None

            if keyword_pattern is not None:
                name = item.get('name')
                if name is not None:
                    highlighted = highlight(str(name), keyword_pattern)
                    if highlighted is not None:
                        item['highlighted_name'] = highlighted
                desc = item.get('desc')
                if desc is not None:
                    highlighted = highlight(str(desc), keyword_pattern)
                    if highlighted is not None:
                        item['highlighted_desc'] = highlighted

        return Response({
            "code": 200,
            "message": "获取知识库列表成功",
            "data": {
                "total": total,
                "page": page,
                "page_size": page_size,
                "keyword": keyword if keyword else None,
                "items": data
            }
        })
    except Exception as e:
        logger.error(f"获取知识库列表失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"获取知识库列表失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def get_queryset(self):
    """Return the knowledge bases the requesting user is allowed to see.

    A knowledge base is included when any of the following holds:
      * its type is ``admin``;
      * the user created it;
      * the user holds an active explicit read permission that has not
        expired (a NULL ``expires_at`` is treated as "never expires");
      * the user's role grants access: admins see every non-private base,
        leaders see ``leader``/``member`` bases of their department, and
        members/users see ``leader`` bases of their department plus
        ``member`` bases of their own department and group.
    """
    user = self.request.user
    visible = Q(type='admin') | Q(user_id=user.id)

    # Grants created without an expiry store NULL in expires_at; comparing
    # NULL with "now" is never true, so they must be matched explicitly.
    granted_ids = KBPermissionModel.objects.filter(
        Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now()),
        user=user,
        can_read=True,
        status='active'
    ).values_list('knowledge_base_id', flat=True)
    # Used directly as a subquery; an empty result simply matches nothing.
    visible |= Q(id__in=granted_ids)

    if user.role == 'admin':
        visible |= ~Q(type='private') | Q(user_id=user.id)
    elif user.role == 'leader':
        visible |= Q(type__in=['leader', 'member'], department=user.department)
    elif user.role in ['member', 'user']:
        visible |= Q(type='leader', department=user.department)
        visible |= Q(type='member', department=user.department, group=user.group)

    return KnowledgeBase.objects.all().filter(visible).distinct()
def create(self, request, *args, **kwargs):
    """Create a knowledge base, its external dataset and the initial permissions.

    Request data:
        name: required and unique.
        type: ``private`` (default), ``admin``, ``secret``, ``leader`` or
            ``member``.
        department / group: required or overridden depending on ``type``
            and on the creator's role (see the validation below).

    The local row, the external dataset id and all permission rows are
    written inside one transaction. If the transaction fails after the
    external dataset was created, the dataset is removed again on a
    best-effort basis so it is not left orphaned.

    Returns:
        Response using the ``{'code', 'message', 'data'}`` envelope:
        400 for validation problems, 403 when the creator's role may not
        create the requested type, 500 for external-API or unexpected errors.
    """
    def reject(code, message, http_status, data=None):
        """Build an error response in the standard envelope."""
        return Response({'code': code, 'message': message, 'data': data}, status=http_status)

    try:
        name = request.data.get('name')
        if not name:
            return reject(400, '知识库名称不能为空', status.HTTP_400_BAD_REQUEST)
        if KnowledgeBase.objects.filter(name=name).exists():
            return reject(400, f'知识库名称 "{name}" 已存在', status.HTTP_400_BAD_REQUEST)

        creator = request.user
        # Named kb_type so the builtin ``type`` is not shadowed.
        kb_type = request.data.get('type', 'private')
        department = request.data.get('department')
        group = request.data.get('group')

        if kb_type in ('admin', 'private'):
            department = None
            group = None
        elif kb_type == 'secret':
            if creator.role != 'admin':
                return reject(403, '只有管理员可以创建保密级知识库', status.HTTP_403_FORBIDDEN)
            department = None
            group = None
        elif kb_type == 'leader':
            if creator.role != 'admin':
                return reject(403, '只有管理员可以创建组长级知识库', status.HTTP_403_FORBIDDEN)
            if not department:
                return reject(400, '创建组长级知识库时必须指定部门', status.HTTP_400_BAD_REQUEST)
        elif kb_type == 'member':
            if creator.role not in ['admin', 'leader']:
                return reject(403, '只有管理员和组长可以创建成员级知识库', status.HTTP_403_FORBIDDEN)
            if creator.role == 'admin' and not department:
                return reject(400, '管理员创建成员知识库时必须指定部门', status.HTTP_400_BAD_REQUEST)
            elif creator.role == 'leader':
                # Leaders always create inside their own department.
                department = creator.department
            if not group:
                return reject(400, '创建成员知识库时必须指定组', status.HTTP_400_BAD_REQUEST)

        data = request.data.copy()
        data['department'] = department
        data['group'] = group
        serializer = self.get_serializer(data=data, context={'request': request})
        if not serializer.is_valid():
            logger.error(f"数据验证失败: {serializer.errors}")
            return reject(400, '数据验证失败', status.HTTP_400_BAD_REQUEST, serializer.errors)

        external_id = None
        try:
            with transaction.atomic():
                knowledge_base = serializer.save()
                logger.info(f"知识库创建成功: id={knowledge_base.id}, name={knowledge_base.name}, user_id={knowledge_base.user_id}")

                external_id = create_external_dataset(knowledge_base)
                logger.info(f"外部知识库创建成功获取ID: {external_id}")
                knowledge_base.external_id = external_id
                knowledge_base.save()
                logger.info(f"更新knowledge_base的external_id为: {external_id}")

                # The creator always receives full access.
                KBPermissionModel.objects.create(
                    knowledge_base=knowledge_base,
                    user=creator,
                    can_read=True,
                    can_edit=True,
                    can_delete=True,
                    granted_by=creator,
                    status='active'
                )
                logger.info("创建者权限创建成功")

                # Select the other users that get a permission row for this type.
                others = User.objects.exclude(id=creator.id)
                if kb_type == 'admin':
                    recipients = others
                elif kb_type == 'secret':
                    recipients = others.filter(role='admin')
                elif kb_type == 'leader':
                    recipients = others.filter(
                        Q(role='admin') | Q(role='leader', department=department)
                    )
                elif kb_type == 'member':
                    recipients = others.filter(
                        Q(role='admin') | Q(department=department, role='leader') |
                        Q(department=department, group=group, role='member')
                    )
                else:
                    recipients = None

                permissions = []
                if recipients is not None:
                    # 'admin' bases give everybody full access; for the other
                    # types edit/delete rights are decided per user.
                    full_access = kb_type == 'admin'
                    permissions = [
                        KBPermissionModel(
                            knowledge_base=knowledge_base,
                            user=recipient,
                            can_read=True,
                            can_edit=True if full_access else self._can_edit(kb_type, recipient),
                            can_delete=True if full_access else self._can_delete(kb_type, recipient),
                            granted_by=creator,
                            status='active'
                        ) for recipient in recipients
                    ]
                if permissions:
                    KBPermissionModel.objects.bulk_create(permissions)
                    logger.info(f"{kb_type}类型权限创建完成: {len(permissions)}条记录")
        except Exception:
            # The local rows were rolled back; remove the external dataset
            # (if it was already created) so it does not stay orphaned.
            if external_id:
                try:
                    delete_external_dataset(external_id)
                    logger.warning(f"本地事务回滚,已清理外部知识库: {external_id}")
                except Exception as cleanup_error:
                    logger.warning(f"清理外部知识库失败: {external_id}, 错误: {cleanup_error}")
            raise

        return Response({
            'code': 200,
            'message': '知识库创建成功',
            'data': {
                'knowledge_base': serializer.data,
                'external_id': knowledge_base.external_id
            }
        })
    except ExternalAPIError as e:
        logger.error(f"外部知识库创建失败: {str(e)}")
        return reject(500, f'创建知识库失败: {str(e)}', status.HTTP_500_INTERNAL_SERVER_ERROR)
    except Exception as e:
        logger.error(f"创建知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return reject(500, f'创建知识库失败: {str(e)}', status.HTTP_500_INTERNAL_SERVER_ERROR)
def update(self, request, *args, **kwargs):
    """Partially update a knowledge base and mirror name/desc to the external dataset.

    The caller needs ``edit`` permission. The local update and the external
    PUT run inside one transaction, so a failed external call rolls the
    local change back.

    Returns:
        Response using the ``{"code", "message", "data"}`` envelope:
        400 for invalid data, 403 without edit permission, 404 when the
        knowledge base does not exist, 500 for external-API or unexpected
        errors.
    """
    # Seconds to wait for the external service. Without a timeout requests
    # blocks indefinitely and the Timeout handler below can never fire.
    request_timeout = 30

    try:
        instance = self.get_object()
        user = request.user
        if not self.check_knowledge_base_permission(instance, user, 'edit'):
            return Response({
                "code": 403,
                "message": "没有编辑权限",
                "data": None
            }, status=status.HTTP_403_FORBIDDEN)

        serializer = self.get_serializer(instance, data=request.data, partial=True)
        # Report validation problems as 400 (as create does) instead of
        # letting them fall into the generic 500 handler.
        if not serializer.is_valid():
            logger.error(f"数据验证失败: {serializer.errors}")
            return Response({
                "code": 400,
                "message": "数据验证失败",
                "data": serializer.errors
            }, status=status.HTTP_400_BAD_REQUEST)

        with transaction.atomic():
            self.perform_update(serializer)
            if instance.external_id:
                try:
                    api_data = {
                        "name": serializer.validated_data.get('name', instance.name),
                        "desc": serializer.validated_data.get('desc', instance.desc),
                        "type": "0",
                        "meta": {},
                        "documents": []
                    }
                    response = requests.put(
                        f'{settings.API_BASE_URL}/api/dataset/{instance.external_id}',
                        json=api_data,
                        headers={'Content-Type': 'application/json'},
                        timeout=request_timeout,
                    )
                    if response.status_code != 200:
                        raise ExternalAPIError(f"更新外部知识库失败,状态码: {response.status_code}, 响应: {response.text}")
                    api_response = response.json()
                    if api_response.get('code') != 200:
                        raise ExternalAPIError(f"更新外部知识库失败: {api_response.get('message', '未知错误')}")
                    logger.info(f"外部知识库更新成功: {instance.external_id}")
                except ExternalAPIError:
                    # Already carries a complete message; do not wrap it again.
                    raise
                except requests.exceptions.Timeout as e:
                    raise ExternalAPIError("请求超时,请稍后重试") from e
                except requests.exceptions.RequestException as e:
                    raise ExternalAPIError(f"API请求失败: {str(e)}") from e
                except Exception as e:
                    raise ExternalAPIError(f"更新外部知识库失败: {str(e)}") from e

        return Response({
            "code": 200,
            "message": "知识库更新成功",
            "data": serializer.data
        })
    except Http404:
        return Response({
            "code": 404,
            "message": "知识库不存在",
            "data": None
        }, status=status.HTTP_404_NOT_FOUND)
    except ExternalAPIError as e:
        logger.error(f"更新外部知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": str(e),
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    except Exception as e:
        logger.error(f"更新知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"更新知识库失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def destroy(self, request, *args, **kwargs):
    """Delete a knowledge base locally and, best-effort, in the external service.

    The caller needs ``delete`` permission. A failed external deletion is
    logged and reported in the response message, but never prevents the
    local deletion.

    Returns:
        Response using the ``{"code", "message", "data"}`` envelope:
        403 without delete permission, 404 when the knowledge base does not
        exist, 500 for unexpected errors.
    """
    try:
        instance = self.get_object()
        user = request.user
        if not self.check_knowledge_base_permission(instance, user, 'delete'):
            return Response({
                "code": 403,
                "message": "没有删除权限",
                "data": None
            }, status=status.HTTP_403_FORBIDDEN)

        external_delete_success = True
        external_error_message = None
        if instance.external_id:
            try:
                external_delete_success = delete_external_dataset(instance.external_id)
            except ExternalAPIError as e:
                # Treat a raised API error like a reported failure so the
                # local deletion still happens.
                logger.warning(f"外部知识库删除异常: {str(e)}")
                external_delete_success = False
            if not external_delete_success:
                external_error_message = "外部知识库删除失败"
                logger.warning(f"外部知识库删除失败,将继续删除本地知识库: {external_error_message}")

        # Django clears the primary key on the instance once it is deleted,
        # so remember the identifying values for the log line first.
        deleted_id = instance.id
        deleted_name = instance.name
        self.perform_destroy(instance)
        logger.info(f"本地知识库删除成功: id={deleted_id}, name={deleted_name}")

        if not external_delete_success:
            return Response({
                "code": 200,
                "message": f"知识库已删除,但外部知识库删除失败: {external_error_message}",
                "data": None
            })
        return Response({
            "code": 200,
            "message": "知识库删除成功",
            "data": None
        })
    except Http404:
        return Response({
            "code": 404,
            "message": "知识库不存在",
            "data": None
        }, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"删除知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"删除知识库失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['get'])
def permissions(self, request, pk=None):
    """Report the requesting user's read/edit/delete rights on one knowledge base.

    Returns:
        Response using the ``{"code", "message", "data"}`` envelope; 404 when
        the knowledge base does not exist, 500 for unexpected errors.
    """
    try:
        kb = self.get_object()
        requester = request.user
        # Evaluated in read, edit, delete order.
        rights = {
            f"can_{operation}": self.check_knowledge_base_permission(kb, requester, operation)
            for operation in ('read', 'edit', 'delete')
        }
        payload = {
            "knowledge_base_id": kb.id,
            "knowledge_base_name": kb.name,
            "permissions": rights
        }
        return Response({"code": 200, "message": "获取权限信息成功", "data": payload})
    except Http404:
        not_found = {"code": 404, "message": "知识库不存在", "data": None}
        return Response(not_found, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"获取权限信息失败: {str(e)}")
        logger.error(traceback.format_exc())
        failure = {"code": 500, "message": f"获取权限信息失败: {str(e)}", "data": None}
        return Response(failure, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def summary(self, request):
    """Return a short summary of every non-secret knowledge base.

    Each entry carries id, name, desc, type, department, the requesting
    user's permissions and the expiry of the user's active explicit
    permission (None when there is none or it has no expiry).

    Returns:
        Response using the ``{'code', 'message', 'data'}`` envelope; 500 for
        unexpected errors.
    """
    try:
        requester = request.user
        entries = []
        for kb in KnowledgeBase.objects.exclude(type='secret'):
            rights = {
                f'can_{operation}': self.check_knowledge_base_permission(kb, requester, operation)
                for operation in ('read', 'edit', 'delete')
            }
            grant = KBPermissionModel.objects.filter(
                knowledge_base_id=kb.id,
                user=requester,
                status='active'
            ).first()
            # Only an explicit grant with an expiry date yields a timestamp.
            if grant and grant.expires_at:
                expiry = grant.expires_at.strftime("%Y-%m-%d %H:%M:%S")
            else:
                expiry = None
            entries.append({
                'id': str(kb.id),
                'name': kb.name,
                'desc': kb.desc,
                'type': kb.type,
                'department': kb.department,
                'permissions': rights,
                'expires_at': expiry
            })
        return Response({'code': 200, 'message': '获取知识库概要信息成功', 'data': entries})
    except Exception as e:
        logger.error(f"获取知识库概要信息失败: {str(e)}")
        logger.error(traceback.format_exc())
        failure = {'code': 500, 'message': f'获取知识库概要信息失败: {str(e)}', 'data': None}
        return Response(failure, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def retrieve(self, request, *args, **kwargs):
    """Return one knowledge base with the requesting user's permissions.

    The serialized knowledge base is extended with ``permissions`` and with
    ``expires_at``, the expiry of the user's active explicit permission
    (None when there is none or it has no expiry).

    Returns:
        Response using the ``{'code', 'message', 'data'}`` envelope; 404 when
        the knowledge base does not exist, 500 for unexpected errors.
    """
    try:
        instance = self.get_object()
        serializer = self.get_serializer(instance)
        data = serializer.data
        user = request.user
        data['permissions'] = {
            'can_read': self.check_knowledge_base_permission(instance, user, 'read'),
            'can_edit': self.check_knowledge_base_permission(instance, user, 'edit'),
            'can_delete': self.check_knowledge_base_permission(instance, user, 'delete')
        }
        explicit_permission = KBPermissionModel.objects.filter(
            knowledge_base_id=instance.id,
            user=user,
            status='active'
        ).first()
        if explicit_permission and explicit_permission.expires_at:
            data['expires_at'] = explicit_permission.expires_at.strftime("%Y-%m-%d %H:%M:%S")
        else:
            data['expires_at'] = None
        return Response({
            'code': 200,
            'message': '获取知识库详情成功',
            'data': data
        })
    except Http404:
        # Answer 404 like the other detail endpoints instead of falling
        # through to the generic 500 handler.
        return Response({
            'code': 404,
            'message': '知识库不存在',
            'data': None
        }, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"获取知识库详情失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            'code': 500,
            'message': f'获取知识库详情失败: {str(e)}',
            'data': None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['get'])
def search(self, request):
    """Search non-secret knowledge bases by keyword.

    Query parameters:
        keyword: required substring matched case-insensitively against
            name, desc, department and group.
        page, page_size: 1-based pagination. Non-numeric values fall back to
            1 / 10; values below 1 are replaced by 1 / 10.

    Readable knowledge bases are returned in full, with ``permissions``,
    ``expires_at`` and, where the keyword matches, ``highlighted_name`` /
    ``highlighted_desc`` (HTML-escaped text with matches wrapped in
    ``<em class="highlight">``). Knowledge bases the user cannot read are
    reduced to id, name, type, department and permissions.

    Returns:
        Response using the ``{"code", "message", "data"}`` envelope; 400 when
        the keyword is missing, 500 for unexpected errors.
    """
    import html
    import re

    def highlight(text, pattern):
        """Return escaped ``text`` with matches wrapped in <em>, or None if nothing matched."""
        pieces = []
        cursor = 0
        for match in pattern.finditer(text):
            pieces.append(html.escape(text[cursor:match.start()]))
            pieces.append(f'<em class="highlight">{html.escape(match.group(0))}</em>')
            cursor = match.end()
        if not pieces:
            return None
        pieces.append(html.escape(text[cursor:]))
        return ''.join(pieces)

    try:
        keyword = request.query_params.get('keyword', '')
        if not keyword:
            return Response({
                "code": 400,
                "message": "搜索关键字不能为空",
                "data": None
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            page = int(request.query_params.get('page', 1))
            page_size = int(request.query_params.get('page_size', 10))
        except ValueError:
            page = 1
            page_size = 10
        # Negative offsets would make Django reject the slice (HTTP 500).
        if page < 1:
            page = 1
        if page_size < 1:
            page_size = 10

        query = Q(name__icontains=keyword) | Q(desc__icontains=keyword) | \
            Q(department__icontains=keyword) | Q(group__icontains=keyword)
        queryset = KnowledgeBase.objects.filter(query).exclude(type='secret')
        user = request.user

        total = queryset.count()
        start = (page - 1) * page_size
        end = start + page_size
        serializer = self.get_serializer(queryset[start:end], many=True)
        data = serializer.data
        keyword_pattern = re.compile(re.escape(keyword), re.IGNORECASE)

        result_items = []
        for item in data:
            explicit_permission = KBPermissionModel.objects.filter(
                knowledge_base_id=item['id'],
                user=user,
                status='active'
            ).first()
            if explicit_permission:
                kb_permissions = {
                    'can_read': explicit_permission.can_read,
                    'can_edit': explicit_permission.can_edit,
                    'can_delete': explicit_permission.can_delete
                }
                item['expires_at'] = (
                    explicit_permission.expires_at.strftime("%Y-%m-%d %H:%M:%S")
                    if explicit_permission.expires_at else None
                )
            else:
                # Unsaved stand-in carrying just the fields passed to the
                # permission check.
                temp_kb = KnowledgeBase(
                    id=item['id'],
                    type=item['type'],
                    department=item.get('department'),
                    group=item.get('group'),
                    user_id=item.get('user_id')
                )
                kb_permissions = {
                    'can_read': self.check_knowledge_base_permission(temp_kb, user, 'read'),
                    'can_edit': self.check_knowledge_base_permission(temp_kb, user, 'edit'),
                    'can_delete': self.check_knowledge_base_permission(temp_kb, user, 'delete')
                }
                item['expires_at'] = None
            item['permissions'] = kb_permissions

            if not kb_permissions['can_read']:
                # Unreadable entries expose only the summary fields; no
                # highlighting so the description is never leaked.
                result_items.append({
                    'id': item['id'],
                    'name': item['name'],
                    'type': item['type'],
                    'department': item.get('department'),
                    'permissions': kb_permissions
                })
                continue

            name = item.get('name')
            if name is not None:
                highlighted = highlight(str(name), keyword_pattern)
                if highlighted is not None:
                    item['highlighted_name'] = highlighted
            desc = item.get('desc')
            if desc is not None:
                highlighted = highlight(str(desc), keyword_pattern)
                if highlighted is not None:
                    item['highlighted_desc'] = highlighted
            result_items.append(item)

        return Response({
            "code": 200,
            "message": "搜索知识库成功",
            "data": {
                "total": total,
                "page": page,
                "page_size": page_size,
                "keyword": keyword,
                "items": result_items
            }
        })
    except Exception as e:
        logger.error(f"搜索知识库失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"搜索知识库失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['post'])
def change_type(self, request, pk=None):
    """Change the type (and department/group scope) of a knowledge base.

    Request data:
        type: required; one of private, admin, secret, leader, member.
        department, group: scope for ``leader`` / ``member`` types.

    Rules enforced here:
      * the caller needs ``edit`` permission;
      * only admins may switch to ``admin``, ``secret`` or ``leader``;
        everybody else may only choose ``private`` or ``member``;
      * a leader switching to ``member`` may only target the own department,
        which is also filled in when omitted;
      * ``leader`` requires a department, ``member`` a department and group;
      * ``admin`` / ``secret`` clear department and group; ``private`` keeps
        the current values unless new ones are supplied.

    Returns:
        Response using the ``{"code", "message", "data"}`` envelope; 400 for
        invalid input, 403 when not permitted, 404 when the knowledge base
        does not exist, 500 for unexpected errors.
    """
    def reject(code, message, http_status):
        """Build an error response in the standard envelope."""
        return Response({"code": code, "message": message, "data": None}, status=http_status)

    try:
        instance = self.get_object()
        user = request.user
        if not self.check_knowledge_base_permission(instance, user, 'edit'):
            return reject(403, "没有修改权限", status.HTTP_403_FORBIDDEN)

        new_type = request.data.get('type')
        if not new_type:
            return reject(400, "新类型不能为空", status.HTTP_400_BAD_REQUEST)

        valid_types = ['private', 'admin', 'secret', 'leader', 'member']
        if new_type not in valid_types:
            return reject(
                400,
                f"无效的知识库类型,可选值: {', '.join(valid_types)}",
                status.HTTP_400_BAD_REQUEST
            )

        # The guard used to test new_type == 'leader', so it blocked only that
        # one target type and let non-admins switch to 'admin' or 'secret'.
        # It must depend on the caller's role, as the error message states.
        if user.role != 'admin' and new_type not in ['private', 'member']:
            return reject(403, "组长只能将知识库设置为private或member类型", status.HTTP_403_FORBIDDEN)

        department = request.data.get('department')
        group = request.data.get('group')

        # Leaders may only scope a member-level base to their own department
        # (same rule create applies); default to it when none is given.
        if user.role == 'leader' and new_type == 'member':
            if department and department != user.department:
                return reject(403, "组长只能为本部门设置知识库", status.HTTP_403_FORBIDDEN)
            department = user.department

        if new_type == 'leader' and not department:
            return reject(400, "组长级知识库必须指定部门", status.HTTP_400_BAD_REQUEST)
        if new_type == 'member':
            if not department:
                return reject(400, "成员级知识库必须指定部门", status.HTTP_400_BAD_REQUEST)
            if not group:
                return reject(400, "成员级知识库必须指定组", status.HTTP_400_BAD_REQUEST)

        if new_type in ['admin', 'secret']:
            department = None
            group = None
        if new_type == 'private':
            # Keep the existing scope unless the request supplies a new one.
            if department is None:
                department = instance.department
            if group is None:
                group = instance.group

        instance.type = new_type
        instance.department = department
        instance.group = group
        instance.save()

        return Response({
            "code": 200,
            "message": f"知识库类型已更新为{new_type}",
            "data": {
                "id": instance.id,
                "name": instance.name,
                "type": instance.type,
                "department": instance.department,
                "group": instance.group
            }
        })
    except Http404:
        return reject(404, "知识库不存在", status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"修改知识库类型失败: {str(e)}")
        logger.error(traceback.format_exc())
        return reject(500, f"修改知识库类型失败: {str(e)}", status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['post'])
def upload_document(self, request, pk=None):
    """Split uploaded files and upload the resulting documents to the external dataset.

    The caller needs ``edit`` permission and the knowledge base must have an
    ``external_id``. Files are read from the multipart fields ``files``,
    ``file``, ``files[...]``, ``file[...]`` or, failing those, from any
    uploaded field. The external dataset is verified first, all files are
    sent to the split API in one call, and every returned document is then
    uploaded and recorded locally one by one.

    Returns:
        Response using the ``{"code", "message", "data"}`` envelope with
        per-document results. 200 when at least one document was stored,
        400 for missing files / split failure / all uploads failed,
        403 without edit permission, 404 when the knowledge base or the
        external dataset is missing, 500 for unexpected errors.
    """
    # Seconds to wait for the verification request; without a timeout a
    # stalled external service would block this worker indefinitely.
    request_timeout = 30

    def split_failure(message, per_file_error):
        """Build the 400 response used when no file could be split."""
        return Response({
            "code": 400,
            "message": message,
            "data": {
                "uploaded_count": 0,
                "failed_count": len(files),
                "total_files": len(files),
                "documents": [],
                "failed_documents": [{"name": file.name, "error": per_file_error} for file in files]
            }
        }, status=status.HTTP_400_BAD_REQUEST)

    try:
        instance = self.get_object()
        user = request.user
        if not self.check_knowledge_base_permission(instance, user, 'edit'):
            return Response({
                "code": 403,
                "message": "没有编辑权限",
                "data": None
            }, status=status.HTTP_403_FORBIDDEN)

        logger.info(f"请求内容: {request.data}")
        logger.info(f"请求FILES: {request.FILES}")

        # Accept the common multipart field spellings, most specific first.
        files = []
        if 'files' in request.FILES:
            files = request.FILES.getlist('files')
        elif 'file' in request.FILES:
            files = request.FILES.getlist('file')
        elif any(key.startswith('files[') for key in request.FILES):
            files = [file for key, file in request.FILES.items() if key.startswith('files[')]
        elif any(key.startswith('file[') for key in request.FILES):
            files = [file for key, file in request.FILES.items() if key.startswith('file[')]
        elif len(request.FILES) > 0:
            files = list(request.FILES.values())

        if not files:
            return Response({
                "code": 400,
                "message": "未找到上传文件,请确保表单字段名为'files''file'",
                "data": {
                    "available_fields": list(request.FILES.keys())
                }
            }, status=status.HTTP_400_BAD_REQUEST)
        logger.info(f"接收到 {len(files)} 个文件上传请求")

        saved_documents = []
        failed_documents = []

        if not instance.external_id:
            return Response({
                "code": 400,
                "message": "知识库没有有效的external_id请先创建知识库",
                "data": None
            }, status=status.HTTP_400_BAD_REQUEST)

        # Make sure the external dataset exists before doing any work.
        try:
            verify_url = f'{settings.API_BASE_URL}/api/dataset/{instance.external_id}'
            verify_response = requests.get(verify_url, timeout=request_timeout)
            if verify_response.status_code != 200:
                logger.error(f"外部知识库不存在或无法访问: {instance.external_id}, 状态码: {verify_response.status_code}")
                return Response({
                    "code": 404,
                    "message": f"外部知识库不存在或无法访问: {instance.external_id}",
                    "data": None
                }, status=status.HTTP_404_NOT_FOUND)
            verify_data = verify_response.json()
            if verify_data.get('code') != 200:
                logger.error(f"验证外部知识库失败: {verify_data.get('message')}")
                return Response({
                    "code": verify_data.get('code', 500),
                    "message": f"验证外部知识库失败: {verify_data.get('message', '未知错误')}",
                    "data": None
                }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
            logger.info(f"外部知识库验证成功: {instance.external_id}")
        except Exception as e:
            logger.error(f"验证外部知识库时出错: {str(e)}")
            return Response({
                "code": 500,
                "message": f"验证外部知识库时出错: {str(e)}",
                "data": None
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        # Split all files in a single call.
        split_response = call_split_api_multiple(files)
        if not split_response or split_response.get('code') != 200:
            error_msg = f"文件分割失败: {split_response.get('message', '未知错误') if split_response else '请求失败'}"
            logger.error(error_msg)
            return split_failure(error_msg, error_msg)

        documents_data = split_response.get('data', [])
        if not documents_data:
            logger.warning("批量分割API未返回文档数据")
            return split_failure("文件分割未返回有效数据", "分割未返回有效数据")
        logger.info(f"成功分割出 {len(documents_data)} 个文档,准备上传")

        # Upload each split document and record the successful ones locally.
        for doc in documents_data:
            doc_name = doc.get('name', '未命名文档')
            doc_content = doc.get('content', [])
            logger.info(f"处理文档: {doc_name}, 包含 {len(doc_content)} 个段落")
            if not doc_content:
                # Upload a placeholder paragraph so the document still exists.
                doc_content = [{
                    'title': '文档内容',
                    'content': '文件内容无法自动分割,请检查文件格式。'
                }]
            doc_data = {
                "name": doc_name,
                "paragraphs": [
                    {
                        "content": paragraph.get('content', ''),
                        "title": paragraph.get('title', ''),
                        "is_active": True,
                        "problem_list": []
                    }
                    for paragraph in doc_content
                ]
            }

            upload_response = call_upload_api(instance.external_id, doc_data)
            if upload_response and upload_response.get('code') == 200 and upload_response.get('data'):
                document_id = upload_response['data']['id']
                doc_record = KnowledgeBaseDocument.objects.create(
                    knowledge_base=instance,
                    document_id=document_id,
                    document_name=doc_name,
                    external_id=document_id,
                    uploader_name=user.name
                )
                saved_documents.append({
                    "id": str(doc_record.id),
                    "name": doc_record.document_name,
                    "external_id": doc_record.external_id
                })
                logger.info(f"文档 '{doc_name}' 上传成功ID: {document_id}")
            else:
                error_msg = upload_response.get('message', '未知错误') if upload_response else '上传API调用失败'
                logger.error(f"文档 '{doc_name}' 上传失败: {error_msg}")
                failed_documents.append({
                    "name": doc_name,
                    "error": error_msg
                })

        if saved_documents:
            return Response({
                "code": 200,
                "message": f"文档上传完成,成功: {len(saved_documents)},失败: {len(failed_documents)}",
                "data": {
                    "uploaded_count": len(saved_documents),
                    "failed_count": len(failed_documents),
                    "total_files": len(files),
                    "documents": saved_documents,
                    "failed_documents": failed_documents
                }
            })
        return Response({
            "code": 400,
            "message": "所有文档上传失败",
            "data": {
                "uploaded_count": 0,
                "failed_count": len(failed_documents),
                "total_files": len(files),
                "documents": [],
                "failed_documents": failed_documents
            }
        }, status=status.HTTP_400_BAD_REQUEST)
    except Http404:
        # Answer 404 like the other detail endpoints instead of a generic 500.
        return Response({
            "code": 404,
            "message": "知识库不存在",
            "data": None
        }, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"文档上传失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"文档上传失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['get'])
def documents(self, request, pk=None):
    """Return the active documents of a knowledge base, synced from the external service.

    The caller needs ``read`` permission and the knowledge base must have an
    ``external_id``. Every external document with an id and a name is
    created or updated locally first; the response then lists the local
    active documents (newest first) enriched with the external
    ``char_length``, ``paragraph_count`` and ``is_active`` values.

    Returns:
        Response using the ``{"code", "message", "data"}`` envelope; 400
        without ``external_id``, 403 without read permission, 404 when the
        knowledge base does not exist, 500 for external-API or unexpected
        errors.
    """
    try:
        instance = self.get_object()
        user = request.user

        # Permission check.
        if not self.check_knowledge_base_permission(instance, user, 'read'):
            return Response({
                "code": 403,
                "message": "没有查看权限",
                "data": None
            }, status=status.HTTP_403_FORBIDDEN)

        # The external dataset id is required to query the external API.
        if not instance.external_id:
            return Response({
                "code": 400,
                "message": "知识库没有有效的external_id",
                "data": None
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            external_documents = get_external_document_list(instance.external_id)

            # Sync external documents into the local table and index them by
            # id so the response can be built without rescanning the list
            # for every local document.
            external_by_id = {}
            for doc in external_documents:
                external_id = doc.get('id')
                doc_name = doc.get('name')
                # Keep the first entry per id, matching a first-match scan.
                external_by_id.setdefault(external_id, doc)
                if external_id and doc_name:
                    kb_doc, created = KnowledgeBaseDocument.objects.update_or_create(
                        knowledge_base=instance,
                        external_id=external_id,
                        defaults={
                            'document_id': external_id,
                            'document_name': doc_name,
                            'status': 'active' if doc.get('is_active', True) else 'deleted'
                        }
                    )
                    if created:
                        logger.info(f"同步创建文档: {doc_name}, ID: {external_id}")
                    else:
                        logger.info(f"同步更新文档: {doc_name}, ID: {external_id}")

            # Read back the up-to-date local documents.
            documents = KnowledgeBaseDocument.objects.filter(
                knowledge_base=instance,
                status='active'
            ).order_by('-create_time')

            documents_data = []
            for doc in documents:
                external_meta = external_by_id.get(doc.external_id, {})
                documents_data.append({
                    "id": str(doc.id),
                    "document_id": doc.document_id,
                    "name": doc.document_name,
                    "external_id": doc.external_id,
                    "created_at": doc.create_time.strftime('%Y-%m-%d %H:%M:%S'),
                    "char_length": external_meta.get('char_length', 0),
                    "paragraph_count": external_meta.get('paragraph_count', 0),
                    "is_active": external_meta.get('is_active', True),
                    "uploader_name": doc.uploader_name
                })

            return Response({
                "code": 200,
                "message": "获取文档列表成功",
                "data": documents_data
            })
        except ExternalAPIError as e:
            logger.error(f"获取文档列表失败: {str(e)}")
            return Response({
                "code": 500,
                "message": str(e),
                "data": None
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    except Http404:
        # Answer 404 like the other detail endpoints instead of a generic 500.
        return Response({
            "code": 404,
            "message": "知识库不存在",
            "data": None
        }, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"获取文档列表失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"获取文档列表失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['get'])
def document_content(self, request, pk=None):
    """Return the paragraphs of one document of a knowledge base.

    Query parameters:
        document_id: required; must identify an active local document of
            this knowledge base.

    The caller needs ``read`` permission. The paragraphs are fetched from the
    external API and returned unchanged.

    Returns:
        Response using the ``{"code", "message", "data"}`` envelope; 400
        without ``document_id``, 403 without read permission, 404 when the
        knowledge base or document does not exist, 500 for external-API or
        unexpected errors.
    """
    try:
        knowledge_base = self.get_object()
        user = request.user

        # Permission check.
        if not self.check_knowledge_base_permission(knowledge_base, user, 'read'):
            return Response({
                "code": 403,
                "message": "没有查看权限",
                "data": None
            }, status=status.HTTP_403_FORBIDDEN)

        document_id = request.query_params.get('document_id')
        if not document_id:
            return Response({
                "code": 400,
                "message": "缺少document_id参数",
                "data": None
            }, status=status.HTTP_400_BAD_REQUEST)

        # The document must exist locally and still be active.
        document = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            document_id=document_id,
            status='active'
        ).first()
        if not document:
            return Response({
                "code": 404,
                "message": "文档不存在或已删除",
                "data": None
            }, status=status.HTTP_404_NOT_FOUND)

        try:
            paragraphs = get_external_document_paragraphs(knowledge_base.external_id, document.external_id)
            # Pass the external paragraph data through as-is.
            return Response({
                "code": 200,
                "message": "获取文档内容成功",
                "data": {
                    "document_id": document_id,
                    "name": document.document_name,
                    "paragraphs": paragraphs
                }
            })
        except ExternalAPIError as e:
            logger.error(f"获取文档段落内容失败: {str(e)}")
            return Response({
                "code": 500,
                "message": str(e),
                "data": None
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    except Http404:
        # Answer 404 like the other detail endpoints instead of a generic 500.
        return Response({
            "code": 404,
            "message": "知识库不存在",
            "data": None
        }, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"获取文档内容失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"获取文档内容失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=True, methods=['delete'])
def delete_document(self, request, pk=None):
    """Delete one document of a knowledge base.

    Query parameters:
        document_id: required; must identify an active local document of
            this knowledge base.

    The caller needs ``edit`` permission. The document is deleted through the
    external API and marked ``deleted`` locally regardless of the external
    outcome; an external failure is reported in the (still 200) response.

    Returns:
        Response using the ``{"code", "message", "data"}`` envelope; 400
        without ``document_id``, 403 without edit permission, 404 when the
        knowledge base or document does not exist, 500 for unexpected errors.
    """
    try:
        knowledge_base = self.get_object()
        user = request.user

        # Permission check.
        if not self.check_knowledge_base_permission(knowledge_base, user, 'edit'):
            return Response({
                "code": 403,
                "message": "没有编辑权限",
                "data": None
            }, status=status.HTTP_403_FORBIDDEN)

        document_id = request.query_params.get('document_id')
        if not document_id:
            return Response({
                "code": 400,
                "message": "缺少document_id参数",
                "data": None
            }, status=status.HTTP_400_BAD_REQUEST)

        # The document must exist locally and still be active.
        document = KnowledgeBaseDocument.objects.filter(
            knowledge_base=knowledge_base,
            document_id=document_id,
            status='active'
        ).first()
        if not document:
            return Response({
                "code": 404,
                "message": "文档不存在或已删除",
                "data": None
            }, status=status.HTTP_404_NOT_FOUND)

        # Ask the external API to delete the document. A raised API error is
        # handled like a reported failure so the local state is still updated.
        external_id = document.external_id
        try:
            delete_result = call_delete_document_api(knowledge_base.external_id, external_id)
        except ExternalAPIError as e:
            delete_result = {"code": None, "message": str(e)}
        # Guard against a missing or non-dict result before reading from it.
        if not isinstance(delete_result, dict):
            delete_result = {}

        # Update the local state regardless of the external result.
        document.status = 'deleted'
        document.save()

        if delete_result.get('code') != 200:
            logger.warning(f"外部API删除文档失败但本地标记已更新: {delete_result.get('message')}")
            return Response({
                "code": 200,
                "message": "文档在系统中已标记为删除但外部API调用失败",
                "data": {
                    "document_id": document_id,
                    "name": document.document_name,
                    "error": delete_result.get('message')
                }
            })
        return Response({
            "code": 200,
            "message": "文档删除成功",
            "data": {
                "document_id": document_id,
                "name": document.document_name
            }
        })
    except Http404:
        # Answer 404 like the other detail endpoints instead of a generic 500.
        return Response({
            "code": 404,
            "message": "知识库不存在",
            "data": None
        }, status=status.HTTP_404_NOT_FOUND)
    except Exception as e:
        logger.error(f"删除文档失败: {str(e)}")
        logger.error(traceback.format_exc())
        return Response({
            "code": 500,
            "message": f"删除文档失败: {str(e)}",
            "data": None
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)